Overview
Streaming allows you to receive responses token-by-token as they’re generated, providing a better user experience for long responses and enabling real-time feedback.

Basic Streaming
Copy
Ask AI
from openai import OpenAI

# Point the OpenAI SDK at the local switchAILocal gateway.
client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Request a streamed completion: chunks arrive as they are generated.
stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True,
)

# Echo each fragment immediately, without a trailing newline.
for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)
Server-Sent Events (SSE) Format
When streaming, switchAILocal sends data in SSE format:
Ask AI
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gemini-2.5-pro","choices":[{"index":0,"delta":{"content":"Once"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gemini-2.5-pro","choices":[{"index":0,"delta":{"content":" upon"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gemini-2.5-pro","choices":[{"index":0,"delta":{"content":" a"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gemini-2.5-pro","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
Streaming with CLI Providers
CLI providers like Gemini, Claude, and Vibe support streaming:
Ask AI
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Stream from Gemini CLI
stream = client.chat.completions.create(
    model="geminicli:gemini-2.5-pro",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True,
)

for chunk in stream:
    if text := chunk.choices[0].delta.content:
        print(text, end="", flush=True)
print()  # New line at end
Collecting Streamed Content
Accumulate the full response while streaming:
Ask AI
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[{"role": "user", "content": "Write a Python sorting function"}],
    stream=True,
)

# Accumulate fragments in a list and join once at the end.
pieces = []
for chunk in stream:
    fragment = chunk.choices[0].delta.content
    if fragment:
        pieces.append(fragment)
        print(fragment, end="", flush=True)
full_response = "".join(pieces)

print("\n\n--- Full Response ---")
print(full_response)
Streaming with Callbacks
Python with Custom Handler
Copy
Ask AI
from openai import OpenAI
import time
class StreamHandler:
    """Collects streamed chunks and reports simple timing statistics."""

    def __init__(self):
        self.chunks = []              # every content fragment received so far
        self.start_time = None        # wall-clock time when on_start() ran
        self.first_token_time = None  # wall-clock time of the first fragment

    def on_start(self):
        """Mark the beginning of the stream."""
        self.start_time = time.time()
        print("[Stream started]")

    def on_chunk(self, content):
        """Record one fragment; report time-to-first-token on the first call."""
        now = time.time()
        if self.first_token_time is None:
            self.first_token_time = now
            ttft = (self.first_token_time - self.start_time) * 1000
            print(f"\n[First token: {ttft:.0f}ms]\n")
        self.chunks.append(content)
        print(content, end="", flush=True)

    def on_end(self):
        """Report total elapsed time and how many fragments arrived."""
        total_time = (time.time() - self.start_time) * 1000
        print(f"\n\n[Stream ended: {total_time:.0f}ms, {len(self.chunks)} chunks]")
client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Start timing before the request so TTFT includes connection setup.
handler = StreamHandler()
handler.on_start()

stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[{"role": "user", "content": "Explain machine learning in 3 paragraphs"}],
    stream=True,
)

for chunk in stream:
    fragment = chunk.choices[0].delta.content
    if fragment:
        handler.on_chunk(fragment)
handler.on_end()
JavaScript with Event Emitter
Copy
Ask AI
import OpenAI from 'openai';
import { EventEmitter } from 'events';
class StreamHandler extends EventEmitter {
  constructor() {
    super();
    this.chunks = [];
    this.startTime = null;
    this.firstTokenTime = null;
  }

  // Consume an async-iterable stream, emitting lifecycle events:
  // 'start', 'firstToken' (ttft ms), 'chunk' (content), 'end' (summary).
  async handleStream(stream) {
    this.startTime = Date.now();
    this.emit('start');

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (!content) continue;

      if (this.firstTokenTime === null) {
        this.firstTokenTime = Date.now();
        this.emit('firstToken', this.firstTokenTime - this.startTime);
      }
      this.chunks.push(content);
      this.emit('chunk', content);
    }

    this.emit('end', {
      totalTime: Date.now() - this.startTime,
      chunks: this.chunks.length,
    });
  }
}
const client = new OpenAI({
  baseURL: 'http://localhost:18080/v1',
  apiKey: 'sk-test-123',
});

// Wire up console reporting for each lifecycle event before streaming.
const reporter = new StreamHandler();
reporter.on('start', () => console.log('[Stream started]'));
reporter.on('chunk', (content) => process.stdout.write(content));
reporter.on('firstToken', (ttft) => console.log(`\n[First token: ${ttft}ms]\n`));
reporter.on('end', ({ totalTime, chunks }) => {
  console.log(`\n\n[Stream ended: ${totalTime}ms, ${chunks} chunks]`);
});

const stream = await client.chat.completions.create({
  model: 'gemini-2.5-pro',
  messages: [{ role: 'user', content: 'Explain machine learning in 3 paragraphs' }],
  stream: true,
});

await reporter.handleStream(stream);
Streaming Long Outputs
For very long responses, streaming is essential:
Ask AI
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[
        {
            "role": "user",
            "content": "Write a detailed 2000-word essay on the history of artificial intelligence",
        }
    ],
    stream=True,
    max_tokens=3000,
)

# Count whitespace-separated words as the essay streams in.
word_count = 0
for chunk in stream:
    text = chunk.choices[0].delta.content
    if text:
        word_count += len(text.split())
        print(text, end="", flush=True)

print(f"\n\n[Total words: {word_count}]")
Streaming with Multiple Providers
Test streaming across different providers:
Ask AI
from openai import OpenAI
import time

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Same prompt against several providers to compare streaming behavior.
providers = [
    "geminicli:gemini-2.5-pro",
    "ollama:llama3.2",
    "switchai:switchai-fast",
]
prompt = "Count to 10 slowly"

for model in providers:
    print(f"\n--- Testing {model} ---")
    start = time.time()
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    chunk_count = 0
    for chunk in stream:
        piece = chunk.choices[0].delta.content
        if piece:
            chunk_count += 1
            print(piece, end="", flush=True)
    elapsed = (time.time() - start) * 1000
    print(f"\n[{chunk_count} chunks, {elapsed:.0f}ms]")
Error Handling in Streams
Copy
Ask AI
from openai import OpenAI, APIError, APIStatusError

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

try:
    stream = client.chat.completions.create(
        model="gemini-2.5-pro",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True,
    )
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
except APIStatusError as e:
    # Only APIStatusError (HTTP 4xx/5xx responses) carries .status_code;
    # catching the base APIError and reading .status_code would raise
    # AttributeError on connection-level failures.
    print(f"\n\nStream error: {e.status_code} - {e.message}")
except APIError as e:
    # Non-HTTP API failures (e.g. connection errors) have a message only.
    print(f"\n\nAPI error: {e.message}")
except Exception as e:
    print(f"\n\nUnexpected error: {e}")
Streaming with CLI Attachments
Combine streaming with file attachments:
Ask AI
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# extra_body carries provider-specific fields the OpenAI SDK doesn't model;
# here it attaches a local file for the CLI provider to read.
stream = client.chat.completions.create(
    model="geminicli:gemini-2.5-pro",
    messages=[{"role": "user", "content": "Analyze this code and suggest improvements"}],
    stream=True,
    extra_body={
        "cli": {
            "attachments": [
                {"type": "file", "path": "./main.go"},
            ]
        }
    },
)

print("Analysis:\n")
for chunk in stream:
    if text := chunk.choices[0].delta.content:
        print(text, end="", flush=True)
print()
Measuring Stream Performance
Copy
Ask AI
from openai import OpenAI
import time
class StreamMetrics:
    """Collects timing and throughput statistics for a streamed response."""

    def __init__(self):
        self.start_time = None        # wall-clock time when start() was called
        self.first_token_time = None  # wall-clock time of the first chunk, or None
        self.chunk_times = []         # per-chunk elapsed seconds since start()
        self.total_tokens = 0         # approximate token count (whitespace words)

    def start(self):
        """Mark the beginning of the stream; call before record_chunk()."""
        self.start_time = time.time()

    def record_chunk(self, content):
        """Record one streamed fragment and its arrival time."""
        current_time = time.time()
        if self.first_token_time is None:
            self.first_token_time = current_time
        self.chunk_times.append(current_time - self.start_time)
        # Whitespace word count is a rough, provider-agnostic proxy for tokens.
        self.total_tokens += len(content.split())

    def summary(self):
        """Return aggregate metrics as a dict of floats/ints.

        Safe to call even when no chunk ever arrived (empty or failed
        stream): TTFT and tokens/sec are reported as 0.0 instead of
        crashing on a None first_token_time or a zero elapsed time.
        """
        total_time = (time.time() - self.start_time) * 1000
        if self.first_token_time is not None:
            ttft = (self.first_token_time - self.start_time) * 1000
        else:
            ttft = 0.0  # no first token ever observed
        seconds = total_time / 1000
        tokens_per_sec = self.total_tokens / seconds if seconds > 0 else 0.0
        return {
            "total_time_ms": total_time,
            "time_to_first_token_ms": ttft,
            "chunks": len(self.chunk_times),
            "total_tokens": self.total_tokens,
            "tokens_per_second": tokens_per_sec,
        }
client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Start the clock before the request so setup time is included.
metrics = StreamMetrics()
metrics.start()

stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[{"role": "user", "content": "Write a 500-word essay on climate change"}],
    stream=True,
)

for chunk in stream:
    piece = chunk.choices[0].delta.content
    if piece:
        metrics.record_chunk(piece)
        print(piece, end="", flush=True)

print("\n\n--- Metrics ---")
for key, value in metrics.summary().items():
    print(f"{key}: {value:.2f}")
Raw SSE Parsing (Advanced)
For custom HTTP clients:
Ask AI
import requests
import json

url = "http://localhost:18080/v1/chat/completions"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer sk-test-123",
}
data = {
    "model": "gemini-2.5-pro",
    "messages": [{"role": "user", "content": "Tell me a joke"}],
    "stream": True,
}

# Context manager releases the connection even if we break out early;
# raise_for_status fails fast instead of parsing an error body as SSE.
with requests.post(url, headers=headers, json=data, stream=True) as response:
    response.raise_for_status()
    for line in response.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data: "):
            continue  # skip blank keep-alive lines and other SSE fields
        data_str = line[len("data: "):]
        if data_str == "[DONE]":
            break  # server's end-of-stream sentinel
        try:
            chunk = json.loads(data_str)
        except json.JSONDecodeError:
            continue  # tolerate malformed/partial lines
        choices = chunk.get("choices") or []
        if choices:
            content = choices[0].get("delta", {}).get("content", "")
            if content:
                print(content, end="", flush=True)
print()
When to Use Streaming
Use streaming when:
- Generating long-form content (essays, stories, code)
- Building real-time chat interfaces
- Providing user feedback during processing
- Working with CLI providers that support streaming
Avoid streaming when:
- You need the complete response for processing
- Response is very short (<50 tokens)
- Implementing retry logic (harder with streams)
Streaming Support by Provider
| Provider | Streaming Support | Notes |
|---|---|---|
| Gemini CLI | ✅ Full | Native streaming |
| Claude CLI | ✅ Full | Native streaming |
| Vibe CLI | ✅ Full | Native streaming |
| Codex CLI | ✅ Full | Native streaming |
| Ollama | ✅ Full | Native streaming |
| switchAI | ✅ Full | Cloud streaming |
| Gemini API | ✅ Full | Cloud streaming |
| Claude API | ✅ Full | Cloud streaming |
| OpenAI API | ✅ Full | Cloud streaming |
| LM Studio | ✅ Full | Local streaming |