|  | 
|  | 1 | +"""Minimal FastAPI server for handling OpenAI Realtime SIP calls with Twilio.""" | 
|  | 2 | + | 
|  | 3 | +from __future__ import annotations | 
|  | 4 | + | 
|  | 5 | +import asyncio | 
|  | 6 | +import logging | 
|  | 7 | +import os | 
|  | 8 | + | 
|  | 9 | +import websockets | 
|  | 10 | +from fastapi import FastAPI, HTTPException, Request, Response | 
|  | 11 | +from openai import APIStatusError, AsyncOpenAI, InvalidWebhookSignatureError | 
|  | 12 | + | 
|  | 13 | +from agents.realtime.config import RealtimeSessionModelSettings | 
|  | 14 | +from agents.realtime.items import ( | 
|  | 15 | + AssistantAudio, | 
|  | 16 | + AssistantMessageItem, | 
|  | 17 | + AssistantText, | 
|  | 18 | + InputText, | 
|  | 19 | + UserMessageItem, | 
|  | 20 | +) | 
|  | 21 | +from agents.realtime.model_inputs import RealtimeModelSendRawMessage | 
|  | 22 | +from agents.realtime.openai_realtime import OpenAIRealtimeSIPModel | 
|  | 23 | +from agents.realtime.runner import RealtimeRunner | 
|  | 24 | + | 
|  | 25 | +from .agents import WELCOME_MESSAGE, get_starting_agent | 
|  | 26 | + | 
|  | 27 | +logging.basicConfig(level=logging.INFO) | 
|  | 28 | + | 
|  | 29 | +logger = logging.getLogger("twilio_sip_example") | 
|  | 30 | + | 
|  | 31 | + | 
|  | 32 | +def _get_env(name: str) -> str: | 
|  | 33 | + value = os.getenv(name) | 
|  | 34 | + if not value: | 
|  | 35 | + raise RuntimeError(f"Missing environment variable: {name}") | 
|  | 36 | + return value | 
|  | 37 | + | 
|  | 38 | + | 
|  | 39 | +OPENAI_API_KEY = _get_env("OPENAI_API_KEY") | 
|  | 40 | +OPENAI_WEBHOOK_SECRET = _get_env("OPENAI_WEBHOOK_SECRET") | 
|  | 41 | + | 
|  | 42 | +client = AsyncOpenAI(api_key=OPENAI_API_KEY, webhook_secret=OPENAI_WEBHOOK_SECRET) | 
|  | 43 | + | 
|  | 44 | +# Build the multi-agent graph (triage + specialist agents) from agents.py. | 
|  | 45 | +assistant_agent = get_starting_agent() | 
|  | 46 | + | 
|  | 47 | +app = FastAPI() | 
|  | 48 | + | 
|  | 49 | +# Track background tasks so repeated webhooks do not spawn duplicates. | 
|  | 50 | +active_call_tasks: dict[str, asyncio.Task[None]] = {} | 
|  | 51 | + | 
|  | 52 | + | 
|  | 53 | +async def accept_call(call_id: str) -> None: | 
|  | 54 | + """Accept the incoming SIP call and configure the realtime session.""" | 
|  | 55 | + | 
|  | 56 | + # The starting agent uses static instructions, so we can forward them directly to the accept | 
|  | 57 | + # call payload. If someone swaps in a dynamic prompt, fall back to a sensible default. | 
|  | 58 | + instructions_payload = ( | 
|  | 59 | + assistant_agent.instructions | 
|  | 60 | + if isinstance(assistant_agent.instructions, str) | 
|  | 61 | + else "You are a helpful triage agent for ABC customer service." | 
|  | 62 | + ) | 
|  | 63 | + | 
|  | 64 | + try: | 
|  | 65 | + # AsyncOpenAI does not yet expose high-level helpers like client.realtime.calls.accept, so | 
|  | 66 | + # we call the REST endpoint directly via client.post(). Keep this until the SDK grows an | 
|  | 67 | + # async helper. | 
|  | 68 | + await client.post( | 
|  | 69 | + f"/realtime/calls/{call_id}/accept", | 
|  | 70 | + body={ | 
|  | 71 | + "type": "realtime", | 
|  | 72 | + "model": "gpt-realtime", | 
|  | 73 | + "instructions": instructions_payload, | 
|  | 74 | + }, | 
|  | 75 | + cast_to=dict, | 
|  | 76 | + ) | 
|  | 77 | + except APIStatusError as exc: | 
|  | 78 | + if exc.status_code == 404: | 
|  | 79 | + # Twilio occasionally retries webhooks after the caller hangs up; treat as a no-op so | 
|  | 80 | + # the webhook still returns 200. | 
|  | 81 | + logger.warning( | 
|  | 82 | + "Call %s no longer exists when attempting accept (404). Skipping.", call_id | 
|  | 83 | + ) | 
|  | 84 | + return | 
|  | 85 | + | 
|  | 86 | + detail = exc.message | 
|  | 87 | + if exc.response is not None: | 
|  | 88 | + try: | 
|  | 89 | + detail = exc.response.text | 
|  | 90 | + except Exception: # noqa: BLE001 | 
|  | 91 | + detail = str(exc.response) | 
|  | 92 | + | 
|  | 93 | + logger.error("Failed to accept call %s: %s %s", call_id, exc.status_code, detail) | 
|  | 94 | + raise HTTPException(status_code=500, detail="Failed to accept call") from exc | 
|  | 95 | + | 
|  | 96 | + logger.info("Accepted call %s", call_id) | 
|  | 97 | + | 
|  | 98 | + | 
|  | 99 | +async def observe_call(call_id: str) -> None: | 
|  | 100 | + """Attach to the realtime session and log conversation events.""" | 
|  | 101 | + | 
|  | 102 | + runner = RealtimeRunner(assistant_agent, model=OpenAIRealtimeSIPModel()) | 
|  | 103 | + | 
|  | 104 | + try: | 
|  | 105 | + initial_model_settings: RealtimeSessionModelSettings = { | 
|  | 106 | + "turn_detection": { | 
|  | 107 | + "type": "semantic_vad", | 
|  | 108 | + "interrupt_response": True, | 
|  | 109 | + } | 
|  | 110 | + } | 
|  | 111 | + async with await runner.run( | 
|  | 112 | + model_config={ | 
|  | 113 | + "call_id": call_id, | 
|  | 114 | + "initial_model_settings": initial_model_settings, | 
|  | 115 | + } | 
|  | 116 | + ) as session: | 
|  | 117 | + # Trigger an initial greeting so callers hear the agent right away. | 
|  | 118 | + # Issue a response.create immediately after the WebSocket attaches so the model speaks | 
|  | 119 | + # before the caller says anything. Using the raw client message ensures zero latency | 
|  | 120 | + # and avoids threading the greeting through history. | 
|  | 121 | + await session.model.send_event( | 
|  | 122 | + RealtimeModelSendRawMessage( | 
|  | 123 | + message={ | 
|  | 124 | + "type": "response.create", | 
|  | 125 | + "other_data": { | 
|  | 126 | + "response": { | 
|  | 127 | + "instructions": ( | 
|  | 128 | + "Say exactly '" | 
|  | 129 | + f"{WELCOME_MESSAGE}" | 
|  | 130 | + "' now before continuing the conversation." | 
|  | 131 | + ) | 
|  | 132 | + } | 
|  | 133 | + }, | 
|  | 134 | + } | 
|  | 135 | + ) | 
|  | 136 | + ) | 
|  | 137 | + | 
|  | 138 | + async for event in session: | 
|  | 139 | + if event.type == "history_added": | 
|  | 140 | + item = event.item | 
|  | 141 | + if isinstance(item, UserMessageItem): | 
|  | 142 | + for user_content in item.content: | 
|  | 143 | + if isinstance(user_content, InputText) and user_content.text: | 
|  | 144 | + logger.info("Caller: %s", user_content.text) | 
|  | 145 | + elif isinstance(item, AssistantMessageItem): | 
|  | 146 | + for assistant_content in item.content: | 
|  | 147 | + if ( | 
|  | 148 | + isinstance(assistant_content, AssistantText) | 
|  | 149 | + and assistant_content.text | 
|  | 150 | + ): | 
|  | 151 | + logger.info("Assistant (text): %s", assistant_content.text) | 
|  | 152 | + elif ( | 
|  | 153 | + isinstance(assistant_content, AssistantAudio) | 
|  | 154 | + and assistant_content.transcript | 
|  | 155 | + ): | 
|  | 156 | + logger.info( | 
|  | 157 | + "Assistant (audio transcript): %s", | 
|  | 158 | + assistant_content.transcript, | 
|  | 159 | + ) | 
|  | 160 | + elif event.type == "error": | 
|  | 161 | + logger.error("Realtime session error: %s", event.error) | 
|  | 162 | + | 
|  | 163 | + except websockets.exceptions.ConnectionClosedError: | 
|  | 164 | + # Callers hanging up causes the WebSocket to close without a frame; log at info level so it | 
|  | 165 | + # does not surface as an error. | 
|  | 166 | + logger.info("Realtime WebSocket closed for call %s", call_id) | 
|  | 167 | + except Exception as exc: # noqa: BLE001 - demo logging only | 
|  | 168 | + logger.exception("Error while observing call %s", call_id, exc_info=exc) | 
|  | 169 | + finally: | 
|  | 170 | + logger.info("Call %s ended", call_id) | 
|  | 171 | + active_call_tasks.pop(call_id, None) | 
|  | 172 | + | 
|  | 173 | + | 
|  | 174 | +def _track_call_task(call_id: str) -> None: | 
|  | 175 | + existing = active_call_tasks.get(call_id) | 
|  | 176 | + if existing: | 
|  | 177 | + if not existing.done(): | 
|  | 178 | + logger.info( | 
|  | 179 | + "Call %s already has an active observer; ignoring duplicate webhook delivery.", | 
|  | 180 | + call_id, | 
|  | 181 | + ) | 
|  | 182 | + return | 
|  | 183 | + # Remove completed tasks so a new observer can start for a fresh call. | 
|  | 184 | + active_call_tasks.pop(call_id, None) | 
|  | 185 | + | 
|  | 186 | + task = asyncio.create_task(observe_call(call_id)) | 
|  | 187 | + active_call_tasks[call_id] = task | 
|  | 188 | + | 
|  | 189 | + | 
|  | 190 | +@app.post("/openai/webhook") | 
|  | 191 | +async def openai_webhook(request: Request) -> Response: | 
|  | 192 | + body = await request.body() | 
|  | 193 | + | 
|  | 194 | + try: | 
|  | 195 | + event = client.webhooks.unwrap(body, request.headers) | 
|  | 196 | + except InvalidWebhookSignatureError as exc: | 
|  | 197 | + raise HTTPException(status_code=400, detail="Invalid webhook signature") from exc | 
|  | 198 | + | 
|  | 199 | + if event.type == "realtime.call.incoming": | 
|  | 200 | + call_id = event.data.call_id | 
|  | 201 | + await accept_call(call_id) | 
|  | 202 | + _track_call_task(call_id) | 
|  | 203 | + return Response(status_code=200) | 
|  | 204 | + | 
|  | 205 | + # Ignore other webhook event types for brevity. | 
|  | 206 | + return Response(status_code=200) | 
|  | 207 | + | 
|  | 208 | + | 
|  | 209 | +@app.get("/") | 
|  | 210 | +async def healthcheck() -> dict[str, str]: | 
|  | 211 | + return {"status": "ok"} | 
0 commit comments