Back to site
ProsodyAI Docs
TypeScript SDK

Streaming

Process audio with WebSocket streaming

Streaming

The streaming API sends audio chunks over WebSocket and returns emotion, VAD, and prosodic signal estimates. End-to-end latency depends on chunk size, network path, API concurrency, model deployment, cold starts, and any downstream ASR.

Basic Usage

// Open a streaming session for the contact-center vertical
const stream = client.stream({
  vertical: 'contact_center',
  sessionId: 'call-12345',
});

// Handle events as the server emits them
stream.on('emotion', (data) => {
  console.log(`Emotion: ${data.emotion} (${data.confidence})`);
});

stream.on('prediction', (data) => {
  console.log(`Escalation risk: ${data.escalationRisk}`);
});

// Send audio chunks as they become available
stream.send(audioChunk);

// Close when done (see "Graceful Shutdown" for end() vs close())
await stream.close();

Stream Options

interface StreamOptions {
  // Required
  vertical: Vertical;

  // Optional
  sessionId?: string;          // Session identifier
  speakerId?: string;          // Speaker identifier
  sampleRate?: number;         // Audio sample rate (default: 16000)
  channels?: number;           // Audio channels (default: 1)
  metadata?: Record<string, unknown>;  // Arbitrary session metadata
  reconnect?: boolean;         // Auto-reconnect on disconnect (default: true)
  lowLatency?: boolean;        // Prefer lower latency over batching

  // Callbacks
  onEmotion?: (data: EmotionEvent) => void;
  onPrediction?: (data: PredictionEvent) => void;
  onError?: (error: Error) => void;
  onClose?: () => void;
}

Events

Event Types

// Emotion detected on the incoming audio
stream.on('emotion', (data: EmotionEvent) => {
  data.emotion;      // Detected emotion
  data.confidence;   // Confidence score
  data.valence;      // VAD valence
  data.arousal;      // VAD arousal
  data.dominance;    // VAD dominance
  data.timestamp;    // Event timestamp
});

// Conversation prediction updated
stream.on('prediction', (data: PredictionEvent) => {
  data.escalationRisk;    // 0-1 probability
  data.churnRisk;         // 0-1 probability
  data.predictedCsat;     // 1-5 score
  data.recommendedTone;   // Suggested agent tone
  data.confidence;        // Prediction confidence
});

// Vertical-specific state change
stream.on('state', (data: StateEvent) => {
  data.state;        // Vertical-specific state
  data.previous;     // Previous state
  data.metrics;      // Updated metrics
});

// Stream errors
stream.on('error', (error: Error) => {
  console.error('Stream error:', error);
});

// Stream closed (e.g. after close() or end())
stream.on('close', () => {
  console.log('Stream closed');
});

Sending Audio

From MediaRecorder (Browser)

const stream = client.stream({ vertical: 'contact_center' });

navigator.mediaDevices.getUserMedia({ audio: true })
  .then((mediaStream) => {
    const recorder = new MediaRecorder(mediaStream, {
      mimeType: 'audio/webm;codecs=opus',
    });

    recorder.ondataavailable = (e) => {
      // Skip empty chunks (e.g. emitted when the recorder stops)
      if (e.data.size > 0) {
        stream.send(e.data);
      }
    };

    recorder.start(100); // Send chunks every 100ms
  })
  .catch((error: unknown) => {
    // getUserMedia rejects when mic access is denied or no device exists;
    // without this handler the failure surfaces as an unhandled rejection
    console.error('Microphone access failed:', error);
  });

From Audio File (Simulated Real-Time)

import { createReadStream } from 'fs';

const stream = client.stream({ vertical: 'sales' });

// 4096 bytes ≈ 128ms of 16kHz 16-bit mono PCM — adjust both constants for
// your audio format (note a .wav file also carries a small header)
const CHUNK_BYTES = 4096;
const CHUNK_MS = 128;

const audioStream = createReadStream('call.wav', { highWaterMark: CHUNK_BYTES });

audioStream.on('data', (chunk) => {
  stream.send(chunk);
  // Pause between chunks so delivery approximates real-time playback
  // instead of dumping the whole file as fast as the disk can read it
  audioStream.pause();
  setTimeout(() => audioStream.resume(), CHUNK_MS);
});

audioStream.on('error', (error) => {
  // An unhandled 'error' event (e.g. missing file) crashes the process
  console.error('File read failed:', error);
  stream.close();
});

audioStream.on('end', async () => {
  await stream.close();
});

From WebRTC

const peerConnection = new RTCPeerConnection();
const stream = client.stream({ vertical: 'contact_center' });

peerConnection.ontrack = (event) => {
  const [firstStream] = event.streams;
  const [audioTrack] = firstStream.getAudioTracks();
  const processor = new MediaStreamTrackProcessor({ track: audioTrack });
  const reader = processor.readable.getReader();

  // Drain audio frames from the track and forward each one to the stream
  const pump = async () => {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      stream.send(value);
    }
  };

  pump();
};

Session Management

Multi-Speaker Sessions

// One session shared by multiple speakers on the same call
const stream = client.stream({
  vertical: 'contact_center',
  sessionId: 'call-12345',
});

// Specify the speaker for each chunk so signals are attributed per speaker
stream.send(audioChunk, { speakerId: 'customer' });
stream.send(audioChunk, { speakerId: 'agent' });

Session Metadata

// Attach metadata to the session for later correlation with CRM records
const stream = client.stream({
  vertical: 'sales',
  sessionId: 'demo-call',
  metadata: {
    dealId: 'deal-456',
    stage: 'discovery',
    rep: 'john@company.com',
  },
});

Connection Management

Reconnection

const stream = client.stream({
  vertical: 'contact_center',
  // Auto-reconnect is enabled by default
});

// Fired on each reconnection attempt, with the attempt count
stream.on('reconnect', (attempt) => {
  console.log(`Reconnecting... attempt ${attempt}`);
});

// Fired once the connection is re-established
stream.on('reconnected', () => {
  console.log('Reconnected successfully');
});

Manual Reconnection

// Disable auto-reconnect
const stream = client.stream({
  vertical: 'contact_center',
  reconnect: false,
});

stream.on('close', async () => {
  // Manual reconnection logic
  await sleep(1000);
  stream.reconnect();
});

Graceful Shutdown

// Signal end of audio (waits for final predictions)
await stream.end();

// Force close (immediate)
stream.close();

Latency Considerations

Measure latency against your actual deployment before publishing targets. Smaller chunks can reduce waiting time, but also increase request overhead and may affect model quality.

// Opt in to low-latency processing; measure before relying on it
const stream = client.stream({
  vertical: 'contact_center',
  sampleRate: 16000,
  // Tune for your latency/quality target
  lowLatency: true,
});

// Send smaller, more frequent chunks only after testing the tradeoff
mediaRecorder.start(100); // 100ms chunks

Complete Example

import { Prosody } from '@prosody/sdk';

const client = new Prosody({ apiKey: process.env.PROSODY_API_KEY });

/**
 * Stream an audio source through the contact-center vertical and return a
 * per-call summary once the stream has flushed its final predictions.
 */
async function analyzeCall(audioStream: ReadableStream) {
  const stream = client.stream({
    vertical: 'contact_center',
    sessionId: `call-${Date.now()}`,
  });

  // Track the latest emotion so it can be reported in the summary
  let currentEmotion = 'neutral';

  stream.on('emotion', (data) => {
    currentEmotion = data.emotion;
    updateUI({ emotion: data.emotion, confidence: data.confidence });
  });

  stream.on('prediction', (data) => {
    // Escalate to a human supervisor once risk crosses the alert threshold
    if (data.escalationRisk > 0.7) {
      alertSupervisor(stream.sessionId);
    }

    updateUI({
      escalationRisk: data.escalationRisk,
      predictedCsat: data.predictedCsat,
      recommendedTone: data.recommendedTone,
    });
  });

  stream.on('error', (error) => {
    console.error('Stream error:', error);
  });

  // Pipe audio to the stream; release the reader lock even if read() throws
  const reader = audioStream.getReader();
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      stream.send(value);
    }
  } finally {
    reader.releaseLock();
  }

  // end() resolves with the final results once processing completes
  const summary = await stream.end();

  return {
    sessionId: stream.sessionId,
    duration: summary.duration,
    finalEmotion: currentEmotion,
    // Math.max() over an empty spread yields -Infinity; guard short calls
    peakEscalationRisk: summary.escalationHistory.length > 0
      ? Math.max(...summary.escalationHistory)
      : 0,
    averageCsat: summary.averagePredictedCsat,
  };
}