| 
 | 1 | +import asyncio  | 
 | 2 | +import queue  | 
 | 3 | +import sys  | 
 | 4 | +import threading  | 
 | 5 | +from typing import Any  | 
 | 6 | + | 
 | 7 | +import numpy as np  | 
 | 8 | +import sounddevice as sd  | 
 | 9 | + | 
 | 10 | +from agents import function_tool  | 
 | 11 | +from agents.realtime import RealtimeAgent, RealtimeRunner, RealtimeSession, RealtimeSessionEvent  | 
 | 12 | + | 
# Audio configuration
CHUNK_LENGTH_S = 0.05  # 50ms of audio per chunk, used for both capture and playback pacing
SAMPLE_RATE = 24000  # samples per second for mic input and speaker output
FORMAT = np.int16  # 16-bit signed integer PCM samples
CHANNELS = 1  # mono

# Set up logging for OpenAI agents SDK
# logging.basicConfig(
#     level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# )
# logger.logger.setLevel(logging.ERROR)
 | 24 | + | 
 | 25 | + | 
 | 26 | +@function_tool  | 
 | 27 | +def get_weather(city: str) -> str:  | 
 | 28 | + """Get the weather in a city."""  | 
 | 29 | + return f"The weather in {city} is sunny."  | 
 | 30 | + | 
 | 31 | + | 
# Single realtime agent exposing one tool; the instructions pin a fixed
# greeting ("Top of the morning to you").
agent = RealtimeAgent(
    name="Assistant",
    instructions="You always greet the user with 'Top of the morning to you'.",
    tools=[get_weather],
)
 | 37 | + | 
 | 38 | + | 
 | 39 | +def _truncate_str(s: str, max_length: int) -> str:  | 
 | 40 | + if len(s) > max_length:  | 
 | 41 | + return s[:max_length] + "..."  | 
 | 42 | + return s  | 
 | 43 | + | 
 | 44 | + | 
 | 45 | +class NoUIDemo:  | 
 | 46 | + def __init__(self) -> None:  | 
 | 47 | + self.session: RealtimeSession | None = None  | 
 | 48 | + self.audio_stream: sd.InputStream | None = None  | 
 | 49 | + self.audio_player: sd.OutputStream | None = None  | 
 | 50 | + self.recording = False  | 
 | 51 | + | 
 | 52 | + # Audio output state for callback system  | 
 | 53 | + self.output_queue: queue.Queue[Any] = queue.Queue(maxsize=10) # Buffer more chunks  | 
 | 54 | + self.interrupt_event = threading.Event()  | 
 | 55 | + self.current_audio_chunk: np.ndarray | None = None # type: ignore  | 
 | 56 | + self.chunk_position = 0  | 
 | 57 | + | 
 | 58 | + def _output_callback(self, outdata, frames: int, time, status) -> None:  | 
 | 59 | + """Callback for audio output - handles continuous audio stream from server."""  | 
 | 60 | + if status:  | 
 | 61 | + print(f"Output callback status: {status}")  | 
 | 62 | + | 
 | 63 | + # Check if we should clear the queue due to interrupt  | 
 | 64 | + if self.interrupt_event.is_set():  | 
 | 65 | + # Clear the queue and current chunk state  | 
 | 66 | + while not self.output_queue.empty():  | 
 | 67 | + try:  | 
 | 68 | + self.output_queue.get_nowait()  | 
 | 69 | + except queue.Empty:  | 
 | 70 | + break  | 
 | 71 | + self.current_audio_chunk = None  | 
 | 72 | + self.chunk_position = 0  | 
 | 73 | + self.interrupt_event.clear()  | 
 | 74 | + outdata.fill(0)  | 
 | 75 | + return  | 
 | 76 | + | 
 | 77 | + # Fill output buffer from queue and current chunk  | 
 | 78 | + outdata.fill(0) # Start with silence  | 
 | 79 | + samples_filled = 0  | 
 | 80 | + | 
 | 81 | + while samples_filled < len(outdata):  | 
 | 82 | + # If we don't have a current chunk, try to get one from queue  | 
 | 83 | + if self.current_audio_chunk is None:  | 
 | 84 | + try:  | 
 | 85 | + self.current_audio_chunk = self.output_queue.get_nowait()  | 
 | 86 | + self.chunk_position = 0  | 
 | 87 | + except queue.Empty:  | 
 | 88 | + # No more audio data available - this causes choppiness  | 
 | 89 | + # Uncomment next line to debug underruns:  | 
 | 90 | + # print(f"Audio underrun: {samples_filled}/{len(outdata)} samples filled")  | 
 | 91 | + break  | 
 | 92 | + | 
 | 93 | + # Copy data from current chunk to output buffer  | 
 | 94 | + remaining_output = len(outdata) - samples_filled  | 
 | 95 | + remaining_chunk = len(self.current_audio_chunk) - self.chunk_position  | 
 | 96 | + samples_to_copy = min(remaining_output, remaining_chunk)  | 
 | 97 | + | 
 | 98 | + if samples_to_copy > 0:  | 
 | 99 | + chunk_data = self.current_audio_chunk[  | 
 | 100 | + self.chunk_position : self.chunk_position + samples_to_copy  | 
 | 101 | + ]  | 
 | 102 | + # More efficient: direct assignment for mono audio instead of reshape  | 
 | 103 | + outdata[samples_filled : samples_filled + samples_to_copy, 0] = chunk_data  | 
 | 104 | + samples_filled += samples_to_copy  | 
 | 105 | + self.chunk_position += samples_to_copy  | 
 | 106 | + | 
 | 107 | + # If we've used up the entire chunk, reset for next iteration  | 
 | 108 | + if self.chunk_position >= len(self.current_audio_chunk):  | 
 | 109 | + self.current_audio_chunk = None  | 
 | 110 | + self.chunk_position = 0  | 
 | 111 | + | 
 | 112 | + async def run(self) -> None:  | 
 | 113 | + print("Connecting, may take a few seconds...")  | 
 | 114 | + | 
 | 115 | + # Initialize audio player with callback  | 
 | 116 | + chunk_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)  | 
 | 117 | + self.audio_player = sd.OutputStream(  | 
 | 118 | + channels=CHANNELS,  | 
 | 119 | + samplerate=SAMPLE_RATE,  | 
 | 120 | + dtype=FORMAT,  | 
 | 121 | + callback=self._output_callback,  | 
 | 122 | + blocksize=chunk_size, # Match our chunk timing for better alignment  | 
 | 123 | + )  | 
 | 124 | + self.audio_player.start()  | 
 | 125 | + | 
 | 126 | + try:  | 
 | 127 | + runner = RealtimeRunner(agent)  | 
 | 128 | + async with await runner.run() as session:  | 
 | 129 | + self.session = session  | 
 | 130 | + print("Connected. Starting audio recording...")  | 
 | 131 | + | 
 | 132 | + # Start audio recording  | 
 | 133 | + await self.start_audio_recording()  | 
 | 134 | + print("Audio recording started. You can start speaking - expect lots of logs!")  | 
 | 135 | + | 
 | 136 | + # Process session events  | 
 | 137 | + async for event in session:  | 
 | 138 | + await self._on_event(event)  | 
 | 139 | + | 
 | 140 | + finally:  | 
 | 141 | + # Clean up audio player  | 
 | 142 | + if self.audio_player and self.audio_player.active:  | 
 | 143 | + self.audio_player.stop()  | 
 | 144 | + if self.audio_player:  | 
 | 145 | + self.audio_player.close()  | 
 | 146 | + | 
 | 147 | + print("Session ended")  | 
 | 148 | + | 
 | 149 | + async def start_audio_recording(self) -> None:  | 
 | 150 | + """Start recording audio from the microphone."""  | 
 | 151 | + # Set up audio input stream  | 
 | 152 | + self.audio_stream = sd.InputStream(  | 
 | 153 | + channels=CHANNELS,  | 
 | 154 | + samplerate=SAMPLE_RATE,  | 
 | 155 | + dtype=FORMAT,  | 
 | 156 | + )  | 
 | 157 | + | 
 | 158 | + self.audio_stream.start()  | 
 | 159 | + self.recording = True  | 
 | 160 | + | 
 | 161 | + # Start audio capture task  | 
 | 162 | + asyncio.create_task(self.capture_audio())  | 
 | 163 | + | 
 | 164 | + async def capture_audio(self) -> None:  | 
 | 165 | + """Capture audio from the microphone and send to the session."""  | 
 | 166 | + if not self.audio_stream or not self.session:  | 
 | 167 | + return  | 
 | 168 | + | 
 | 169 | + # Buffer size in samples  | 
 | 170 | + read_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)  | 
 | 171 | + | 
 | 172 | + try:  | 
 | 173 | + while self.recording:  | 
 | 174 | + # Check if there's enough data to read  | 
 | 175 | + if self.audio_stream.read_available < read_size:  | 
 | 176 | + await asyncio.sleep(0.01)  | 
 | 177 | + continue  | 
 | 178 | + | 
 | 179 | + # Read audio data  | 
 | 180 | + data, _ = self.audio_stream.read(read_size)  | 
 | 181 | + | 
 | 182 | + # Convert numpy array to bytes  | 
 | 183 | + audio_bytes = data.tobytes()  | 
 | 184 | + | 
 | 185 | + # Send audio to session  | 
 | 186 | + await self.session.send_audio(audio_bytes)  | 
 | 187 | + | 
 | 188 | + # Yield control back to event loop  | 
 | 189 | + await asyncio.sleep(0)  | 
 | 190 | + | 
 | 191 | + except Exception as e:  | 
 | 192 | + print(f"Audio capture error: {e}")  | 
 | 193 | + finally:  | 
 | 194 | + if self.audio_stream and self.audio_stream.active:  | 
 | 195 | + self.audio_stream.stop()  | 
 | 196 | + if self.audio_stream:  | 
 | 197 | + self.audio_stream.close()  | 
 | 198 | + | 
 | 199 | + async def _on_event(self, event: RealtimeSessionEvent) -> None:  | 
 | 200 | + """Handle session events."""  | 
 | 201 | + try:  | 
 | 202 | + if event.type == "agent_start":  | 
 | 203 | + print(f"Agent started: {event.agent.name}")  | 
 | 204 | + elif event.type == "agent_end":  | 
 | 205 | + print(f"Agent ended: {event.agent.name}")  | 
 | 206 | + elif event.type == "handoff":  | 
 | 207 | + print(f"Handoff from {event.from_agent.name} to {event.to_agent.name}")  | 
 | 208 | + elif event.type == "tool_start":  | 
 | 209 | + print(f"Tool started: {event.tool.name}")  | 
 | 210 | + elif event.type == "tool_end":  | 
 | 211 | + print(f"Tool ended: {event.tool.name}; output: {event.output}")  | 
 | 212 | + elif event.type == "audio_end":  | 
 | 213 | + print("Audio ended")  | 
 | 214 | + elif event.type == "audio":  | 
 | 215 | + # Enqueue audio for callback-based playback  | 
 | 216 | + np_audio = np.frombuffer(event.audio.data, dtype=np.int16)  | 
 | 217 | + try:  | 
 | 218 | + self.output_queue.put_nowait(np_audio)  | 
 | 219 | + except queue.Full:  | 
 | 220 | + # Queue is full - only drop if we have significant backlog  | 
 | 221 | + # This prevents aggressive dropping that could cause choppiness  | 
 | 222 | + if self.output_queue.qsize() > 8: # Keep some buffer  | 
 | 223 | + try:  | 
 | 224 | + self.output_queue.get_nowait()  | 
 | 225 | + self.output_queue.put_nowait(np_audio)  | 
 | 226 | + except queue.Empty:  | 
 | 227 | + pass  | 
 | 228 | + # If queue isn't too full, just skip this chunk to avoid blocking  | 
 | 229 | + elif event.type == "audio_interrupted":  | 
 | 230 | + print("Audio interrupted")  | 
 | 231 | + # Signal the output callback to clear its queue and state  | 
 | 232 | + self.interrupt_event.set()  | 
 | 233 | + elif event.type == "error":  | 
 | 234 | + print(f"Error: {event.error}")  | 
 | 235 | + elif event.type == "history_updated":  | 
 | 236 | + pass # Skip these frequent events  | 
 | 237 | + elif event.type == "history_added":  | 
 | 238 | + pass # Skip these frequent events  | 
 | 239 | + elif event.type == "raw_model_event":  | 
 | 240 | + print(f"Raw model event: {_truncate_str(str(event.data), 50)}")  | 
 | 241 | + else:  | 
 | 242 | + print(f"Unknown event type: {event.type}")  | 
 | 243 | + except Exception as e:  | 
 | 244 | + print(f"Error processing event: {_truncate_str(str(e), 50)}")  | 
 | 245 | + | 
 | 246 | + | 
 | 247 | +if __name__ == "__main__":  | 
 | 248 | + demo = NoUIDemo()  | 
 | 249 | + try:  | 
 | 250 | + asyncio.run(demo.run())  | 
 | 251 | + except KeyboardInterrupt:  | 
 | 252 | + print("\nExiting...")  | 
 | 253 | + sys.exit(0)  | 
0 commit comments