Introduction
Building sophisticated conversational AI systems requires robust event management. Google's Agent Development Kit (ADK) provides powerful tools for managing conversation state, but mastering event injection and extraction patterns is crucial for creating production-ready applications.
In this comprehensive guide, we'll explore real-world patterns from a production AI system, covering everything from basic event injection to advanced extraction techniques.
Table of Contents
- Understanding ADK DatabaseSessionService
- Event Injection Patterns
- Event Extraction Patterns
- Real-World Implementation Examples
- Best Practices and Common Pitfalls
- Performance Considerations
- Conclusion
Understanding Events and Sessions in ADK
Before going into injection and extraction patterns, it's crucial to understand what events are and how they relate to sessions in the Google ADK ecosystem.
What Are Events?
Events are the fundamental units of information flow within the Agent Development Kit (ADK). According to the official ADK documentation, events represent every significant occurrence during an agent's interaction lifecycle, from initial user input to the final response and all the steps in between.
An Event in ADK is an immutable record representing a specific point in the agent's execution. It captures:
- User messages
- Agent replies
- Tool requests (function calls)
- Tool results
- State changes
- Control signals
- Errors
Event Structure
Based on the ADK Event class, events have this conceptual structure:
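As a rough illustration, the fields this guide relies on can be sketched with plain dataclasses. EventSketch and EventActionsSketch are hypothetical stand-ins written for this example, not the real ADK classes; the field names simply mirror the ones used throughout this article (author, invocation_id, content, partial, actions, timestamp).

```python
from dataclasses import dataclass, field
from typing import Any, Optional
import time
import uuid


@dataclass
class EventActionsSketch:
    """Hypothetical stand-in for EventActions: side effects and control signals."""
    state_delta: dict[str, Any] = field(default_factory=dict)
    artifact_delta: dict[str, int] = field(default_factory=dict)
    transfer_to_agent: Optional[str] = None
    escalate: Optional[bool] = None


@dataclass
class EventSketch:
    """Hypothetical stand-in for Event: one record in the interaction history."""
    author: str                      # 'user' or the agent's name
    invocation_id: str               # groups events from one interaction run
    content: Optional[Any] = None    # message payload (text, tool call, tool result)
    partial: Optional[bool] = None   # True for incomplete streaming chunks
    actions: EventActionsSketch = field(default_factory=EventActionsSketch)
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)


evt = EventSketch(author="user", invocation_id="inv_1", content="Hello")
print(evt.author, evt.actions.state_delta)
```

The real Event class carries more fields than this; the sketch only shows how the message payload and the actions (side effects) sit side by side on one record.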
Why Events Matter
Events are central to ADK's operation for several key reasons:
Communication: They serve as the standard message format between the user interface, the Runner, agents, the LLM, and tools. Everything flows as an Event.
Signaling State & Artifact Changes: Events carry instructions for state modifications and track artifact updates. The SessionService uses these signals to ensure persistence via event.actions.state_delta and event.actions.artifact_delta.
Control Flow: Specific fields like event.actions.transfer_to_agent or event.actions.escalate act as signals that direct the framework, determining which agent runs next or if a loop should terminate.
History & Observability: The sequence of events recorded in session.events provides a complete, chronological history of an interaction, invaluable for debugging, auditing, and understanding agent behavior step-by-step.
Understanding Sessions
Sessions are containers that hold the complete conversation state. The DatabaseSessionService is the core component that manages conversation state in ADK applications. It handles:
- Session Management: Creating, retrieving, and managing conversation sessions
- Event Storage: Persisting conversation events with full history
- State Management: Maintaining session state across interactions
- Event Processing: Enabling real-time event injection and extraction
Event-Session Relationship
The relationship between events and sessions is fundamental:
- Events are stored in sessions: Each session maintains a chronological list of all events that have occurred
- Sessions provide context: Events can reference and modify session state through EventActions
- Sessions enable persistence: The DatabaseSessionService ensures events are persisted and can be retrieved later
- Events drive session evolution: Each event can modify session state, creating a living record of the conversation
Key Components
from google.adk.sessions import DatabaseSessionService
from google.adk.events import Event, EventActions
from google.genai import types
The EventActions class is particularly important as it contains the side effects and control signals that events can carry.
Event Flow and Processing
Understanding how events flow through the ADK system is crucial for effective event management. According to the ADK documentation, events follow this processing flow:
1. Generation: Events are created at different points:
   - User Input: The Runner wraps initial user messages into an Event with author='user'
   - Agent Logic: Agents explicitly yield Event(...) objects with author=self.name
   - LLM Responses: The ADK model integration layer translates raw LLM output into Event objects
   - Tool Results: After tool execution, the framework generates an Event containing the function_response
2. Processing Flow:
   - Yield/Return: An event is generated and yielded by its source
   - Runner Receives: The main Runner executing the agent receives the event
   - SessionService Processing: The Runner sends the event to the configured SessionService
   - Apply Deltas: The service merges event.actions.state_delta into session.state
   - Persist to History: Appends the processed event to the session.events list
   - External Yield: The Runner yields the processed event to the calling application
This flow ensures that state changes and history are consistently recorded alongside the communication content of each event.
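The "Apply Deltas" and "Persist to History" steps can be sketched in a few lines of plain Python. This is a simplified model under stated assumptions, not how DatabaseSessionService is implemented: SketchEvent, SketchSession, and append_event_sketch are hypothetical names, and the real service also writes to a database.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SketchEvent:
    """Hypothetical event: an author, optional text, and an optional state delta."""
    author: str
    text: str = ""
    state_delta: dict[str, Any] = field(default_factory=dict)


@dataclass
class SketchSession:
    """Hypothetical session: current state plus chronological history."""
    state: dict[str, Any] = field(default_factory=dict)
    events: list[SketchEvent] = field(default_factory=list)


def append_event_sketch(session: SketchSession, event: SketchEvent) -> SketchEvent:
    """Mimic the 'Apply Deltas' and 'Persist to History' steps."""
    session.state.update(event.state_delta)   # merge the delta into session state
    session.events.append(event)              # record the event in arrival order
    return event                              # handed back to the caller


session = SketchSession()
append_event_sketch(session, SketchEvent(author="user", text="Hi"))
append_event_sketch(
    session,
    SketchEvent(author="system", state_delta={"user:time_zone": "UTC"}),
)
print(session.state)
print([e.author for e in session.events])
```

The point of the sketch is that state is never edited directly: it changes only as a side effect of an event being appended, which is why history and state stay consistent.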
Event Types and Identification
Based on the ADK documentation, you can identify event types by checking:
- Who sent it? (event.author): 'user' for end-user input, agent name for agent output
- What's the main payload? (event.content and event.content.parts):
  - Text: Conversational messages
  - Tool Call Request: LLM asking to execute tools
  - Tool Result: Results from tool execution
- Is it streaming? (event.partial): Indicates incomplete chunks of text
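The checks above can be combined into one small classifier. The sketch below is duck-typed and uses SimpleNamespace objects in place of real ADK events so it runs on its own; classify_event, make_event, and the label strings are hypothetical names chosen for this example.

```python
from types import SimpleNamespace


def classify_event(event) -> str:
    """Return a coarse label based on author, payload, and the partial flag."""
    content = getattr(event, "content", None)
    parts = getattr(content, "parts", None) or []

    if any(getattr(p, "function_call", None) is not None for p in parts):
        return "tool_call_request"
    if any(getattr(p, "function_response", None) is not None for p in parts):
        return "tool_result"
    if any(getattr(p, "text", None) for p in parts):
        if getattr(event, "partial", False):
            return "streaming_text_chunk"
        if getattr(event, "author", None) == "user":
            return "user_text"
        return "agent_text"
    return "other"  # e.g. a state-only event with no content


def make_event(author, partial=False, **part_fields):
    """Build a fake single-part event for demonstration purposes."""
    part = SimpleNamespace(**part_fields)
    return SimpleNamespace(
        author=author,
        partial=partial,
        content=SimpleNamespace(parts=[part]),
    )


print(classify_event(make_event("user", text="Hello")))
print(classify_event(make_event("my_agent", partial=True, text="Hel")))
```

Because every lookup goes through getattr with a default, the same function tolerates events whose content is None, which is common for state-only events.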
Event Injection Patterns
1. Direct Event Injection with append_event()
This is the most fundamental pattern for injecting events into a session. The append_event() method is part of the DatabaseSessionService and is well suited to system notifications and background processing.
Basic Implementation
import json
import time


async def inject_system_notification(
    session_id: str,
    user_id: str,
    notification_data: dict,
) -> None:
    """Inject a system notification into an existing session"""
    # Initialize service (`settings` is the application's own config object)
    svc = DatabaseSessionService(db_url=settings.sql_database_url)
    session = await svc.get_session(
        app_name="my_ai_app",
        user_id=user_id,
        session_id=session_id,
    )
    # get_session returns None when the session does not exist
    if session is None:
        raise ValueError(f"Session {session_id} not found")

    # Create structured notification content
    system_content = f"[SYSTEM_NOTIFICATION]\n{json.dumps(notification_data, default=str)}"

    # Build the event
    sys_event = Event(
        invocation_id=f"sys_notif_{int(time.time())}",
        author="system",
        content=types.Content(
            role="system",
            parts=[types.Part(text=system_content)],
        ),
        actions=EventActions(),
    )

    # Inject the event
    await svc.append_event(session=session, event=sys_event)
Advanced: Background Processing Events
async def process_order_created(
    order_id: str,
    session_id: str,
    offer_id: str,
    user_id: str,
) -> None:
    """Process order creation with agent response"""
    svc = DatabaseSessionService(db_url=settings.sql_database_url)
    session = await svc.get_session(
        app_name="my_ai_app",
        user_id=user_id,
        session_id=session_id,
    )

    # 1. Inject system event with order data
    order_data = await api_service.fetch_order_details(order_id, offer_id)
    system_content = "[ORDER.CREATED]\n" + json.dumps(order_data, default=str)
    sys_event = Event(
        invocation_id=f"bkgd_sys_{order_id}",
        author="system",
        content=types.Content(
            role="system",
            parts=[types.Part(text=system_content)],
        ),
        actions=EventActions(),
    )
    await svc.append_event(session=session, event=sys_event)

    # 2. Generate agent response
    prompt = "Summarize next steps based on the above order data."
    result = await non_streaming_agent.invoke(
        content=types.Content(
            role="user",
            parts=[types.Part(text=prompt)],
        )
    )
    assistant_text = next(
        (p.text for p in result.parts if getattr(p, "text", None)),
        None,
    )

    if assistant_text:
        # 3. Inject agent response
        assistant_event = Event(
            invocation_id=f"bkgd_asst_{order_id}",
            author="assistant",
            content=types.Content(
                role="assistant",
                parts=[types.Part(text=assistant_text)],
            ),
            actions=EventActions(),
        )
        await svc.append_event(session=session, event=assistant_event)
When to use:
- System notifications
- Background data processing
- Automated agent responses
- Data synchronization events
2. State Management with State Deltas
Update session state without visible content using the EventActions.state_delta field. This pattern leverages the EventActions class and is essential for maintaining session metadata and user context.
Authentication State Management
async def inject_admin_authentication(
    session: Session,
    svc: DatabaseSessionService,
    auth_token: str,
    admin_user_id: str,
    customer_user_id: str | None = None,
) -> None:
    """Inject admin authentication state into session"""
    # Build state delta
    state_delta = {
        "session:auth_token": auth_token,
        "session:admin_user_id": admin_user_id,
        "session:admin_mode": True,
    }
    if customer_user_id:
        state_delta["session:customer_user_id"] = customer_user_id

    # Create state update event (no content, so nothing is shown to the user)
    admin_auth_evt = Event(
        invocation_id="inject_admin_auth",
        author="system",
        actions=EventActions(state_delta=state_delta),
    )
    await svc.append_event(session=session, event=admin_auth_evt)
User Preference Management
import time
import uuid


async def update_user_preferences(
    session: Session,
    svc: DatabaseSessionService,
    timezone: str | None = None,
    offset_minutes: int | None = None,
) -> None:
    """Update user timezone and offset preferences"""
    state_delta = {}
    if timezone is not None:
        state_delta["user:time_zone"] = timezone
    if offset_minutes is not None:
        state_delta["user:offset_minutes"] = int(offset_minutes)

    # Only append an event when there is something to change
    if state_delta:
        actions = EventActions(state_delta=state_delta)
        tz_evt = Event(
            invocation_id=str(uuid.uuid4()),
            author="system",
            actions=actions,
            timestamp=time.time(),
        )
        await svc.append_event(session=session, event=tz_evt)
When to use:
- Authentication state updates
- User preference management
- Session metadata storage
- Temporary session data
3. Chat Service Integration
Use this pattern for user-facing notifications that appear in the chat UI. It bridges ADK sessions with chat interfaces.
Status Notifications
async def process_status_notification(
    session_id: str,
    user_id: str,
    order_id: str,
    event_type: str,
    status_data: StatusNotificationData,
    admin_user: str,
) -> None:
    """Process status notification and inject into chat session"""
    try:
        # Create formatted message
        message_content = _format_status_message(event_type, status_data, order_id)

        # Validate session
        is_valid, error_msg = _validate_session_for_notification(db, session_id, user_id)
        if not is_valid:
            logger.error(f"Session validation failed: {error_msg}")
            return

        # Inject via chat service
        chat_service = ChatService(db)
        status_dict = status_data.model_dump()

        # Ensure datetime serialization
        for key, value in status_dict.items():
            if isinstance(value, datetime):
                status_dict[key] = value.isoformat()

        await chat_service.inject_system_message(
            session_id=session_id,
            user_id=user_id,
            message_content=message_content,
            message_type="status_notification",
            metadata={
                "order_id": order_id,
                "event_type": event_type,
                "admin_user": admin_user,
                "timestamp": datetime.utcnow().isoformat(),
                "status_data": status_dict,
            },
        )
    except Exception as e:
        logger.error(f"Failed to inject status notification: {e}")
        # Don't raise - webhook should still succeed
Payment Notifications
async def inject_payment_notification(
    session_id: str,
    user_email: str,
    payment_type: str,
    amount: float,
    reference: str,
) -> None:
    """Inject payment notification into chat session"""
    message = f"""
    💳 Payment {payment_type.title()} Notification

    Amount: ${amount:.2f}
    Reference: {reference}

    Your payment has been processed successfully!
    """

    try:
        chat_service = ChatService()
        await chat_service.inject_system_message(
            session_id=session_id,
            user_id=user_email,
            message_content=message,
            message_type="payment_notification",
            metadata={
                "payment_type": payment_type,
                "amount": amount,
                "reference": reference,
                "timestamp": datetime.utcnow().isoformat(),
            },
        )
    except Exception as e:
        logger.error(f"Failed to inject payment notification: {e}")
When to use:
- User-facing notifications
- Status updates
- Payment alerts
- System status messages
Event Extraction Patterns
1. Direct Session Access
The simplest way to access all events in a session.
async def get_session_history(
    user_id: str,
    session_id: str,
) -> List[Event]:
    """Retrieve complete session history"""
    svc = DatabaseSessionService(db_url=settings.sql_database_url)
    session = await svc.get_session(
        app_name="my_ai_app",
        user_id=user_id,
        session_id=session_id,
    )
    return session.events  # Complete event history
2. Intelligent Text Extraction
Process agent responses safely with proper error handling. This pattern is based on the ADK documentation for identifying and extracting text content from events.
def _extract_text_from_event(evt) -> str | None:
    """
    Safely extract the first text part from an ADK event.

    Handles cases where evt.content might be None and provides
    robust error handling for production systems.
    """
    content = getattr(evt, "content", None)
    if content is None:
        return None

    parts = getattr(content, "parts", None)
    if not parts:
        return None

    first_part = parts[0]

    # Skip function calls - they are handled internally by the runner
    if getattr(first_part, "function_call", None) is not None:
        return None

    return getattr(first_part, "text", None) or None


# Usage in agent response processing
async def process_agent_response(invocation_events: List[Event]) -> str:
    """Process agent response events and extract text content"""
    response_parts = []
    content = None

    try:
        for i, evt in enumerate(invocation_events):
            maybe_text = _extract_text_from_event(evt)
            if maybe_text:
                response_parts.append(maybe_text)
                content = maybe_text  # Keep the last non-empty response
                logger.debug(f"Extracted text from event {i}: {maybe_text[:100]}...")

        logger.info(f"Response extraction completed: {len(response_parts)} text parts found")

    except Exception as extraction_error:
        logger.error(f"Response extraction failed: {extraction_error}", exc_info=True)
        if response_parts:
            content = response_parts[-1]  # Use last successful extraction

    if content is None:
        raise ValueError("No response generated by agent")

    return content
3. Specialized Event Extraction
Extract specific event types like payment events, errors, or custom data structures.
Payment Event Extraction
def _extract_payment_event_from_event(evt) -> dict | None:
    """Extract payment events from agent tool execution events"""
    try:
        content = getattr(evt, "content", None)
        if content is None:
            return None

        parts = getattr(content, "parts", None)
        if not parts:
            return None

        # Look for function response parts that might contain payment events
        for part in parts:
            # Tool results arrive on the part's function_response field
            function_response = getattr(part, "function_response", None)
            if function_response is None:
                continue

            # Get the tool's returned payload
            result_content = getattr(function_response, "response", None)
            if result_content is None:
                continue

            # Parse the result as JSON to check for payment events
            try:
                if isinstance(result_content, str):
                    result_data = json.loads(result_content)
                elif isinstance(result_content, dict):
                    result_data = result_content
                else:
                    continue

                # Check if this result contains a payment event
                if isinstance(result_data, dict) and result_data.get('payment_event'):
                    payment_event = result_data['payment_event']
                    if payment_event.get('type') == 'open_payment_modal':
                        return payment_event

            except (json.JSONDecodeError, AttributeError):
                continue

        return None

    except Exception as e:
        logger.debug(f"Error extracting payment event: {e}")
        return None


# Usage in payment processing
async def process_payment_events(invocation_events: List[Event]) -> dict | None:
    """Process agent events and extract payment information"""
    payment_event = None

    try:
        for evt in invocation_events:
            payment_evt = _extract_payment_event_from_event(evt)
            if payment_evt:
                payment_event = payment_evt
                logger.info(f"Payment event detected: {payment_evt.get('type', 'unknown')}")
                break
    except Exception as e:
        logger.error(f"Payment event processing failed: {e}")

    return payment_event
Detecting Final Responses
According to the ADK documentation, you can use the built-in event.is_final_response() method to identify events suitable for display as the agent's complete output:
async def process_agent_events(invocation_events: List[Event]) -> str:
    """Process agent events and extract final response"""
    full_response_text = ""
    final_response = None

    for event in invocation_events:
        # Accumulate streaming text if needed
        if event.partial and event.content and event.content.parts and event.content.parts[0].text:
            full_response_text += event.content.parts[0].text

        # Check if it's a final, displayable event
        if event.is_final_response():
            if event.content and event.content.parts and event.content.parts[0].text:
                # Use accumulated text for final response
                final_response = full_response_text + (
                    event.content.parts[0].text if not event.partial else ""
                )
                break

    return final_response or "No final response generated"
is_final_response() returns True when any of the following holds:
- The event contains a tool result with skip_summarization=True
- The event contains a tool call for a long-running tool
- All of the following are met:
  - No function calls
  - No function responses
  - Not a partial stream chunk
  - Doesn't end with code execution results
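To make the conditions concrete, here is a paraphrase of that list as a standalone predicate. It is a sketch over duck-typed objects, not the ADK source: looks_like_final_response is a hypothetical name, and the attribute names long_running_tool_ids and code_execution_result are assumptions made for this example.

```python
from types import SimpleNamespace


def looks_like_final_response(event) -> bool:
    """Paraphrase of the listed conditions over a duck-typed event."""
    actions = getattr(event, "actions", None)
    if getattr(actions, "skip_summarization", False):
        return True  # tool result that should be shown as-is
    if getattr(event, "long_running_tool_ids", None):
        return True  # call to a long-running tool (assumed attribute name)

    content = getattr(event, "content", None)
    parts = getattr(content, "parts", None) or []
    has_call = any(getattr(p, "function_call", None) is not None for p in parts)
    has_response = any(getattr(p, "function_response", None) is not None for p in parts)
    ends_with_code_result = (
        bool(parts)
        and getattr(parts[-1], "code_execution_result", None) is not None
    )

    return (
        not has_call
        and not has_response
        and not getattr(event, "partial", False)
        and not ends_with_code_result
    )


def text_event(text, partial=False):
    """Build a fake text-only event for demonstration purposes."""
    return SimpleNamespace(
        partial=partial,
        content=SimpleNamespace(parts=[SimpleNamespace(text=text)]),
    )


print(looks_like_final_response(text_event("All done.")))
print(looks_like_final_response(text_event("All d", partial=True)))
```

In application code, prefer calling event.is_final_response() itself; the sketch is only meant to show why a streamed chunk or a pending tool call is not treated as displayable output.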
Error Event Extraction
def _extract_error_from_event(evt) -> dict | None:
    """Extract error information from agent events"""
    try:
        content = getattr(evt, "content", None)
        if content is None:
            return None

        parts = getattr(content, "parts", None)
        if not parts:
            return None

        for part in parts:
            # Check for error indicators
            if hasattr(part, 'text') and part.text:
                text = part.text.lower()
                if any(keyword in text for keyword in ['error', 'failed', 'exception']):
                    return {
                        'type': 'error',
                        'message': part.text,
                        'timestamp': getattr(evt, 'timestamp', None),
                    }

    except Exception as e:
        logger.debug(f"Error extracting error event: {e}")

    return None
Real-World Implementation Examples
Session Migration Pattern
async def migrate_anonymous_session(
    svc: DatabaseSessionService,
    anonymous_session_id: str,
    authenticated_user_id: str,
) -> Session:
    """Migrate anonymous session to authenticated user"""
    # Get anonymous session
    anonymous_session = await svc.get_session(
        app_name="my_ai_app",
        user_id="anonymous",
        session_id=anonymous_session_id,
    )
    if not anonymous_session:
        raise ValueError("Anonymous session not found")

    # Get all events from anonymous session
    anonymous_events = anonymous_session.events

    # Create new session for authenticated user
    session = await svc.create_session(
        app_name="my_ai_app",
        user_id=authenticated_user_id,
        session_id=anonymous_session_id,  # Keep same session ID
    )

    # Migrate events (filtering out system events that will be recreated)
    migrated_events = 0
    if anonymous_events:
        for event in anonymous_events:
            # Skip system events that will be recreated
            if event.author == "system" and event.invocation_id.startswith("sys_"):
                continue

            # Re-inject the event into the new session
            await svc.append_event(session=session, event=event)
            migrated_events += 1

    logger.info(f"Migrated {migrated_events} events from anonymous to authenticated session")
    return session
Real-time Event Processing
async def process_live_events(live_events):
    """Process live agent events in real-time"""
    async for event in live_events:
        try:
            # Extract text content
            text_content = _extract_text_from_event(event)
            if text_content:
                yield {
                    "type": "text",
                    "content": text_content,
                    "timestamp": getattr(event, 'timestamp', time.time()),
                }

            # Extract payment events
            payment_event = _extract_payment_event_from_event(event)
            if payment_event:
                yield {
                    "type": "payment",
                    "content": payment_event,
                    "timestamp": getattr(event, 'timestamp', time.time()),
                }

        except Exception as e:
            logger.error(f"Error processing live event: {e}")
            yield {
                "type": "error",
                "content": str(e),
                "timestamp": time.time(),
            }
Best Practices and Common Pitfalls
1. Session Validation
Always validate sessions before injection:
def _validate_session_for_notification(
    db: Session,
    session_id: str,
    user_id: str,
) -> Tuple[bool, Optional[str]]:
    """Validate session exists and belongs to user"""
    # Note: `db` is the application's database session (it is queried with
    # db.query(...)), not an ADK Session object.
    try:
        # Check if session exists in chat system
        room = db.query(ChatRoom).filter(
            ChatRoom.id == UUID(session_id),
            ChatRoom.user_id == user_id,
        ).first()

        if not room:
            return False, f"Session {session_id} not found for user {user_id}"

        return True, None
    except Exception as e:
        return False, f"Session validation error: {str(e)}"
2. Error Handling
Implement robust error handling to prevent system failures:
async def safe_event_injection(
    svc: DatabaseSessionService,
    session: Session,
    event: Event,
) -> bool:
    """Safely inject event with error handling"""
    try:
        await svc.append_event(session=session, event=event)
        return True
    except Exception as e:
        logger.error(f"Failed to inject event: {e}")
        # Don't raise - allow system to continue
        return False
3. Unique Invocation IDs
Use unique invocation IDs to prevent duplicate events:
def generate_unique_invocation_id(prefix: str) -> str:
    """Generate unique invocation ID with timestamp and UUID"""
    timestamp = int(time.time())
    uuid_suffix = str(uuid.uuid4())[:8]
    return f"{prefix}_{timestamp}_{uuid_suffix}"
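A random suffix keeps IDs distinct, but on its own it does not stop the same logical notification from being injected twice, for example when a webhook is retried. One possible complement, sketched below under that assumption, is to derive the ID deterministically from a business key and check the session history before appending. Both helper names are hypothetical, and SimpleNamespace stands in for real events.

```python
import hashlib
from types import SimpleNamespace


def deterministic_invocation_id(prefix: str, business_key: str) -> str:
    """Same prefix and key always yield the same ID, so retries can be detected."""
    digest = hashlib.sha256(business_key.encode("utf-8")).hexdigest()[:12]
    return f"{prefix}_{digest}"


def already_injected(events, invocation_id: str) -> bool:
    """Check a session's history for an event with this invocation ID."""
    return any(getattr(e, "invocation_id", None) == invocation_id for e in events)


# Simulated history containing one previously injected order event
history = [
    SimpleNamespace(invocation_id=deterministic_invocation_id("order", "ord-123")),
]

retry_id = deterministic_invocation_id("order", "ord-123")
new_id = deterministic_invocation_id("order", "ord-456")
print(already_injected(history, retry_id))
print(already_injected(history, new_id))
```

With real sessions, the check would run against session.events before calling append_event(); the linear scan is fine for short histories but would need an index or a state flag for very long ones.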
4. Event Ordering
Maintain proper event ordering:
async def inject_ordered_events(
    svc: DatabaseSessionService,
    session: Session,
    events: List[Event],
) -> None:
    """Inject events in proper order"""
    # Sort events by timestamp if available
    sorted_events = sorted(
        events,
        key=lambda e: getattr(e, 'timestamp', 0),
    )

    for event in sorted_events:
        await svc.append_event(session=session, event=event)
Performance Considerations
1. Batch Operations
For multiple events, consider batch operations:
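async def batch_event_injection(
    svc: DatabaseSessionService,
    session: Session,
    events: List[Event],
) -> None:
    """Inject multiple events efficiently"""
    # Process events in batches to avoid overwhelming the system
    batch_size = 10
    for i in range(0, len(events), batch_size):
        batch = events[i:i + batch_size]

        # Process batch concurrently. Order within a batch is not guaranteed,
        # so avoid this when events must land in a specific sequence.
        tasks = [
            svc.append_event(session=session, event=event)
            for event in batch
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # return_exceptions=True hides failures unless they are inspected
        for event, result in zip(batch, results):
            if isinstance(result, Exception):
                logger.error(f"Failed to inject event {event.invocation_id}: {result}")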
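Concurrent appends to a single session trade away ordering, which conflicts with the event-ordering practice described earlier. A hedged alternative is to stay sequential within each session and run different sessions concurrently. The sketch below uses a fake append coroutine so it runs without ADK; inject_grouped and the (session_id, event) tuple shape are hypothetical choices for this example.

```python
import asyncio
from collections import defaultdict


async def inject_grouped(append, items):
    """Append sequentially per session, concurrently across sessions.

    `append` is any async callable taking (session_id, event); `items` is an
    iterable of (session_id, event) pairs. Returns a list of failures.
    """
    by_session = defaultdict(list)
    for session_id, event in items:
        by_session[session_id].append(event)

    async def drain(session_id, events):
        failures = []
        for event in events:               # preserves order within one session
            try:
                await append(session_id, event)
            except Exception as exc:       # collect instead of aborting the batch
                failures.append((session_id, event, exc))
        return failures

    results = await asyncio.gather(
        *(drain(sid, evts) for sid, evts in by_session.items())
    )
    return [failure for group in results for failure in group]


async def _demo():
    log = []

    async def fake_append(session_id, event):
        await asyncio.sleep(0)             # yield control like real I/O would
        if event == "bad":
            raise RuntimeError("simulated failure")
        log.append((session_id, event))

    items = [("s1", "a"), ("s2", "x"), ("s1", "b"), ("s2", "bad"), ("s1", "c")]
    failures = await inject_grouped(fake_append, items)
    return log, failures


log, failures = asyncio.run(_demo())
print([event for sid, event in log if sid == "s1"])
print(len(failures))
```

Since append_event() is a coroutine doing I/O, asyncio concurrency of this kind is usually sufficient; a thread pool mainly helps when the work involves blocking, non-async calls.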
2. Event Filtering
Filter events during extraction to improve performance:
def filter_recent_events(events: List[Event], hours: int = 24) -> List[Event]:
    """Filter events to only recent ones"""
    cutoff_time = time.time() - (hours * 3600)
    return [
        event for event in events
        if getattr(event, 'timestamp', 0) > cutoff_time
    ]
3. Memory Management
For large session histories, implement pagination:
async def get_paginated_events(
    svc: DatabaseSessionService,
    user_id: str,
    session_id: str,
    page: int = 0,
    page_size: int = 100,
) -> List[Event]:
    """Get paginated session events"""
    # Note: get_session still loads the full history here; slicing only
    # limits what is returned to the caller, not what is read from storage.
    session = await svc.get_session(
        app_name="my_ai_app",
        user_id=user_id,
        session_id=session_id,
    )

    all_events = session.events
    start_idx = page * page_size
    end_idx = start_idx + page_size

    return all_events[start_idx:end_idx]
Conclusion
Mastering ADK DatabaseSessionService event injection and extraction is crucial for building production-ready conversational AI systems. The patterns covered in this guide provide a solid foundation for:
- System Integration: Seamlessly integrating with external systems
- Real-time Processing: Handling live events and notifications
- State Management: Maintaining conversation context
- Error Handling: Building robust, fault-tolerant systems
Key Takeaways
- Choose the right injection pattern for your use case
- Implement robust error handling to prevent system failures
- Use unique invocation IDs to avoid duplicate events
- Consider performance implications for large-scale deployments
- Validate sessions before injection operations
Next Steps
- Experiment with the patterns in your own projects
- Consider implementing monitoring for event injection/extraction
- Explore advanced patterns like event streaming and real-time processing
- Share your experiences and learnings with the community
The ADK DatabaseSessionService is a powerful tool when used correctly. These patterns will help you build sophisticated conversational AI systems that can handle real-world complexity while maintaining reliability and performance.
References
This guide is based on the official Google ADK documentation and source code. Here are the key references used:
Official Documentation
- ADK Events Documentation - Comprehensive guide to understanding events in ADK
- Google ADK Python Repository - Official ADK Python SDK
Source Code References
- Event Class - Core Event implementation
- EventActions Class - EventActions for state deltas and control signals
- DatabaseSessionService - Session management and event persistence
- Session Class - Session data structure
- BaseSessionService - Base session service interface
Key Concepts Referenced
- Event Structure: Based on the Event class extending LlmResponse with ADK-specific fields
- Event Flow: Processing flow from generation to persistence as documented in ADK docs
- State Management: Using EventActions.state_delta for session state updates
- Session Persistence: How DatabaseSessionService.append_event() handles event storage
- Event Identification: Methods for identifying event types as described in ADK documentation
Additional Resources
- ADK GitHub Repository - Full source code and examples
- ADK Documentation Hub - Complete ADK documentation
- ADK Python SDK - PyPI package for installation
Have you implemented similar patterns in your projects? Share your experiences and any additional insights in the comments below!