2 changes: 0 additions & 2 deletions .github/workflows/tests.yml
@@ -84,7 +84,5 @@ jobs:
           enable-cache: true
       - name: Install dependencies
         run: make sync
-      - name: Install Python 3.9 dependencies
-        run: UV_PROJECT_ENVIRONMENT=.venv_39 uv sync --all-extras --all-packages --group dev
       - name: Run tests
         run: make old_version_tests

> **Member Author** (on the removed step): moved to makefile
3 changes: 2 additions & 1 deletion .gitignore
@@ -100,7 +100,8 @@ celerybeat.pid
 *.sage.py
 
 # Environments
-.env
+.python-version
+.env*
 .venv
 env/
 venv/

> **Member Author** (on the added entries): for local python 3.9 tests
3 changes: 2 additions & 1 deletion Makefile
@@ -39,7 +39,8 @@ snapshots-create:
 	uv run pytest --inline-snapshot=create
 
 .PHONY: old_version_tests
-old_version_tests:
+old_version_tests:
+	UV_PROJECT_ENVIRONMENT=.venv_39 uv sync --python 3.9 --all-extras --all-packages --group dev
 	UV_PROJECT_ENVIRONMENT=.venv_39 uv run --python 3.9 -m pytest
 
 .PHONY: build-docs
11 changes: 8 additions & 3 deletions examples/realtime/app/README.md
@@ -29,14 +29,19 @@ To use the same UI with your own agents, edit `agent.py` and ensure get_starting
 1. Click **Connect** to establish a realtime session
 2. Audio capture starts automatically - just speak naturally
 3. Click the **Mic On/Off** button to mute/unmute your microphone
-4. Watch the conversation unfold in the left pane
-5. Monitor raw events in the right pane (click to expand/collapse)
-6. Click **Disconnect** when done
+4. To send an image, enter an optional prompt and click **🖼️ Send Image** (select a file)
+5. Watch the conversation unfold in the left pane (image thumbnails are shown)
+6. Monitor raw events in the right pane (click to expand/collapse)
+7. Click **Disconnect** when done
 
 ## Architecture
 
 - **Backend**: FastAPI server with WebSocket connections for real-time communication
 - **Session Management**: Each connection gets a unique session with the OpenAI Realtime API
+- **Image Inputs**: The UI uploads images and the server forwards a
+  `conversation.item.create` event with `input_image` (plus optional `input_text`),
+  followed by `response.create` to start the model response. The messages pane
+  renders image bubbles for `input_image` content.
 - **Audio Processing**: 24kHz mono audio capture and playback
 - **Event Handling**: Full event stream processing with transcript generation
 - **Frontend**: Vanilla JavaScript with clean, responsive CSS
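
To make that flow concrete, here is a minimal sketch of the two raw client events involved, matching the **Image Inputs** bullet above (the data URL, prompt, and variable names are illustrative):

```python
# Sketch of the raw Realtime API events behind the image flow (illustrative values).
item_create = {
    "type": "conversation.item.create",
    "item": {
        "type": "message",
        "role": "user",
        "content": [
            {"type": "input_image", "image_url": "data:image/png;base64,...", "detail": "high"},
            {"type": "input_text", "text": "Please describe this image."},  # optional prompt
        ],
    },
}
response_create = {"type": "response.create"}  # asks the model to respond to the new item
```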
163 changes: 161 additions & 2 deletions examples/realtime/app/server.py
@@ -12,6 +12,8 @@
 from typing_extensions import assert_never
 
 from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent
+from agents.realtime.config import RealtimeUserInputMessage
+from agents.realtime.model_inputs import RealtimeModelSendRawMessage
 
 # Import TwilioHandler class - handle both module and package use cases
 if TYPE_CHECKING:
@@ -64,6 +66,34 @@ async def send_audio(self, session_id: str, audio_bytes: bytes):
         if session_id in self.active_sessions:
             await self.active_sessions[session_id].send_audio(audio_bytes)
 
+    async def send_client_event(self, session_id: str, event: dict[str, Any]):
+        """Send a raw client event to the underlying realtime model."""
+        session = self.active_sessions.get(session_id)
+        if not session:
+            return
+        await session.model.send_event(
+            RealtimeModelSendRawMessage(
+                message={
+                    "type": event["type"],
+                    "other_data": {k: v for k, v in event.items() if k != "type"},
+                }
+            )
+        )
+
+    async def send_user_message(self, session_id: str, message: RealtimeUserInputMessage):
+        """Send a structured user message via the higher-level API (supports input_image)."""
+        session = self.active_sessions.get(session_id)
+        if not session:
+            return
+        await session.send_message(message)  # delegates to RealtimeModelSendUserInput path
+
+    async def interrupt(self, session_id: str) -> None:
+        """Interrupt current model playback/response for a session."""
+        session = self.active_sessions.get(session_id)
+        if not session:
+            return
+        await session.interrupt()
+
     async def _process_events(self, session_id: str):
         try:
             session = self.active_sessions[session_id]
@@ -101,7 +131,11 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
         elif event.type == "history_updated":
             base_event["history"] = [item.model_dump(mode="json") for item in event.history]
         elif event.type == "history_added":
-            pass
+            # Provide the added item so the UI can render incrementally.
+            try:
+                base_event["item"] = event.item.model_dump(mode="json")
+            except Exception:
+                base_event["item"] = None
         elif event.type == "guardrail_tripped":
             base_event["guardrail_results"] = [
                 {"name": result.guardrail.name} for result in event.guardrail_results
@@ -134,6 +168,7 @@ async def lifespan(app: FastAPI):
 @app.websocket("/ws/{session_id}")
 async def websocket_endpoint(websocket: WebSocket, session_id: str):
     await manager.connect(websocket, session_id)
+    image_buffers: dict[str, dict[str, Any]] = {}
     try:
         while True:
             data = await websocket.receive_text()
@@ -144,6 +179,124 @@ async def websocket_endpoint(websocket: WebSocket, session_id: str):
                 int16_data = message["data"]
                 audio_bytes = struct.pack(f"{len(int16_data)}h", *int16_data)
                 await manager.send_audio(session_id, audio_bytes)
+            elif message["type"] == "image":
+                logger.info("Received image message from client (session %s).", session_id)
+                # Build a conversation.item.create with input_image (and optional input_text)
+                data_url = message.get("data_url")
+                prompt_text = message.get("text") or "Please describe this image."
+                if data_url:
+                    logger.info(
+                        "Forwarding image (structured message) to Realtime API (len=%d).",
+                        len(data_url),
+                    )
+                    user_msg: RealtimeUserInputMessage = {
+                        "type": "message",
+                        "role": "user",
+                        "content": (
+                            [
+                                {"type": "input_image", "image_url": data_url, "detail": "high"},
+                                {"type": "input_text", "text": prompt_text},
+                            ]
+                            if prompt_text
+                            else [
+                                {"type": "input_image", "image_url": data_url, "detail": "high"}
+                            ]
+                        ),
+                    }
+                    await manager.send_user_message(session_id, user_msg)
+                    # Acknowledge to client UI
+                    await websocket.send_text(
+                        json.dumps(
+                            {
+                                "type": "client_info",
+                                "info": "image_enqueued",
+                                "size": len(data_url),
+                            }
+                        )
+                    )
+                else:
+                    await websocket.send_text(
+                        json.dumps(
+                            {
+                                "type": "error",
+                                "error": "No data_url for image message.",
+                            }
+                        )
+                    )
+            elif message["type"] == "commit_audio":
+                # Force close the current input audio turn
+                await manager.send_client_event(session_id, {"type": "input_audio_buffer.commit"})
+            elif message["type"] == "image_start":
+                img_id = str(message.get("id"))
+                image_buffers[img_id] = {
+                    "text": message.get("text") or "Please describe this image.",
+                    "chunks": [],
+                }
+                await websocket.send_text(
+                    json.dumps({"type": "client_info", "info": "image_start_ack", "id": img_id})
+                )
+            elif message["type"] == "image_chunk":
+                img_id = str(message.get("id"))
+                chunk = message.get("chunk", "")
+                if img_id in image_buffers:
+                    image_buffers[img_id]["chunks"].append(chunk)
+                    if len(image_buffers[img_id]["chunks"]) % 10 == 0:
+                        await websocket.send_text(
+                            json.dumps(
+                                {
+                                    "type": "client_info",
+                                    "info": "image_chunk_ack",
+                                    "id": img_id,
+                                    "count": len(image_buffers[img_id]["chunks"]),
+                                }
+                            )
+                        )
+            elif message["type"] == "image_end":
+                img_id = str(message.get("id"))
+                buf = image_buffers.pop(img_id, None)
+                if buf is None:
+                    await websocket.send_text(
+                        json.dumps({"type": "error", "error": "Unknown image id for image_end."})
+                    )
+                else:
+                    data_url = "".join(buf["chunks"]) if buf["chunks"] else None
+                    prompt_text = buf["text"]
+                    if data_url:
+                        logger.info(
+                            "Forwarding chunked image (structured message) to Realtime API (len=%d).",
+                            len(data_url),
+                        )
+                        user_msg2: RealtimeUserInputMessage = {
+                            "type": "message",
+                            "role": "user",
+                            "content": (
+                                [
+                                    {"type": "input_image", "image_url": data_url, "detail": "high"},
+                                    {"type": "input_text", "text": prompt_text},
+                                ]
+                                if prompt_text
+                                else [
+                                    {"type": "input_image", "image_url": data_url, "detail": "high"}
+                                ]
+                            ),
+                        }
+                        await manager.send_user_message(session_id, user_msg2)
+                        await websocket.send_text(
+                            json.dumps(
+                                {
+                                    "type": "client_info",
+                                    "info": "image_enqueued",
+                                    "id": img_id,
+                                    "size": len(data_url),
+                                }
+                            )
+                        )
+                    else:
+                        await websocket.send_text(
+                            json.dumps({"type": "error", "error": "Empty image."})
+                        )
+            elif message["type"] == "interrupt":
+                await manager.interrupt(session_id)
 
     except WebSocketDisconnect:
         await manager.disconnect(session_id)
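
For manual testing, a minimal client sketch that drives the chunked upload path above (assumptions: the third-party `websockets` package, a server on `localhost:8000`, and an arbitrary session id, file name, and chunk size):

```python
import asyncio
import base64
import json
import uuid

import websockets  # assumed client library, not part of this app


async def send_image_chunked(path: str, prompt: str) -> None:
    # Encode the file as a data URL, the same format the browser UI produces.
    with open(path, "rb") as f:
        data_url = "data:image/png;base64," + base64.b64encode(f.read()).decode()
    img_id = str(uuid.uuid4())
    async with websockets.connect("ws://localhost:8000/ws/demo-session") as ws:
        await ws.send(json.dumps({"type": "image_start", "id": img_id, "text": prompt}))
        for i in range(0, len(data_url), 64_000):  # arbitrary chunk size
            await ws.send(
                json.dumps({"type": "image_chunk", "id": img_id, "chunk": data_url[i : i + 64_000]})
            )
        await ws.send(json.dumps({"type": "image_end", "id": img_id}))
        print(json.loads(await ws.recv()))  # first reply should be the image_start_ack


asyncio.run(send_image_chunked("photo.png", "Please describe this image."))
```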
@@ -160,4 +313,10 @@ async def read_index():
 if __name__ == "__main__":
     import uvicorn
 
-    uvicorn.run(app, host="0.0.0.0", port=8000)
+    uvicorn.run(
+        app,
+        host="0.0.0.0",
+        port=8000,
+        # Increased WebSocket frame size to comfortably handle image data URLs.
+        ws_max_size=16 * 1024 * 1024,
+    )
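
As a rough sanity check on that limit: base64 encodes 3 raw bytes as 4 characters, so a 16 MiB text frame holds roughly 12 MiB of raw image data before the data-URL prefix and JSON envelope are counted.

```python
# Rough capacity math for the 16 MiB frame limit (illustrative only).
frame_limit = 16 * 1024 * 1024        # ws_max_size above
max_raw_image = frame_limit * 3 // 4  # base64 inflates payloads by ~4/3
print(f"~{max_raw_image / 2**20:.0f} MiB of raw image bytes per frame")  # ~12 MiB
```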