Skip to main content

Overview

Streaming allows you to receive responses token-by-token as they’re generated, providing a better user experience for long responses and enabling real-time feedback.

Basic Streaming

"""Minimal streaming example: print tokens as the server emits them."""
from openai import OpenAI

# Point the SDK at the local gateway; the test key shown in the docs is used.
client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

response_stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True,
)

# Each event carries a delta; echo non-empty text immediately (flush so it
# appears without waiting for a newline).
for event in response_stream:
    delta = event.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)

Server-Sent Events (SSE) Format

When streaming, switchAILocal sends data in SSE format:
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gemini-2.5-pro","choices":[{"index":0,"delta":{"content":"Once"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gemini-2.5-pro","choices":[{"index":0,"delta":{"content":" upon"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gemini-2.5-pro","choices":[{"index":0,"delta":{"content":" a"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1694268190,"model":"gemini-2.5-pro","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

Streaming with CLI Providers

CLI providers like Gemini, Claude, and Vibe support streaming:
# Stream a response through a CLI-backed provider via the local gateway.
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Stream from Gemini CLI
# The "geminicli:" model prefix routes the request to the Gemini CLI provider.
stream = client.chat.completions.create(
    model="geminicli:gemini-2.5-pro",
    messages=[{"role": "user", "content": "Explain quantum computing"}],
    stream=True,
)

# Echo each non-empty content delta as soon as it arrives.
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
print()  # New line at end

Collecting Streamed Content

Accumulate the full response while streaming:
"""Stream a response while accumulating the full text for later use."""
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[{"role": "user", "content": "Write a Python sorting function"}],
    stream=True,
)

# Echo each piece as it arrives and keep every piece for reassembly;
# joining once at the end avoids repeated string concatenation.
pieces = []
for chunk in stream:
    piece = chunk.choices[0].delta.content
    if piece:
        pieces.append(piece)
        print(piece, end="", flush=True)
full_response = "".join(pieces)

print("\n\n--- Full Response ---")
print(full_response)

Streaming with Callbacks

Python with Custom Handler

from openai import OpenAI
import time

class StreamHandler:
    """Collects streamed chunks and prints simple latency diagnostics."""

    def __init__(self):
        self.chunks = []            # every content delta received, in order
        self.start_time = None      # stamped by on_start()
        self.first_token_time = None  # stamped when the first chunk arrives

    def on_start(self):
        """Stamp the stream start time and announce the stream."""
        self.start_time = time.time()
        print("[Stream started]")

    def on_chunk(self, content):
        """Record one chunk of text and echo it to stdout.

        The first chunk also triggers a time-to-first-token report.
        """
        if self.first_token_time is None:
            self.first_token_time = time.time()
            ttft = (self.first_token_time - self.start_time) * 1000
            print(f"\n[First token: {ttft:.0f}ms]\n")
        self.chunks.append(content)
        print(content, end="", flush=True)

    def on_end(self):
        """Report total stream duration and chunk count."""
        total_time = (time.time() - self.start_time) * 1000
        print(f"\n\n[Stream ended: {total_time:.0f}ms, {len(self.chunks)} chunks]")

# Wire the StreamHandler callbacks into a streamed chat completion.
client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Mark the stream start before the request is sent, so time-to-first-token
# includes request latency.
handler = StreamHandler()
handler.on_start()

stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[{"role": "user", "content": "Explain machine learning in 3 paragraphs"}],
    stream=True,
)

# Forward every non-empty content delta to the handler.
for chunk in stream:
    if chunk.choices[0].delta.content:
        handler.on_chunk(chunk.choices[0].delta.content)
        
handler.on_end()

JavaScript with Event Emitter

import OpenAI from 'openai';
import { EventEmitter } from 'events';

// Emits 'start', 'firstToken', 'chunk', and 'end' events while consuming
// an OpenAI-style streaming response.
class StreamHandler extends EventEmitter {
  constructor() {
    super();
    this.chunks = [];          // every content delta received, in order
    this.startTime = null;     // stamped when handleStream begins
    this.firstTokenTime = null; // stamped when the first delta arrives
  }

  // Drain the stream, collecting content deltas and emitting progress events.
  async handleStream(stream) {
    this.startTime = Date.now();
    this.emit('start');

    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (!content) continue; // skip keep-alive / finish chunks with no text
      if (this.firstTokenTime === null) {
        this.firstTokenTime = Date.now();
        this.emit('firstToken', this.firstTokenTime - this.startTime);
      }
      this.chunks.push(content);
      this.emit('chunk', content);
    }

    this.emit('end', { totalTime: Date.now() - this.startTime, chunks: this.chunks.length });
  }
}

// Client pointed at the local switchAILocal gateway.
const client = new OpenAI({
  baseURL: 'http://localhost:18080/v1',
  apiKey: 'sk-test-123',
});

const handler = new StreamHandler();

// Subscribe to the handler's lifecycle events before streaming begins.
handler.on('start', () => console.log('[Stream started]'));
handler.on('firstToken', (ttft) => console.log(`\n[First token: ${ttft}ms]\n`));
handler.on('chunk', (content) => process.stdout.write(content));
handler.on('end', ({ totalTime, chunks }) => {
  console.log(`\n\n[Stream ended: ${totalTime}ms, ${chunks} chunks]`);
});

// Request a streamed completion and let the handler consume it.
const stream = await client.chat.completions.create({
  model: 'gemini-2.5-pro',
  messages: [{ role: 'user', content: 'Explain machine learning in 3 paragraphs' }],
  stream: true,
});

await handler.handleStream(stream);

Streaming Long Outputs

For very long responses, streaming is essential:
# Streaming a long generation: tokens appear as they are produced instead of
# only after the whole response completes.
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[
        {
            "role": "user",
            "content": "Write a detailed 2000-word essay on the history of artificial intelligence"
        }
    ],
    stream=True,
    max_tokens=3000,  # cap the generation length for this long-form request
)

# Count whitespace-delimited words as a rough progress measure.
word_count = 0
for chunk in stream:
    if chunk.choices[0].delta.content:
        content = chunk.choices[0].delta.content
        word_count += len(content.split())
        print(content, end="", flush=True)

print(f"\n\n[Total words: {word_count}]")

Streaming with Multiple Providers

Test streaming across different providers:
# Compare streaming behavior across several providers with the same prompt.
from openai import OpenAI
import time

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Model names are prefixed with the provider so the gateway can route them.
providers = [
    "geminicli:gemini-2.5-pro",
    "ollama:llama3.2",
    "switchai:switchai-fast",
]

prompt = "Count to 10 slowly"

for model in providers:
    print(f"\n--- Testing {model} ---")
    start = time.time()
    
    stream = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    
    # Count only chunks that carry content; echo them as they arrive.
    chunk_count = 0
    for chunk in stream:
        if chunk.choices[0].delta.content:
            chunk_count += 1
            print(chunk.choices[0].delta.content, end="", flush=True)
    
    # Wall-clock time for the whole stream, in milliseconds.
    elapsed = (time.time() - start) * 1000
    print(f"\n[{chunk_count} chunks, {elapsed:.0f}ms]")

Error Handling in Streams

# Streaming requests can fail mid-stream; catch the SDK's APIError for
# HTTP/API-level failures and fall back to a generic handler for anything else.
# (Fixed: the original example imported `time` without using it.)
from openai import OpenAI, APIError

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

try:
    stream = client.chat.completions.create(
        model="gemini-2.5-pro",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True,
    )
    
    # Iterating the stream is where mid-stream errors surface, so the loop
    # stays inside the try block.
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
            
except APIError as e:
    print(f"\n\nStream error: {e.status_code} - {e.message}")
except Exception as e:
    print(f"\n\nUnexpected error: {e}")

Streaming with CLI Attachments

Combine streaming with file attachments:
# Stream an analysis of a local file attached through the CLI provider's
# "cli.attachments" extension, passed via extra_body.
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

stream = client.chat.completions.create(
    model="geminicli:gemini-2.5-pro",
    messages=[{"role": "user", "content": "Analyze this code and suggest improvements"}],
    stream=True,
    # Provider-specific extension: attach a local file to the request.
    extra_body={
        "cli": {
            "attachments": [
                {"type": "file", "path": "./main.go"}
            ]
        }
    }
)

print("Analysis:\n")
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
print()

Measuring Stream Performance

from openai import OpenAI
import time

class StreamMetrics:
    """Collects timing and throughput statistics for a streamed response."""

    def __init__(self):
        self.start_time = None        # set by start()
        self.first_token_time = None  # set on the first recorded chunk
        self.chunk_times = []         # elapsed seconds at each chunk arrival
        self.total_tokens = 0         # whitespace-split word count (approximate tokens)

    def start(self):
        """Mark the beginning of the stream; call before the first chunk."""
        self.start_time = time.time()

    def record_chunk(self, content):
        """Record the arrival time and approximate token count of one chunk."""
        current_time = time.time()

        if self.first_token_time is None:
            self.first_token_time = current_time

        self.chunk_times.append(current_time - self.start_time)
        self.total_tokens += len(content.split())

    def summary(self):
        """Return aggregate metrics as a dict of numeric values.

        Safe to call even when no chunks were recorded: time-to-first-token
        and throughput are reported as 0.0 instead of raising.
        """
        total_time = (time.time() - self.start_time) * 1000
        # Bug fix: the original raised TypeError here when the stream produced
        # no content chunks (first_token_time stayed None).
        if self.first_token_time is None:
            ttft = 0.0
        else:
            ttft = (self.first_token_time - self.start_time) * 1000
        # Guard against a zero elapsed time to avoid ZeroDivisionError.
        tokens_per_sec = self.total_tokens / (total_time / 1000) if total_time > 0 else 0.0

        return {
            "total_time_ms": total_time,
            "time_to_first_token_ms": ttft,
            "chunks": len(self.chunk_times),
            "total_tokens": self.total_tokens,
            "tokens_per_second": tokens_per_sec,
        }

client = OpenAI(
    base_url="http://localhost:18080/v1",
    api_key="sk-test-123",
)

# Start the clock before the request so metrics include request latency.
metrics = StreamMetrics()
metrics.start()

stream = client.chat.completions.create(
    model="gemini-2.5-pro",
    messages=[{"role": "user", "content": "Write a 500-word essay on climate change"}],
    stream=True,
)

# Record every non-empty delta while echoing the text.
for chunk in stream:
    if chunk.choices[0].delta.content:
        content = chunk.choices[0].delta.content
        metrics.record_chunk(content)
        print(content, end="", flush=True)

print("\n\n--- Metrics ---")
# All summary values are numeric, so one float format covers every entry.
for key, value in metrics.summary().items():
    print(f"{key}: {value:.2f}")

Raw SSE Parsing (Advanced)

For custom HTTP clients:
"""Parse the SSE stream by hand with requests, without the OpenAI SDK."""
import requests
import json

url = "http://localhost:18080/v1/chat/completions"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer sk-test-123"
}
data = {
    "model": "gemini-2.5-pro",
    "messages": [{"role": "user", "content": "Tell me a joke"}],
    "stream": True
}

response = requests.post(url, headers=headers, json=data, stream=True)

SSE_PREFIX = "data: "
for raw_line in response.iter_lines():
    if not raw_line:
        continue  # SSE events are separated by blank lines
    text = raw_line.decode('utf-8')
    if not text.startswith(SSE_PREFIX):
        continue
    payload = text[len(SSE_PREFIX):]
    if payload == '[DONE]':
        break  # sentinel marking the end of the stream
    try:
        event = json.loads(payload)
    except json.JSONDecodeError:
        continue  # skip malformed or partial data lines
    token = event['choices'][0]['delta'].get('content', '')
    if token:
        print(token, end='', flush=True)

print()

When to Use Streaming

Use streaming when:
  • Generating long-form content (essays, stories, code)
  • Building real-time chat interfaces
  • Providing user feedback during processing
  • Working with CLI providers that support streaming
Avoid streaming when:
  • You need the complete response for processing
  • Response is very short (<50 tokens)
  • Implementing retry logic (harder with streams)

Streaming Support by Provider

| Provider   | Streaming Support | Notes            |
|------------|-------------------|------------------|
| Gemini CLI | ✅ Full           | Native streaming |
| Claude CLI | ✅ Full           | Native streaming |
| Vibe CLI   | ✅ Full           | Native streaming |
| Codex CLI  | ✅ Full           | Native streaming |
| Ollama     | ✅ Full           | Native streaming |
| switchAI   | ✅ Full           | Cloud streaming  |
| Gemini API | ✅ Full           | Cloud streaming  |
| Claude API | ✅ Full           | Cloud streaming  |
| OpenAI API | ✅ Full           | Cloud streaming  |
| LM Studio  | ✅ Full           | Local streaming  |

Next Steps