Skip to main content

Streaming APIs: How to Consume SSE and WebSocket Data

By the APIScout Team
Tags: streaming · websockets · sse · real-time · api integration

Streaming APIs: How to Consume SSE and WebSocket Data

More APIs stream data instead of returning it all at once. AI APIs stream token-by-token. Financial APIs stream price updates. Chat APIs stream messages in real-time. Consuming streaming APIs requires different patterns than traditional request-response.

Three Types of Streaming

| Type | Direction | Protocol | Best For |
| --- | --- | --- | --- |
| Server-Sent Events (SSE) | Server → Client | HTTP | AI streaming, live feeds, notifications |
| WebSockets | Bidirectional | WS/WSS | Chat, gaming, collaborative editing |
| Streaming HTTP | Server → Client | HTTP (chunked) | File downloads, large responses |

Server-Sent Events (SSE)

How SSE Works

Client                          Server
  │                               │
  │── GET /stream ──────────────→ │
  │   Accept: text/event-stream   │
  │                               │
  │←── HTTP 200 ──────────────── │
  │    Content-Type: text/event-stream
  │                               │
  │←── data: {"token": "Hello"}  │
  │←── data: {"token": " world"} │
  │←── data: {"token": "!"}     │
  │←── data: [DONE]             │
  │                               │

Consuming SSE in the Browser

// Native EventSource API — the browser's built-in SSE client.
// Note: EventSource cannot send custom request headers (e.g. Authorization),
// which is why the fetch-based approach below exists.
const source = new EventSource('https://api.example.com/stream');

// Fires once per "data:" event; event.data is the raw text payload.
source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};

source.onerror = (error) => {
  console.error('SSE error:', error);
  // EventSource automatically reconnects
};

// Close when done — otherwise the browser keeps the connection
// (and auto-reconnect) alive.
source.close();

Consuming SSE with fetch (More Control)

// fetch-based SSE — better for auth headers and error handling
/**
 * Consume a Server-Sent Events stream via fetch.
 *
 * Unlike EventSource, fetch lets us send auth headers and inspect the
 * HTTP status before streaming.
 *
 * @param url     SSE endpoint to connect to.
 * @param onEvent Invoked once per parsed JSON event payload.
 * @throws Error on a non-2xx response or a missing response body.
 */
async function consumeSSE(url: string, onEvent: (data: any) => void) {
  const response = await fetch(url, {
    headers: {
      'Authorization': `Bearer ${API_KEY}`,
      'Accept': 'text/event-stream',
    },
  });

  if (!response.ok) throw new Error(`HTTP ${response.status}`);
  if (!response.body) throw new Error('Response has no body');

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  // Dispatch the "data:" lines of one complete event block.
  // Returns true when the stream's [DONE] sentinel is seen.
  const dispatch = (block: string): boolean => {
    for (const line of block.split('\n')) {
      if (!line.startsWith('data:')) continue;
      // Per the SSE spec, a single space after "data:" is optional —
      // the original `'data: '` check silently dropped `data:{...}` lines.
      const data = line.slice(5).replace(/^ /, '');
      if (data === '[DONE]') return true;
      onEvent(JSON.parse(data));
    }
    return false;
  };

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });

      // Events are separated by a blank line ("\n\n"); keep the
      // trailing incomplete chunk in the buffer.
      const blocks = buffer.split('\n\n');
      buffer = blocks.pop() || '';

      for (const block of blocks) {
        if (dispatch(block)) return;
      }
    }

    // Flush the decoder (possible buffered multi-byte char) and process
    // a final event that wasn't terminated by "\n\n" before EOF.
    buffer += decoder.decode();
    if (buffer.trim()) dispatch(buffer);
  } finally {
    // Release the lock so the body stream can be cancelled/GC'd.
    reader.releaseLock();
  }
}

// Usage — stream token deltas straight to stdout.
// NOTE(review): top-level await requires an ES-module context — confirm
// this snippet runs inside a module or an async wrapper.
await consumeSSE('https://api.openai.com/v1/chat/completions', (data) => {
  process.stdout.write(data.choices[0]?.delta?.content || '');
});

AI API Streaming Pattern

// Stream AI responses token-by-token
/**
 * Stream an AI response token-by-token from the Anthropic Messages API.
 *
 * @param prompt     User message to send.
 * @param onToken    Called for each text delta as it arrives.
 * @param onComplete Called exactly once with the assembled full response.
 * @throws Error on a non-2xx response or a missing response body.
 */
async function streamAIResponse(
  prompt: string,
  onToken: (token: string) => void,
  onComplete: (fullResponse: string) => void
): Promise<void> {
  const response = await fetch('https://api.anthropic.com/v1/messages', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-api-key': API_KEY,
      'anthropic-version': '2023-06-01',
    },
    body: JSON.stringify({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 1024,
      stream: true,
      messages: [{ role: 'user', content: prompt }],
    }),
  });

  // The original skipped this check, so a 4xx/5xx JSON error body would be
  // fed into the SSE parser below.
  if (!response.ok) {
    throw new Error(`HTTP ${response.status}: ${await response.text()}`);
  }
  if (!response.body) throw new Error('Response has no body');

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let fullResponse = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || ''; // keep the incomplete trailing line

    for (const line of lines) {
      if (!line.startsWith('data:')) continue;
      const data = line.slice(5).trim();
      // OpenAI-style sentinel — kept for compatibility.
      if (data === '[DONE]') {
        onComplete(fullResponse);
        return;
      }

      const event = JSON.parse(data);
      // Anthropic signals end-of-stream with a message_stop event,
      // not a [DONE] sentinel.
      if (event.type === 'message_stop') {
        onComplete(fullResponse);
        return;
      }
      // content_block_delta can also carry non-text deltas
      // (e.g. input_json_delta), so guard on delta.text.
      if (event.type === 'content_block_delta' && typeof event.delta?.text === 'string') {
        const token = event.delta.text;
        fullResponse += token;
        onToken(token);
      }
    }
  }

  onComplete(fullResponse);
}

// React hook for streaming
/**
 * React hook for streaming AI responses.
 *
 * Exposes the accumulated `response` text, an `isStreaming` flag, and a
 * `stream(prompt)` trigger.
 */
function useStreamingAI() {
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const stream = async (prompt: string) => {
    setIsStreaming(true);
    setResponse('');

    try {
      await streamAIResponse(
        prompt,
        (token) => setResponse(prev => prev + token),
        () => {} // completion state is handled in finally below
      );
    } finally {
      // The original only reset this in onComplete, so a thrown error
      // (network failure, non-2xx) left the UI stuck in "streaming".
      setIsStreaming(false);
    }
  };

  return { response, isStreaming, stream };
}

WebSockets

Basic WebSocket Client

/**
 * Minimal WebSocket wrapper: typed message dispatch plus automatic
 * reconnection with exponential backoff.
 *
 * Wire format: JSON frames of the shape { type: string, data: any }.
 */
class WebSocketClient {
  private ws: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  // Set by close() so the onclose handler doesn't reconnect. The original
  // zeroed maxReconnectAttempts instead, which permanently disabled
  // reconnection for any later connect().
  private intentionalClose = false;
  private handlers: Map<string, ((data: any) => void)[]> = new Map();

  /** Open (or re-open) a connection to `url`. */
  connect(url: string) {
    this.intentionalClose = false;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('Connected');
      this.reconnectAttempts = 0; // reset backoff on a successful connect
    };

    this.ws.onmessage = (event) => {
      // Guard the parse: one malformed frame shouldn't kill dispatch.
      let message: any;
      try {
        message = JSON.parse(event.data);
      } catch {
        console.error('Malformed WebSocket message:', event.data);
        return;
      }
      const handlers = this.handlers.get(message.type) || [];
      handlers.forEach(handler => handler(message.data));
    };

    this.ws.onclose = (event) => {
      const shouldReconnect =
        !this.intentionalClose &&
        !event.wasClean &&
        this.reconnectAttempts < this.maxReconnectAttempts;
      if (shouldReconnect) {
        // Exponential backoff: 1s, 2s, 4s, ...
        const delay = Math.pow(2, this.reconnectAttempts) * 1000;
        console.log(`Reconnecting in ${delay}ms...`);
        setTimeout(() => {
          this.reconnectAttempts++;
          this.connect(url);
        }, delay);
      }
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  /** Register a handler for messages of the given type. */
  on(type: string, handler: (data: any) => void) {
    if (!this.handlers.has(type)) {
      this.handlers.set(type, []);
    }
    this.handlers.get(type)!.push(handler);
  }

  /** Send a typed message; silently dropped if the socket isn't open. */
  send(type: string, data: any) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ type, data }));
    }
  }

  /** Close the connection and suppress automatic reconnection. */
  close() {
    this.intentionalClose = true;
    this.ws?.close();
  }
}

// Usage
const ws = new WebSocketClient();
ws.connect('wss://api.example.com/ws');

// Handlers are keyed by the `type` field of incoming frames.
ws.on('chat_message', (data) => {
  console.log(`${data.sender}: ${data.text}`);
});

ws.on('price_update', (data) => {
  console.log(`${data.symbol}: $${data.price}`);
});

// NOTE(review): send() drops frames until the socket is OPEN — if this runs
// before the connection completes, the subscribe may never be sent.
ws.send('subscribe', { channel: 'BTC-USD' });

WebSocket with Authentication

// Method 1: Token in URL (common but less secure)
const ws = new WebSocket(`wss://api.example.com/ws?token=${TOKEN}`);

// Method 2: Auth message after connection
const ws = new WebSocket('wss://api.example.com/ws');
ws.onopen = () => {
  ws.send(JSON.stringify({
    type: 'auth',
    token: TOKEN,
  }));
};

// Method 3: Subprotocol
const ws = new WebSocket('wss://api.example.com/ws', [TOKEN]);

Choosing SSE vs WebSocket

| Factor | SSE | WebSocket |
| --- | --- | --- |
| Direction | Server → Client only | Bidirectional |
| Protocol | HTTP | WS (separate protocol) |
| Auto-reconnect | Built-in | Manual |
| Auth headers | Via fetch | Token in URL or auth message |
| Binary data | No (text only) | Yes |
| Proxy support | Good (standard HTTP) | Some proxies block |
| Browser support | All modern browsers | All modern browsers |
| Use case | AI streaming, notifications, feeds | Chat, gaming, collaboration |

Decision Guide

Do you need to send data FROM the client?
  YES → WebSocket
  NO ↓

Is it AI token streaming?
  YES → SSE (standard for AI APIs)
  NO ↓

Do you need binary data?
  YES → WebSocket
  NO → SSE (simpler, auto-reconnect, HTTP-native)

Error Handling for Streams

// Robust streaming with error handling
/**
 * Consume a streaming HTTP endpoint with connection timeout and
 * exponential-backoff retries.
 *
 * @param url     Stream endpoint.
 * @param onData  Receives each decoded text chunk as it arrives.
 * @param options maxRetries (default 3), timeout in ms for the initial
 *                connection (default 30000), plus onError/onReconnect hooks.
 * @throws The last error once all retries are exhausted.
 */
async function robustStream(
  url: string,
  onData: (data: any) => void,
  options: {
    maxRetries?: number;
    timeout?: number;
    onError?: (error: Error) => void;
    onReconnect?: (attempt: number) => void;
  } = {}
) {
  const { maxRetries = 3, timeout = 30000 } = options;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    const controller = new AbortController();
    // Guards only the initial connection; cleared once headers arrive so a
    // healthy long-lived stream is not aborted mid-read.
    const timeoutId = setTimeout(() => controller.abort(), timeout);

    try {
      const response = await fetch(url, {
        headers: { 'Authorization': `Bearer ${API_KEY}` },
        signal: controller.signal,
      });

      clearTimeout(timeoutId); // connected — stop the abort timer

      if (!response.ok) throw new Error(`HTTP ${response.status}`);
      if (!response.body) throw new Error('Response has no body');

      const reader = response.body.getReader();
      const decoder = new TextDecoder();

      while (true) {
        const { done, value } = await reader.read();
        if (done) {
          // Flush any multi-byte character buffered inside the decoder.
          const tail = decoder.decode();
          if (tail) onData(tail);
          return; // Stream completed successfully
        }
        onData(decoder.decode(value, { stream: true }));
      }
    } catch (error) {
      if (attempt < maxRetries) {
        options.onReconnect?.(attempt + 1);
        // Exponential backoff before reconnecting: 1s, 2s, 4s, ...
        await new Promise(r => setTimeout(r, Math.pow(2, attempt) * 1000));
        continue;
      }
      options.onError?.(error as Error);
      throw error;
    } finally {
      // The original skipped clearTimeout when fetch rejected, leaking the
      // timer (and keeping the event loop alive). Clearing twice is harmless.
      clearTimeout(timeoutId);
    }
  }
}

Common Mistakes

| Mistake | Impact | Fix |
| --- | --- | --- |
| No reconnection logic | Lost data on disconnect | Auto-reconnect with backoff |
| Not handling partial chunks | Corrupted data | Buffer and parse complete messages |
| Opening too many connections | Server resource exhaustion | Multiplex through single connection |
| No timeout on streams | Hanging connections | Set read timeout, abort stale streams |
| Ignoring backpressure | Memory overflow on slow consumers | Pause stream when buffer is full |
| Not closing connections | Resource leaks | Always close on unmount/cleanup |

Find APIs with streaming support on APIScout — SSE, WebSocket, and real-time capabilities compared across providers.

Comments