Overview
Streaming allows you to receive partial responses from the Perplexity API as they are generated, rather than waiting for the complete response. This is particularly useful for:

- Real-time user experiences - Display responses as they're generated
- Long responses - Start showing content immediately for lengthy analyses
- Interactive applications - Provide immediate feedback to users
Streaming is supported across all Perplexity models including Sonar, Sonar Pro, and reasoning models.
Quick Start
The easiest way to get started is with the Perplexity SDKs, which handle all the streaming parsing automatically. To enable streaming, set `stream=True` (Python) or `stream: true` (TypeScript) when creating completions:

Python SDK
TypeScript SDK
cURL
```python
from perplexity import Perplexity

# Initialize the client (uses PERPLEXITY_API_KEY environment variable)
client = Perplexity()

# Create streaming completion
stream = client.chat.completions.create(
    model="sonar",
    messages=[{"role": "user", "content": "What is the latest in AI research?"}],
    stream=True
)

# Process streaming response
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```
Search results and metadata are delivered in the final chunk(s) of a streaming response, not progressively during the stream.
When streaming, you receive:

- Content chunks, which arrive progressively in real time
- Search results (delivered in the final chunk(s))
- Usage stats and other metadata
Python SDK
TypeScript SDK
Raw HTTP
```python
from perplexity import Perplexity

def stream_with_metadata():
    client = Perplexity()

    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": "Explain quantum computing"}],
        stream=True
    )

    content = ""
    search_results = []
    usage_info = None

    for chunk in stream:
        # Process content
        if chunk.choices[0].delta.content:
            content_piece = chunk.choices[0].delta.content
            content += content_piece
            print(content_piece, end='', flush=True)

        # Collect metadata from final chunks
        if hasattr(chunk, 'search_results') and chunk.search_results:
            search_results = chunk.search_results
        if hasattr(chunk, 'usage') and chunk.usage:
            usage_info = chunk.usage

        # Check if streaming is complete
        if chunk.choices[0].finish_reason:
            print(f"\n\nSearch Results: {search_results}")
            print(f"Usage: {usage_info}")

    return content, search_results, usage_info

stream_with_metadata()
```
Error Handling
Proper error handling ensures your application can recover from failures and provide a good user experience.

Python SDK
TypeScript SDK
Raw HTTP
```python
import perplexity
from perplexity import Perplexity

client = Perplexity()

try:
    stream = client.chat.completions.create(
        model="sonar-pro",
        messages=[
            {"role": "user", "content": "Explain machine learning concepts"}
        ],
        stream=True
    )

    content = ""
    for chunk in stream:
        if chunk.choices[0].delta.content:
            content_chunk = chunk.choices[0].delta.content
            content += content_chunk
            print(content_chunk, end="")

except perplexity.APIConnectionError as e:
    print(f"Network connection failed: {e}")
except perplexity.RateLimitError as e:
    print(f"Rate limit exceeded, please retry later: {e}")
except perplexity.APIStatusError as e:
    print(f"API error {e.status_code}: {e.response}")
except Exception as e:
    print(f"Unexpected error: {e}")
```
Proper SSE Parsing
For production use, you should properly parse the Server-Sent Events (SSE) format:

```python
# pip install sseclient-py
import json

import requests
import sseclient

def stream_with_proper_sse():
    url = "https://api.perplexity.ai/chat/completions"
    headers = {
        "Authorization": "Bearer YOUR_API_KEY",
        "Content-Type": "application/json"
    }
    payload = {
        "model": "sonar",
        "messages": [{"role": "user", "content": "Explain quantum computing"}],
        "stream": True
    }

    response = requests.post(url, headers=headers, json=payload, stream=True)
    client = sseclient.SSEClient(response)

    for event in client.events():
        if event.data == '[DONE]':
            break
        try:
            chunk_data = json.loads(event.data)
            content = chunk_data['choices'][0]['delta'].get('content', '')
            if content:
                print(content, end='')
        except json.JSONDecodeError:
            continue

stream_with_proper_sse()
```
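If you prefer to avoid the extra dependency, the SSE wire format can also be parsed by hand: each event arrives as a line prefixed with `data: `, and the stream terminates with a `data: [DONE]` sentinel. Below is a minimal sketch using only `requests` (same endpoint and payload as above); note that it skips multi-line SSE events and comment lines, which `sseclient` handles for you:

```python
# A dependency-free sketch of SSE parsing with requests.iter_lines
import json

import requests

def stream_without_sseclient():
    response = requests.post(
        "https://api.perplexity.ai/chat/completions",
        headers={
            "Authorization": "Bearer YOUR_API_KEY",
            "Content-Type": "application/json",
        },
        json={
            "model": "sonar",
            "messages": [{"role": "user", "content": "Explain quantum computing"}],
            "stream": True,
        },
        stream=True,
    )

    for line in response.iter_lines(decode_unicode=True):
        # SSE data lines are prefixed with "data: "; blank lines separate events
        if not line or not line.startswith("data: "):
            continue
        data = line[len("data: "):]
        if data == "[DONE]":
            break
        try:
            content = json.loads(data)["choices"][0]["delta"].get("content", "")
        except (json.JSONDecodeError, KeyError, IndexError):
            continue
        if content:
            print(content, end="", flush=True)

stream_without_sseclient()
```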
Advanced Streaming Patterns
Buffered Streaming
For applications that need to process chunks in batches:

Python SDK
TypeScript SDK
```python
import time

from perplexity import Perplexity

def buffered_streaming(buffer_size=50, flush_interval=1.0):
    client = Perplexity()

    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": "Write a detailed explanation of machine learning"}],
        stream=True
    )

    buffer = ""
    last_flush = time.time()

    for chunk in stream:
        if chunk.choices[0].delta.content:
            buffer += chunk.choices[0].delta.content

            # Flush buffer if it's full or enough time has passed
            if len(buffer) >= buffer_size or (time.time() - last_flush) >= flush_interval:
                print(buffer, end='', flush=True)
                buffer = ""
                last_flush = time.time()

    # Flush remaining buffer
    if buffer:
        print(buffer, end='', flush=True)

buffered_streaming()
```
Stream Processing with Callbacks
For applications that need to process chunks with custom logic:

Python SDK
TypeScript SDK
```python
from typing import Callable, Optional

from perplexity import Perplexity

def stream_with_callbacks(
    query: str,
    on_content: Optional[Callable[[str], None]] = None,
    on_search_results: Optional[Callable[[list], None]] = None,
    on_complete: Optional[Callable[[str, dict], None]] = None
):
    client = Perplexity()

    stream = client.chat.completions.create(
        model="sonar",
        messages=[{"role": "user", "content": query}],
        stream=True
    )

    full_content = ""
    metadata = {}

    for chunk in stream:
        # Handle content chunks
        if chunk.choices[0].delta.content:
            content_piece = chunk.choices[0].delta.content
            full_content += content_piece
            if on_content:
                on_content(content_piece)

        # Handle search results
        if hasattr(chunk, 'search_results') and chunk.search_results:
            metadata['search_results'] = chunk.search_results
            if on_search_results:
                on_search_results(chunk.search_results)

        # Handle other metadata
        if hasattr(chunk, 'usage') and chunk.usage:
            metadata['usage'] = chunk.usage

        # Handle completion
        if chunk.choices[0].finish_reason:
            if on_complete:
                on_complete(full_content, metadata)

    return full_content, metadata

# Usage example
def print_content(content: str):
    print(content, end='', flush=True)

def handle_search_results(results: list):
    print(f"\n[Found {len(results)} sources]", end='')

def handle_completion(content: str, metadata: dict):
    print(f"\n\nCompleted. Total length: {len(content)} characters")
    if 'usage' in metadata:
        print(f"Token usage: {metadata['usage']}")

stream_with_callbacks(
    "Explain the latest developments in renewable energy",
    on_content=print_content,
    on_search_results=handle_search_results,
    on_complete=handle_completion
)
```
Best Practices
Handle network interruptions
Implement reconnection logic for robust streaming applications.

Python SDK
TypeScript SDK
```python
import random
import time

import perplexity
from perplexity import Perplexity

def robust_streaming(query: str, max_retries: int = 3):
    client = Perplexity()

    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model="sonar",
                messages=[{"role": "user", "content": query}],
                stream=True
            )

            for chunk in stream:
                if chunk.choices[0].delta.content:
                    print(chunk.choices[0].delta.content, end='', flush=True)

            return  # Success, exit retry loop

        except (perplexity.APIConnectionError, perplexity.APITimeoutError) as e:
            if attempt < max_retries - 1:
                # Exponential backoff with jitter
                delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"\nConnection error, retrying in {delay:.1f}s...")
                time.sleep(delay)
            else:
                print(f"\nFailed after {max_retries} attempts: {e}")
                raise

robust_streaming("Explain quantum computing")
```
Implement proper buffering
Use appropriate buffering strategies for your application's needs.

```python
# For real-time chat applications
buffer_size = 1    # Character-by-character for immediate display

# For document processing
buffer_size = 100  # Larger chunks for efficiency

# For API responses
buffer_size = 500  # Balance between responsiveness and efficiency
```
Handle metadata appropriately
Remember that search results and metadata arrive at the end of the stream.

```python
# Don't expect search results until the stream is complete
if chunk.choices[0].finish_reason == "stop":
    # Now search results and usage info are available
    process_search_results(chunk.search_results)
    log_usage_stats(chunk.usage)
```
Optimize for your use case
Choose streaming parameters based on your application's requirements.

Real-time Chat
Document Generation
```python
# Optimize for immediate response
stream = client.chat.completions.create(
    model="sonar",
    messages=messages,
    stream=True,
    max_tokens=1000,  # Reasonable limit
    temperature=0.7   # Balanced creativity
)
```
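For the Document Generation case, a plausible configuration trades immediacy for longer, more consistent output. The sketch below is illustrative; the specific `max_tokens` and `temperature` values are assumptions, not official recommendations:

```python
# A hypothetical configuration for long-form document generation
# (parameter values are illustrative assumptions, not official guidance)
stream = client.chat.completions.create(
    model="sonar-pro",   # assumption: larger model for long-form output
    messages=messages,
    stream=True,
    max_tokens=4000,     # allow longer output for full documents
    temperature=0.2      # lower temperature for more consistent prose
)
```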
Important: Search results only arrive at the end of the stream. If displaying search results immediately is critical to your real-time user experience, consider using non-streaming requests instead.
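For comparison, here is a minimal non-streaming sketch, assuming the same SDK surface as the streaming examples above. Everything, including search results and usage, comes back in a single response object as soon as the call returns:

```python
from perplexity import Perplexity

client = Perplexity()

# Omit `stream=True` to receive the complete response in one object,
# so search results are available immediately.
response = client.chat.completions.create(
    model="sonar",
    messages=[{"role": "user", "content": "What is the latest in AI research?"}]
)

print(response.choices[0].message.content)
# Attribute names assumed to mirror the streaming chunks above
print(getattr(response, "search_results", None))
```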
Resources