Streaming APIs: How to Consume SSE and WebSocket Data
APIScout Team
streaming, websocket, sse, real-time, api integration
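A growing number of APIs stream data incrementally instead of returning a complete response at once. AI APIs stream output token by token, financial APIs stream price updates, and chat APIs stream messages in real time. Consuming these streams calls for different patterns than traditional request-response: long-lived connections, incremental parsing of partial chunks, and reconnection handling.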
Three Types of Streaming
| Type | Direction | Protocol | Best For |
|---|---|---|---|
| Server-Sent Events (SSE) | Server → Client | HTTP | AI streaming, live feeds, notifications |
| WebSockets | Bidirectional | WS/WSS | Chat, gaming, collaborative editing |
| Streaming HTTP | Server → Client | HTTP (chunked) | File downloads, large responses |
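The SSE and WebSocket sections cover the first two rows. For the third row, a chunked HTTP response arrives in arbitrary pieces, so a record may be split across two reads. The following is a minimal sketch of buffering those pieces, assuming the server sends one JSON object per line (NDJSON). That framing is an assumption for illustration, and `NdjsonBuffer` is a hypothetical helper name, not a library API.

```typescript
// Hypothetical helper: reassembles newline-delimited JSON records from
// arbitrarily split text chunks. Assumes one JSON object per line.
class NdjsonBuffer {
  private buffer = '';

  /** Feed one decoded chunk; returns every record that the chunk completed. */
  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last piece has no trailing newline yet, so it may be incomplete
    this.buffer = lines.pop() ?? '';
    return lines
      .map((line) => line.trim())
      .filter((line) => line.length > 0)
      .map((line) => JSON.parse(line) as unknown);
  }
}

// Two chunks that split the second record in the middle
const ndjson = new NdjsonBuffer();
const firstBatch = ndjson.push('{"id":1}\n{"id"');
const secondBatch = ndjson.push(':2}\n');
console.log(firstBatch.length, secondBatch.length);
```

In a real client, each chunk would come from `decoder.decode(value, { stream: true })` inside a `reader.read()` loop like the ones used for SSE later in this article.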
Server-Sent Events (SSE)
How SSE Works
Client                                  Server
  │                                       │
  │── GET /stream ──────────────────────→ │
  │   Accept: text/event-stream           │
  │                                       │
  │←── HTTP 200 ───────────────────────── │
  │    Content-Type: text/event-stream    │
  │                                       │
  │←── data: {"token": "Hello"}           │
  │←── data: {"token": " world"}          │
  │←── data: {"token": "!"}               │
  │←── data: [DONE]                       │
  │                                       │
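Each `data:` line in the diagram is one field of an SSE event, and a blank line ends the event. Besides `data`, the format defines `event`, `id`, and `retry` fields, and lines starting with a colon are comments (often used as keep-alives). The following sketch parses a single event block under those rules. `parseSSEEvent` and `SSEEvent` are illustrative names, not part of any library.

```typescript
interface SSEEvent {
  event: string;        // event type; "message" when no "event:" field is sent
  data: string;         // all "data:" lines joined with "\n"
  id: string | null;    // value of the "id:" field, if present
  retry: number | null; // reconnection delay in ms from "retry:", if present
}

// Parses one event block (the text between two blank lines).
// Returns null when the block carries no data, e.g. a comment-only keep-alive.
function parseSSEEvent(block: string): SSEEvent | null {
  let event = 'message';
  let id: string | null = null;
  let retry: number | null = null;
  const dataLines: string[] = [];

  for (const line of block.split(/\r\n|\r|\n/)) {
    if (line === '' || line.startsWith(':')) continue; // blank line or comment
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // one optional space

    if (field === 'event') event = value;
    else if (field === 'data') dataLines.push(value);
    else if (field === 'id') id = value;
    else if (field === 'retry' && /^\d+$/.test(value)) retry = Number(value);
  }

  if (dataLines.length === 0) return null;
  return { event, data: dataLines.join('\n'), id, retry };
}

console.log(parseSSEEvent('event: update\ndata: {"token": "Hello"}\nid: 7'));
```

The `[DONE]` payload shown in the diagram is a convention used by some APIs to mark the end of a stream; the parser treats it like any other data string and leaves its interpretation to the caller.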
Consuming SSE in the Browser
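// Native EventSource API (GET requests only, no custom request headers)
const source = new EventSource('https://api.example.com/stream');

source.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received:', data);
};

source.onerror = (error) => {
  console.error('SSE error:', error);
  // EventSource reconnects automatically unless the connection has been
  // closed for good (source.readyState === EventSource.CLOSED)
};

// Later, when the stream is no longer needed (e.g. on component unmount).
// Calling this right after setup would end the stream before any event arrives.
source.close();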
Consuming SSE with fetch (More Control)
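// fetch-based SSE: allows auth headers and explicit error handling
// API_KEY is assumed to be defined elsewhere (e.g. loaded from configuration)
async function consumeSSE(url: string, onEvent: (data: any) => void) {
  const response = await fetch(url, {
    headers: {
      'Authorization': `Bearer ${API_KEY}`,
      'Accept': 'text/event-stream',
    },
  });
  if (!response.ok) throw new Error(`HTTP ${response.status}`);
  if (!response.body) throw new Error('Response has no body');

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    // Normalize CRLF so events split the same way regardless of line endings
    buffer = buffer.replace(/\r\n/g, '\n');

    // Events are separated by a blank line: "data: {...}\n\n"
    const events = buffer.split('\n\n');
    buffer = events.pop() || ''; // Keep the incomplete trailing event

    for (const chunk of events) {
      // Join multi-line data fields; the space after "data:" is optional
      const data = chunk
        .split('\n')
        .filter((line) => line.startsWith('data:'))
        .map((line) => line.slice(5).replace(/^ /, ''))
        .join('\n');
      if (!data) continue; // Comment or keep-alive event
      // "[DONE]" is an end-of-stream sentinel used by some APIs, not part of SSE
      if (data === '[DONE]') {
        await reader.cancel();
        return;
      }
      onEvent(JSON.parse(data));
    }
  }
}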
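// Usage: consumeSSE issues a plain GET, so point it at a GET streaming endpoint.
// POST-based APIs such as OpenAI's chat completions also need a method and a
// JSON body with stream: true in the fetch call.
await consumeSSE('https://api.example.com/stream', (data) => {
  console.log('Received:', data);
});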
AI API Streaming Pattern
// Stream AI responses token-by-token
// Note: calling a provider directly from browser code exposes API_KEY;
// in production, route the request through your own backend.
async function streamAIResponse(
  prompt: string,
  onToken: (token: string) => void,
  onComplete: (fullResponse: string) => void
): Promise<void> {
  const response = await fetch('https://api.anthropic.com/v1/messages', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-api-key': API_KEY,
      'anthropic-version': '2023-06-01',
    },
    body: JSON.stringify({
      model: 'claude-sonnet-4-20250514',
      max_tokens: 1024,
      stream: true,
      messages: [{ role: 'user', content: prompt }],
    }),
  });
  if (!response.ok || !response.body) throw new Error(`HTTP ${response.status}`);

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  let fullResponse = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() || ''; // Keep the incomplete trailing line

    for (const line of lines) {
      if (!line.startsWith('data: ')) continue;
      const event = JSON.parse(line.slice(6));
      // Text arrives in content_block_delta events that carry a text_delta;
      // other delta types have no text field and are skipped
      if (event.type === 'content_block_delta' && event.delta?.type === 'text_delta') {
        const token = event.delta.text;
        fullResponse += token;
        onToken(token);
      } else if (event.type === 'message_stop') {
        // This API ends the stream with a message_stop event, not "[DONE]"
        onComplete(fullResponse);
        return;
      }
    }
  }
  onComplete(fullResponse);
}
// React hook for streaming
import { useState } from 'react';

function useStreamingAI() {
  const [response, setResponse] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const stream = async (prompt: string) => {
    setIsStreaming(true);
    setResponse('');
    try {
      await streamAIResponse(
        prompt,
        (token) => setResponse(prev => prev + token),
        () => {} // Completion is handled in finally
      );
    } finally {
      // Reset the flag even when the request fails mid-stream
      setIsStreaming(false);
    }
  };

  return { response, isStreaming, stream };
}
WebSockets
Basic WebSocket Client
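class WebSocketClient {
  private ws: WebSocket | null = null;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null;
  private closedByUser = false;
  private handlers: Map<string, ((data: any) => void)[]> = new Map();

  connect(url: string) {
    this.closedByUser = false;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('Connected');
      this.reconnectAttempts = 0;
      // Notify 'open' handlers so callers can (re)subscribe on every connect
      (this.handlers.get('open') || []).forEach(handler => handler(undefined));
    };

    this.ws.onmessage = (event) => {
      // Assumes the server sends JSON text frames shaped like { type, data }
      const message = JSON.parse(event.data);
      const handlers = this.handlers.get(message.type) || [];
      handlers.forEach(handler => handler(message.data));
    };

    this.ws.onclose = (event) => {
      if (this.closedByUser || event.wasClean) return;
      if (this.reconnectAttempts >= this.maxReconnectAttempts) return;
      // Exponential backoff: 1s, 2s, 4s, ...
      const delay = Math.pow(2, this.reconnectAttempts) * 1000;
      console.log(`Reconnecting in ${delay}ms...`);
      this.reconnectTimer = setTimeout(() => {
        this.reconnectAttempts++;
        this.connect(url);
      }, delay);
    };

    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
    };
  }

  on(type: string, handler: (data: any) => void) {
    if (!this.handlers.has(type)) {
      this.handlers.set(type, []);
    }
    this.handlers.get(type)!.push(handler);
  }

  // Messages sent while the socket is not open are silently dropped
  send(type: string, data: any) {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify({ type, data }));
    }
  }

  close() {
    // Stop reconnecting, including any attempt that is already scheduled
    this.closedByUser = true;
    if (this.reconnectTimer !== null) clearTimeout(this.reconnectTimer);
    this.reconnectTimer = null;
    this.ws?.close();
  }
}

// Usage
const ws = new WebSocketClient();

ws.on('chat_message', (data) => {
  console.log(`${data.sender}: ${data.text}`);
});

ws.on('price_update', (data) => {
  console.log(`${data.symbol}: $${data.price}`);
});

// Subscribe once the socket is open; send() drops messages while it is still
// connecting. The 'open' handler also re-subscribes after every reconnect.
ws.on('open', () => {
  ws.send('subscribe', { channel: 'BTC-USD' });
});

ws.connect('wss://api.example.com/ws');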
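The `send` method in the client above only transmits when the socket is open, so anything sent while connecting or reconnecting is lost. One possible remedy is to buffer outgoing messages and flush them when the connection opens. The sketch below shows that idea in isolation; `OutboundQueue`, `markOpen`, and `markClosed` are hypothetical names, and the cap of 100 pending messages is an arbitrary assumption.

```typescript
type SendFn = (payload: string) => void;

// Hypothetical helper: buffers outgoing messages while the socket is down
// and flushes them in order once it opens.
class OutboundQueue {
  private pending: string[] = [];
  private isOpen = false;
  private readonly sendRaw: SendFn;
  private readonly maxPending: number;

  constructor(sendRaw: SendFn, maxPending = 100) {
    this.sendRaw = sendRaw;
    this.maxPending = maxPending;
  }

  /** Sends now if open, otherwise buffers. Returns false if the message was dropped. */
  send(type: string, data: unknown): boolean {
    const payload = JSON.stringify({ type, data });
    if (this.isOpen) {
      this.sendRaw(payload);
      return true;
    }
    if (this.pending.length >= this.maxPending) return false; // buffer is full
    this.pending.push(payload);
    return true;
  }

  /** Call from the socket's onopen handler. */
  markOpen(): void {
    this.isOpen = true;
    for (const payload of this.pending.splice(0)) this.sendRaw(payload);
  }

  /** Call from the socket's onclose handler. */
  markClosed(): void {
    this.isOpen = false;
  }
}

// Demo with an array standing in for the real socket
const wire: string[] = [];
const queue = new OutboundQueue((payload) => wire.push(payload));
queue.send('subscribe', { channel: 'BTC-USD' }); // buffered, socket not open yet
queue.markOpen();                                // flushed here
console.log(wire);
```

Whether replaying buffered messages is safe depends on the API: a subscription request usually is, while a stale trade order may not be, so the cap and the replay policy are choices to make per message type.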
WebSocket with Authentication
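// The three methods are alternatives; each snippet uses its own variable.

// Method 1: Token in URL (common but less secure: URLs often end up in logs)
const wsWithQueryToken = new WebSocket(`wss://api.example.com/ws?token=${TOKEN}`);

// Method 2: Auth message after connection
const wsWithAuthMessage = new WebSocket('wss://api.example.com/ws');
wsWithAuthMessage.onopen = () => {
  wsWithAuthMessage.send(JSON.stringify({
    type: 'auth',
    token: TOKEN,
  }));
};

// Method 3: Subprotocol (the token travels in the Sec-WebSocket-Protocol
// header, so the server must be written to read it from there)
const wsWithSubprotocol = new WebSocket('wss://api.example.com/ws', [TOKEN]);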
Choosing SSE vs WebSocket
| Factor | SSE | WebSocket |
|---|---|---|
| Direction | Server → Client only | Bidirectional |
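| Protocol | HTTP | WS (starts as an HTTP upgrade) |
| Auto-reconnect | Built into EventSource; manual with fetch | Manual |
| Auth headers | Only with fetch (EventSource cannot set headers) | Token in URL, auth message, or subprotocol |
| Binary data | No (UTF-8 text only) | Yes |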
| Proxy support | Good (standard HTTP) | Some proxies block |
| Browser support | All modern browsers | All modern browsers |
| Use case | AI streaming, notifications, feeds | Chat, gaming, collaboration |
Decision Guide
Do you need to send data FROM the client over the same connection?
  YES → WebSocket
  NO ↓
Is it AI token streaming?
  YES → SSE (standard for AI APIs)
  NO ↓
Do you need binary data?
  YES → WebSocket
  NO → SSE (simpler, auto-reconnect, HTTP-native)
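The guide above can be written as a small function, which makes the order of the checks explicit: the first matching question decides. This is only a sketch of the guide's logic; `chooseTransport` and the `StreamRequirements` fields are hypothetical names introduced here.

```typescript
type Transport = 'SSE' | 'WebSocket';

// Hypothetical input shape mirroring the three questions in the guide
interface StreamRequirements {
  clientSendsData: boolean;  // does the client need to send data?
  aiTokenStreaming: boolean; // is it AI token streaming?
  binaryData: boolean;       // is binary data required?
}

function chooseTransport(req: StreamRequirements): Transport {
  if (req.clientSendsData) return 'WebSocket';
  if (req.aiTokenStreaming) return 'SSE';
  if (req.binaryData) return 'WebSocket';
  return 'SSE'; // simpler, auto-reconnect, HTTP-native
}

console.log(
  chooseTransport({ clientSendsData: false, aiTokenStreaming: true, binaryData: false })
);
```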
Error Handling for Streams
// Robust streaming with error handling
async function robustStream(
  url: string,
  onData: (data: any) => void,
  options: {
    maxRetries?: number;
    timeout?: number; // Max ms to wait for the response or for the next chunk
    onError?: (error: Error) => void;
    onReconnect?: (attempt: number) => void;
  } = {}
) {
  const { maxRetries = 3, timeout = 30000 } = options;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    const controller = new AbortController();
    let timeoutId = setTimeout(() => controller.abort(), timeout);
    let retryable = true;
    try {
      const response = await fetch(url, {
        headers: { 'Authorization': `Bearer ${API_KEY}` },
        signal: controller.signal,
      });
      if (!response.ok) {
        // Client errors such as 401 or 404 will not succeed on retry; 429 can
        retryable = response.status >= 500 || response.status === 429;
        throw new Error(`HTTP ${response.status}`);
      }

      const reader = response.body!.getReader();
      const decoder = new TextDecoder();
      while (true) {
        const { done, value } = await reader.read();
        if (done) return; // Stream completed successfully
        // Restart the idle timer on every chunk so a stalled stream is aborted
        clearTimeout(timeoutId);
        timeoutId = setTimeout(() => controller.abort(), timeout);
        const text = decoder.decode(value, { stream: true });
        // Parse and deliver data...
        onData(text);
      }
    } catch (error) {
      if (retryable && attempt < maxRetries) {
        options.onReconnect?.(attempt + 1);
        await new Promise(r => setTimeout(r, Math.pow(2, attempt) * 1000));
        continue; // Note: a retry restarts the stream from the beginning
      }
      options.onError?.(error as Error);
      throw error;
    } finally {
      clearTimeout(timeoutId);
    }
  }
}
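The retry delay in `robustStream` doubles on each attempt with no upper bound, and every client that lost the same connection retries at the same moments. A common refinement is to cap the delay and randomize it. The sketch below shows one variant, often called full jitter; the cap, the jitter, and the names `backoffDelay` and `isRetryableStatus` are additions for illustration and do not appear in the function above.

```typescript
/**
 * Delay in milliseconds before retry number `attempt` (0-based).
 * `random` is injectable so the result can be checked deterministically.
 */
function backoffDelay(
  attempt: number,
  baseMs = 1000,
  maxMs = 30000,
  random: () => number = Math.random
): number {
  const exponential = baseMs * Math.pow(2, attempt); // 1s, 2s, 4s, ...
  const capped = Math.min(exponential, maxMs);       // never wait longer than maxMs
  // Full jitter: pick a random point between 0 and the capped delay
  return Math.floor(random() * capped);
}

/** Retry only failures that can plausibly succeed later. */
function isRetryableStatus(status: number): boolean {
  return status === 408 || status === 429 || status >= 500;
}

// With a fixed "random" value of 0.5 the delays are half of the capped values
for (let attempt = 0; attempt < 6; attempt++) {
  console.log(attempt, backoffDelay(attempt, 1000, 30000, () => 0.5));
}
```

Inside a retry loop, `backoffDelay(attempt)` would replace the `Math.pow(2, attempt) * 1000` expression.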
Common Mistakes
| Mistake | Impact | Fix |
|---|---|---|
| No reconnection logic | Lost data on disconnect | Auto-reconnect with backoff |
| Not handling partial chunks | Corrupted data | Buffer and parse complete messages |
| Opening too many connections | Server resource exhaustion | Multiplex through single connection |
| No timeout on streams | Hanging connections | Set read timeout, abort stale streams |
| Ignoring backpressure | Memory overflow on slow consumers | Pause stream when buffer is full |
| Not closing connections | Resource leaks | Always close on unmount/cleanup |
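The backpressure row is the least obvious of these fixes, so here is a minimal sketch of the idea: a buffer between the stream and a slow consumer that asks the source to pause when it fills up and to resume once it has drained. The class name, the high/low water marks, and the callback design are all assumptions for illustration; how a source actually pauses depends on the transport.

```typescript
// Hypothetical helper: signals pause at the high-water mark and resume
// once the consumer has drained the buffer to the low-water mark.
class BackpressureBuffer<T> {
  private items: T[] = [];
  private paused = false;
  private readonly highWaterMark: number;
  private readonly lowWaterMark: number;
  private readonly onPause: () => void;
  private readonly onResume: () => void;

  constructor(
    highWaterMark: number,
    lowWaterMark: number,
    onPause: () => void,
    onResume: () => void
  ) {
    this.highWaterMark = highWaterMark;
    this.lowWaterMark = lowWaterMark;
    this.onPause = onPause;
    this.onResume = onResume;
  }

  /** Producer side: store an item and ask the source to pause when full. */
  push(item: T): void {
    this.items.push(item);
    if (!this.paused && this.items.length >= this.highWaterMark) {
      this.paused = true;
      this.onPause();
    }
  }

  /** Consumer side: take the oldest item and resume the source once drained enough. */
  shift(): T | undefined {
    const item = this.items.shift();
    if (this.paused && this.items.length <= this.lowWaterMark) {
      this.paused = false;
      this.onResume();
    }
    return item;
  }

  get size(): number {
    return this.items.length;
  }
}

const signals: string[] = [];
const pendingChunks = new BackpressureBuffer<string>(
  3,
  1,
  () => signals.push('pause'),
  () => signals.push('resume')
);
['a', 'b', 'c'].forEach((chunk) => pendingChunks.push(chunk)); // third push pauses
pendingChunks.shift();
pendingChunks.shift(); // size drops to 1, which resumes
console.log(signals);
```

With a fetch-based reader, pausing can be as simple as not calling `reader.read()` again until the resume callback fires, since unread data then backs up toward the server. The standard browser WebSocket API has no equivalent pause call, so there the callbacks would more likely send an application-level message asking the server to slow down.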