Back to site
ProsodyAI Docs
TypeScript SDK

Real-Time Streaming

Process audio in real-time with WebSocket streaming

Real-Time Streaming

The streaming API enables real-time emotion analysis with sub-500ms latency.

Basic Usage

// Open a streaming session for the contact-center vertical.
const stream = client.stream({
  vertical: 'contact_center',
  sessionId: 'call-12345',
});

// Register listeners before any audio is sent so no events are missed.
const logEmotion = (data) => {
  console.log(`Emotion: ${data.emotion} (${data.confidence})`);
};
const logPrediction = (data) => {
  console.log(`Escalation risk: ${data.escalationRisk}`);
};

stream.on('emotion', logEmotion);
stream.on('prediction', logPrediction);

// Push raw audio into the stream as it becomes available.
stream.send(audioChunk);

// Flush and tear down the connection once the audio is finished.
await stream.close();

Stream Options

interface StreamOptions {
  // Required
  vertical: Vertical;          // Which vertical model to run (e.g. 'contact_center', 'sales')

  // Optional
  sessionId?: string;          // Session identifier
  speakerId?: string;          // Speaker identifier
  sampleRate?: number;         // Audio sample rate (default: 16000)
  channels?: number;           // Audio channels (default: 1)
  metadata?: Record<string, unknown>;  // Arbitrary session metadata (see "Session Metadata")
  reconnect?: boolean;         // Auto-reconnect on drop (default: true; see "Reconnection")
  lowLatency?: boolean;        // Optimize for latency (see "Latency Optimization")

  // Callbacks
  onEmotion?: (data: EmotionEvent) => void;
  onPrediction?: (data: PredictionEvent) => void;
  onError?: (error: Error) => void;
  onClose?: () => void;
}

Events

Event Types

// Emotion detected for the most recent stretch of audio.
stream.on('emotion', (data: EmotionEvent) => {
  data.emotion;      // Detected emotion
  data.confidence;   // Confidence score
  data.valence;      // VAD valence (negative vs. positive affect)
  data.arousal;      // VAD arousal (calm vs. activated)
  data.dominance;    // VAD dominance (submissive vs. dominant)
  data.timestamp;    // Event timestamp (epoch/units not specified here — confirm)
});

// Conversation prediction updated as the call progresses.
stream.on('prediction', (data: PredictionEvent) => {
  data.escalationRisk;    // 0-1 probability
  data.churnRisk;         // 0-1 probability
  data.predictedCsat;     // 1-5 score
  data.recommendedTone;   // Suggested agent tone
  data.confidence;        // Prediction confidence
});

// Vertical-specific state change (state values depend on the chosen vertical).
stream.on('state', (data: StateEvent) => {
  data.state;        // Vertical-specific state
  data.previous;     // Previous state
  data.metrics;      // Updated metrics
});

// Stream errors (presumably mirrors the onError option in StreamOptions — confirm).
stream.on('error', (error: Error) => {
  console.error('Stream error:', error);
});

// Stream closed (presumably mirrors the onClose option in StreamOptions — confirm).
stream.on('close', () => {
  console.log('Stream closed');
});

Sending Audio

From MediaRecorder (Browser)

const stream = client.stream({ vertical: 'contact_center' });

navigator.mediaDevices.getUserMedia({ audio: true })
  .then((mediaStream) => {
    const recorder = new MediaRecorder(mediaStream, {
      mimeType: 'audio/webm;codecs=opus',
    });
    
    recorder.ondataavailable = (e) => {
      if (e.data.size > 0) {
        stream.send(e.data);
      }
    };
    
    recorder.start(100); // Send chunks every 100ms
  });

From Audio File (Simulated Real-Time)

import { createReadStream } from 'fs';

const stream = client.stream({ vertical: 'sales' });

// 4096 bytes of 16-bit mono PCM at 16 kHz is ~128 ms of audio.
// NOTE(review): adjust if your file's sample rate/width differs.
const CHUNK_BYTES = 4096;
const CHUNK_MS = (CHUNK_BYTES / 2 / 16000) * 1000;

const audioStream = createReadStream('call.wav', { highWaterMark: CHUNK_BYTES });

audioStream.on('data', (chunk) => {
  stream.send(chunk);
  // Pace reads so chunks arrive at roughly real-time speed —
  // without this, the whole file is sent as fast as the disk allows.
  audioStream.pause();
  setTimeout(() => audioStream.resume(), CHUNK_MS);
});

audioStream.on('end', async () => {
  await stream.close();
});

From WebRTC

const peerConnection = new RTCPeerConnection();
const stream = client.stream({ vertical: 'contact_center' });

peerConnection.ontrack = (event) => {
  // Tap the incoming remote audio track frame-by-frame.
  const audioTrack = event.streams[0].getAudioTracks()[0];
  const processor = new MediaStreamTrackProcessor({ track: audioTrack });
  const reader = processor.readable.getReader();

  // Drain the readable in a loop until the track ends.
  const pump = async () => {
    for (;;) {
      const { value, done } = await reader.read();
      if (done) return;
      stream.send(value);
    }
  };

  pump();
};

Session Management

Multi-Speaker Sessions

// One session, two speakers: tag each chunk with who is talking.
const stream = client.stream({
  sessionId: 'call-12345',
  vertical: 'contact_center',
});

// Per-chunk speaker tags (presumably override any stream-level
// speakerId for that chunk — confirm against StreamOptions).
const customerTag = { speakerId: 'customer' };
const agentTag = { speakerId: 'agent' };

stream.send(audioChunk, customerTag);
stream.send(audioChunk, agentTag);

Session Metadata

// Attach arbitrary business context to the session.
const metadata = {
  dealId: 'deal-456',
  stage: 'discovery',
  rep: 'john@company.com',
};

const stream = client.stream({
  vertical: 'sales',
  sessionId: 'demo-call',
  metadata,
});

Connection Management

Reconnection

const stream = client.stream({
  vertical: 'contact_center',
  // Auto-reconnect is enabled by default
});

// Observe the reconnection lifecycle.
const onReconnectAttempt = (attempt) => {
  console.log(`Reconnecting... attempt ${attempt}`);
};
const onReconnected = () => {
  console.log('Reconnected successfully');
};

stream.on('reconnect', onReconnectAttempt);
stream.on('reconnected', onReconnected);

Manual Reconnection

// Disable auto-reconnect
const stream = client.stream({
  vertical: 'contact_center',
  reconnect: false,
});

stream.on('close', async () => {
  // Manual reconnection logic
  await sleep(1000);
  stream.reconnect();
});

Graceful Shutdown

// Signal end of audio (waits for final predictions)
await stream.end();

// Force close (immediate)
// close() also returns a promise (see Basic Usage); it is safe to ignore
// here since the session is being abandoned outright.
stream.close();

Latency Optimization

For the lowest latency, use a 16 kHz sample rate and send small chunks (100–200 ms of audio per chunk).

const stream = client.stream({
  vertical: 'contact_center',
  sampleRate: 16000,
  // Optimize for latency
  lowLatency: true,
});

// Send smaller, more frequent chunks
// (`mediaRecorder` is the MediaRecorder instance from the browser example above)
mediaRecorder.start(100); // 100ms chunks

Complete Example

import { Prosody } from '@prosody/sdk';

const client = new Prosody({ apiKey: process.env.PROSODY_API_KEY });

/**
 * Streams an entire call's audio through a contact-center session and
 * returns a compact summary once the stream has been drained.
 *
 * @param audioStream - Web ReadableStream of raw audio chunks.
 * @returns Session id, duration, last detected emotion, peak escalation
 *          risk, and the average predicted CSAT.
 */
async function analyzeCall(audioStream: ReadableStream) {
  const stream = client.stream({
    vertical: 'contact_center',
    sessionId: `call-${Date.now()}`,
  });

  // Track the latest emotion for the final report.
  let currentEmotion = 'neutral';

  stream.on('emotion', (data) => {
    currentEmotion = data.emotion;
    updateUI({ emotion: data.emotion, confidence: data.confidence });
  });

  stream.on('prediction', (data) => {
    // Escalate to a human supervisor past a 0.7 risk threshold.
    if (data.escalationRisk > 0.7) {
      alertSupervisor(stream.sessionId);
    }

    updateUI({
      escalationRisk: data.escalationRisk,
      predictedCsat: data.predictedCsat,
      recommendedTone: data.recommendedTone,
    });
  });

  stream.on('error', (error) => {
    console.error('Stream error:', error);
  });

  // Pipe audio to the stream; release the reader lock even if send() throws.
  const reader = audioStream.getReader();
  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      stream.send(value);
    }
  } finally {
    reader.releaseLock();
  }

  // end() waits for the final predictions before resolving.
  const summary = await stream.end();

  return {
    sessionId: stream.sessionId,
    duration: summary.duration,
    finalEmotion: currentEmotion,
    // Math.max() over an empty spread is -Infinity; report 0 when no
    // escalation samples were recorded.
    peakEscalationRisk: summary.escalationHistory.length > 0
      ? Math.max(...summary.escalationHistory)
      : 0,
    averageCsat: summary.averagePredictedCsat,
  };
}