DEV Community

Claret Ibeawuchi
Claret Ibeawuchi

Posted on

Mastering Google ADK DatabaseSessionService and Events: Complete Guide to Event Injection and Extraction

adk-logo

Introduction

Building sophisticated conversational AI systems requires robust event management. Google's Agent Development Kit (ADK) provides powerful tools for managing conversation state, but mastering event injection and extraction patterns is crucial for creating production-ready applications.

In this comprehensive guide, we'll explore real-world patterns from a production AI system, covering everything from basic event injection to advanced extraction techniques.

Table of Contents

Understanding Events and Sessions in ADK

Before going into injection and extraction patterns, it's crucial to understand what events are and how they relate to sessions in the Google ADK ecosystem.

What Are Events?

Events are the fundamental units of information flow within the Agent Development Kit (ADK). According to the official ADK documentation, events represent every significant occurrence during an agent's interaction lifecycle, from initial user input to the final response and all the steps in between.

An Event in ADK is an immutable record representing a specific point in the agent's execution. It captures:

  • User messages
  • Agent replies
  • Tool requests (function calls)
  • Tool results
  • State changes
  • Control signals
  • Errors

Event Structure

Based on the ADK Event class, events have this conceptual structure:

ADK-event-structure

Why Events Matter

Events are central to ADK's operation for several key reasons:

  1. Communication: They serve as the standard message format between the user interface, the Runner, agents, the LLM, and tools. Everything flows as an Event.

  2. Signaling State & Artifact Changes: Events carry instructions for state modifications and track artifact updates. The SessionService uses these signals to ensure persistence via event.actions.state_delta and event.actions.artifact_delta.

  3. Control Flow: Specific fields like event.actions.transfer_to_agent or event.actions.escalate act as signals that direct the framework, determining which agent runs next or if a loop should terminate.

  4. History & Observability: The sequence of events recorded in session.events provides a complete, chronological history of an interaction, invaluable for debugging, auditing, and understanding agent behavior step-by-step.

Understanding Sessions

Sessions are containers that hold the complete conversation state. The DatabaseSessionService is the core component that manages conversation state in ADK applications. It handles:

  • Session Management: Creating, retrieving, and managing conversation sessions
  • Event Storage: Persisting conversation events with full history
  • State Management: Maintaining session state across interactions
  • Event Processing: Enabling real-time event injection and extraction

Event-Session Relationship

The relationship between events and sessions is fundamental:

  1. Events are stored in sessions: Each session maintains a chronological list of all events that have occurred
  2. Sessions provide context: Events can reference and modify session state through EventActions
  3. Sessions enable persistence: The DatabaseSessionService ensures events are persisted and can be retrieved later
  4. Events drive session evolution: Each event can modify session state, creating a living record of the conversation

Key Components

from google.adk.sessions import DatabaseSessionService from google.adk.events import Event, EventActions from google.genai import types 
Enter fullscreen mode Exit fullscreen mode

The EventActions class is particularly important as it contains the side effects and control signals that events can carry.

Event Flow and Processing

Understanding how events flow through the ADK system is crucial for effective event management. According to the ADK documentation, events follow this processing flow:

  1. Generation: Events are created at different points:

    • User Input: The Runner wraps initial user messages into an Event with author='user'
    • Agent Logic: Agents explicitly yield Event(...) objects with author=self.name
    • LLM Responses: The ADK model integration layer translates raw LLM output into Event objects
    • Tool Results: After tool execution, the framework generates an Event containing the function_response
  2. Processing Flow:

    • Yield/Return: An event is generated and yielded by its source
    • Runner Receives: The main Runner executing the agent receives the event
    • SessionService Processing: The Runner sends the event to the configured SessionService
    • Apply Deltas: The service merges event.actions.state_delta into session.state
    • Persist to History: Appends the processed event to the session.events list
    • External Yield: The Runner yields the processed event to the calling application

This flow ensures that state changes and history are consistently recorded alongside the communication content of each event.

Event Types and Identification

Based on the ADK documentation, you can identify event types by checking:

  • Who sent it? (event.author): 'user' for end-user input, agent name for agent output
  • What's the main payload? (event.content and event.content.parts):
    • Text: Conversational messages
    • Tool Call Request: LLM asking to execute tools
    • Tool Result: Results from tool execution
  • Is it streaming? (event.partial): Indicates incomplete chunks of text

Event Injection Patterns

1. Direct Event Injection with append_event()

This is the most fundamental pattern for injecting events into a session. The append_event() method is part of the DatabaseSessionService and is perfect for system notifications and background processing.

Basic Implementation

async def inject_system_notification( session_id: str, user_id: str, notification_data: dict ) -> None: """Inject a system notification into an existing session""" # Initialize service  svc = DatabaseSessionService(db_url=settings.sql_database_url) session = await svc.get_session( app_name="my_ai_app", user_id=user_id, session_id=session_id ) # Create structured notification content  system_content = f"[SYSTEM_NOTIFICATION]\n{json.dumps(notification_data, default=str)}" # Build the event  sys_event = Event( invocation_id=f"sys_notif_{int(time.time())}", author="system", content=types.Content( role="system", parts=[types.Part(text=system_content)] ), actions=EventActions() ) # Inject the event  await svc.append_event(session=session, event=sys_event) 
Enter fullscreen mode Exit fullscreen mode

Advanced: Background Processing Events

async def process_order_created( order_id: str, session_id: str, offer_id: str, user_id: str ) -> None: """Process order creation with agent response""" svc = DatabaseSessionService(db_url=settings.sql_database_url) session = await svc.get_session( app_name="my_ai_app", user_id=user_id, session_id=session_id ) # 1. Inject system event with order data  order_data = await api_service.fetch_order_details(order_id, offer_id) system_content = "[ORDER.CREATED]\n" + json.dumps(order_data, default=str) sys_event = Event( invocation_id=f"bkgd_sys_{order_id}", author="system", content=types.Content( role="system", parts=[types.Part(text=system_content)] ), actions=EventActions() ) await svc.append_event(session=session, event=sys_event) # 2. Generate agent response  prompt = "Summarize next steps based on the above order data." result = await non_streaming_agent.invoke( content=types.Content( role="user", parts=[types.Part(text=prompt)] ) ) assistant_text = next( (p.text for p in result.parts if getattr(p, "text", None)), None ) if assistant_text: # 3. Inject agent response  assistant_event = Event( invocation_id=f"bkgd_asst_{order_id}", author="assistant", content=types.Content( role="assistant", parts=[types.Part(text=assistant_text)] ), actions=EventActions() ) await svc.append_event(session=session, event=assistant_event) 
Enter fullscreen mode Exit fullscreen mode

When to use:

  • System notifications
  • Background data processing
  • Automated agent responses
  • Data synchronization events

2. State Management with State Deltas

Update session state without visible content using the EventActions.state_delta field. This pattern leverages the EventActions class and is essential for maintaining session metadata and user context.

Authentication State Management

async def inject_admin_authentication( session: Session, svc: DatabaseSessionService, auth_token: str, admin_user_id: str, customer_user_id: str = None ) -> None: """Inject admin authentication state into session""" # Build state delta  state_delta = { "session:auth_token": auth_token, "session:admin_user_id": admin_user_id, "session:admin_mode": True } if customer_user_id: state_delta["session:customer_user_id"] = customer_user_id # Create state update event  admin_auth_evt = Event( invocation_id="inject_admin_auth", author="system", actions=EventActions(state_delta=state_delta) ) await svc.append_event(session=session, event=admin_auth_evt) 
Enter fullscreen mode Exit fullscreen mode

User Preference Management

async def update_user_preferences( session: Session, svc: DatabaseSessionService, timezone: str = None, offset_minutes: int = None ) -> None: """Update user timezone and offset preferences""" state_delta = {} if timezone is not None: state_delta["user:time_zone"] = timezone if offset_minutes is not None: state_delta["user:offset_minutes"] = int(offset_minutes) if state_delta: actions = EventActions(state_delta=state_delta) tz_evt = Event( invocation_id=str(uuid.uuid4()), author="system", actions=actions, timestamp=time.time() ) await svc.append_event(session=session, event=tz_evt) 
Enter fullscreen mode Exit fullscreen mode

When to use:

  • Authentication state updates
  • User preference management
  • Session metadata storage
  • Temporary session data

3. Chat Service Integration

For user-facing notifications that appear in the chat UI. This pattern bridges ADK sessions with chat interfaces.

Status Notifications

async def process_status_notification( session_id: str, user_id: str, order_id: str, event_type: str, status_data: StatusNotificationData, admin_user: str ) -> None: """Process status notification and inject into chat session""" try: # Create formatted message  message_content = _format_status_message(event_type, status_data, order_id) # Validate session  is_valid, error_msg = _validate_session_for_notification(db, session_id, user_id) if not is_valid: logger.error(f"Session validation failed: {error_msg}") return # Inject via chat service  chat_service = ChatService(db) status_dict = status_data.model_dump() # Ensure datetime serialization  for key, value in status_dict.items(): if isinstance(value, datetime): status_dict[key] = value.isoformat() await chat_service.inject_system_message( session_id=session_id, user_id=user_id, message_content=message_content, message_type="status_notification", metadata={ "order_id": order_id, "event_type": event_type, "admin_user": admin_user, "timestamp": datetime.utcnow().isoformat(), "status_data": status_dict } ) except Exception as e: logger.error(f"Failed to inject status notification: {e}") # Don't raise - webhook should still succeed 
Enter fullscreen mode Exit fullscreen mode

Payment Notifications

async def inject_payment_notification( session_id: str, user_email: str, payment_type: str, amount: float, reference: str ) -> None: """Inject payment notification into chat session""" message = f""" 💳 Payment {payment_type.title()} Notification Amount: ${amount:.2f} Reference: {reference} Your payment has been processed successfully! """ try: chat_service = ChatService() await chat_service.inject_system_message( session_id=session_id, user_id=user_email, message_content=message, message_type="payment_notification", metadata={ "payment_type": payment_type, "amount": amount, "reference": reference, "timestamp": datetime.utcnow().isoformat() } ) except Exception as e: logger.error(f"Failed to inject payment notification: {e}") 
Enter fullscreen mode Exit fullscreen mode

When to use:

  • User-facing notifications
  • Status updates
  • Payment alerts
  • System status messages

Event Extraction Patterns

1. Direct Session Access

The simplest way to access all events in a session.

async def get_session_history( user_id: str, session_id: str ) -> List[Event]: """Retrieve complete session history""" svc = DatabaseSessionService(db_url=settings.sql_database_url) session = await svc.get_session( app_name="my_ai_app", user_id=user_id, session_id=session_id ) return session.events # Complete event history 
Enter fullscreen mode Exit fullscreen mode

2. Intelligent Text Extraction

Process agent responses safely with proper error handling. This pattern is based on the ADK documentation for identifying and extracting text content from events.

def _extract_text_from_event(evt) -> str | None: """ Safely extract the first text part from an ADK event. Handles cases where evt.content might be None and provides robust error handling for production systems. """ content = getattr(evt, "content", None) if content is None: return None parts = getattr(content, "parts", None) if not parts: return None first_part = parts[0] # Skip function calls - they are handled internally by the runner  if getattr(first_part, "function_call", None) is not None: return None return getattr(first_part, "text", None) or None # Usage in agent response processing async def process_agent_response(invocation_events: List[Event]) -> str: """Process agent response events and extract text content""" response_parts = [] content = None try: for i, evt in enumerate(invocation_events): maybe_text = _extract_text_from_event(evt) if maybe_text: response_parts.append(maybe_text) content = maybe_text # Keep the last non-empty response  logger.debug(f"Extracted text from event {i}: {maybe_text[:100]}...") logger.info(f"Response extraction completed: {len(response_parts)} text parts found") except Exception as extraction_error: logger.error(f"Response extraction failed: {extraction_error}", exc_info=True) if response_parts: content = response_parts[-1] # Use last successful extraction  if content is None: raise ValueError("No response generated by agent") return content 
Enter fullscreen mode Exit fullscreen mode

3. Specialized Event Extraction

Extract specific event types like payment events, errors, or custom data structures.

Payment Event Extraction

def _extract_payment_event_from_event(evt) -> dict | None: """Extract payment events from agent tool execution events""" try: content = getattr(evt, "content", None) if content is None: return None parts = getattr(content, "parts", None) if not parts: return None # Look for function result parts that might contain payment events  for part in parts: function_result = getattr(part, "function_result", None) if function_result is None: continue # Get the result content  result_content = getattr(function_result, "content", None) if result_content is None: continue # Parse the result as JSON to check for payment events  try: if isinstance(result_content, str): result_data = json.loads(result_content) elif isinstance(result_content, dict): result_data = result_content else: continue # Check if this result contains a payment event  if isinstance(result_data, dict) and result_data.get('payment_event'): payment_event = result_data['payment_event'] if payment_event.get('type') == 'open_payment_modal': return payment_event except (json.JSONDecodeError, AttributeError): continue return None except Exception as e: logger.debug(f"Error extracting payment event: {e}") return None # Usage in payment processing async def process_payment_events(invocation_events: List[Event]) -> dict | None: """Process agent events and extract payment information""" payment_event = None try: for evt in invocation_events: payment_evt = _extract_payment_event_from_event(evt) if payment_evt: payment_event = payment_evt logger.info(f"Payment event detected: {payment_evt.get('type', 'unknown')}") break except Exception as e: logger.error(f"Payment event processing failed: {e}") return payment_event 
Enter fullscreen mode Exit fullscreen mode

Detecting Final Responses

According to the ADK documentation, you can use the built-in event.is_final_response() method to identify events suitable for display as the agent's complete output:

async def process_agent_events(invocation_events: List[Event]) -> str: """Process agent events and extract final response""" full_response_text = "" final_response = None for event in invocation_events: # Accumulate streaming text if needed  if event.partial and event.content and event.content.parts and event.content.parts[0].text: full_response_text += event.content.parts[0].text # Check if it's a final, displayable event  if event.is_final_response(): if event.content and event.content.parts and event.content.parts[0].text: # Use accumulated text for final response  final_response = full_response_text + (event.content.parts[0].text if not event.partial else "") break return final_response or "No final response generated" 
Enter fullscreen mode Exit fullscreen mode

When is_final_response() returns True:

  • The event contains a tool result with skip_summarization=True
  • The event contains a tool call for a long-running tool
  • All of the following are met:
    • No function calls
    • No function responses
    • Not a partial stream chunk
    • Doesn't end with code execution results

Error Event Extraction

def _extract_error_from_event(evt) -> dict | None: """Extract error information from agent events""" try: content = getattr(evt, "content", None) if content is None: return None parts = getattr(content, "parts", None) if not parts: return None for part in parts: # Check for error indicators  if hasattr(part, 'text') and part.text: text = part.text.lower() if any(keyword in text for keyword in ['error', 'failed', 'exception']): return { 'type': 'error', 'message': part.text, 'timestamp': getattr(evt, 'timestamp', None) } except Exception as e: logger.debug(f"Error extracting error event: {e}") return None 
Enter fullscreen mode Exit fullscreen mode

Real-World Implementation Examples

Session Migration Pattern

async def migrate_anonymous_session( svc: DatabaseSessionService, anonymous_session_id: str, authenticated_user_id: str ) -> Session: """Migrate anonymous session to authenticated user""" # Get anonymous session  anonymous_session = await svc.get_session( app_name="my_ai_app", user_id="anonymous", session_id=anonymous_session_id ) if not anonymous_session: raise ValueError("Anonymous session not found") # Get all events from anonymous session  anonymous_events = anonymous_session.events # Create new session for authenticated user  session = await svc.create_session( app_name="my_ai_app", user_id=authenticated_user_id, session_id=anonymous_session_id # Keep same session ID  ) # Migrate events (filtering out system events that will be recreated)  migrated_events = 0 if anonymous_events: for event in anonymous_events: # Skip system events that will be recreated  if event.author == "system" and event.invocation_id.startswith("sys_"): continue # Re-inject the event into the new session  await svc.append_event(session=session, event=event) migrated_events += 1 logger.info(f"Migrated {migrated_events} events from anonymous to authenticated session") return session 
Enter fullscreen mode Exit fullscreen mode

Real-time Event Processing

async def process_live_events(live_events): """Process live agent events in real-time""" async for event in live_events: try: # Extract text content  text_content = _extract_text_from_event(event) if text_content: yield { "type": "text", "content": text_content, "timestamp": getattr(event, 'timestamp', time.time()) } # Extract payment events  payment_event = _extract_payment_event_from_event(event) if payment_event: yield { "type": "payment", "content": payment_event, "timestamp": getattr(event, 'timestamp', time.time()) } except Exception as e: logger.error(f"Error processing live event: {e}") yield { "type": "error", "content": str(e), "timestamp": time.time() } 
Enter fullscreen mode Exit fullscreen mode

Best Practices and Common Pitfalls

1. Session Validation

Always validate sessions before injection:

def _validate_session_for_notification( db: Session, session_id: str, user_id: str ) -> Tuple[bool, Optional[str]]: """Validate session exists and belongs to user""" try: # Check if session exists in chat system  room = db.query(ChatRoom).filter( ChatRoom.id == UUID(session_id), ChatRoom.user_id == user_id ).first() if not room: return False, f"Session {session_id} not found for user {user_id}" return True, None except Exception as e: return False, f"Session validation error: {str(e)}" 
Enter fullscreen mode Exit fullscreen mode

2. Error Handling

Implement robust error handling to prevent system failures:

async def safe_event_injection( svc: DatabaseSessionService, session: Session, event: Event ) -> bool: """Safely inject event with error handling""" try: await svc.append_event(session=session, event=event) return True except Exception as e: logger.error(f"Failed to inject event: {e}") # Don't raise - allow system to continue  return False 
Enter fullscreen mode Exit fullscreen mode

3. Unique Invocation IDs

Use unique invocation IDs to prevent duplicate events:

def generate_unique_invocation_id(prefix: str) -> str: """Generate unique invocation ID with timestamp and UUID""" timestamp = int(time.time()) uuid_suffix = str(uuid.uuid4())[:8] return f"{prefix}_{timestamp}_{uuid_suffix}" 
Enter fullscreen mode Exit fullscreen mode

4. Event Ordering

Maintain proper event ordering:

async def inject_ordered_events( svc: DatabaseSessionService, session: Session, events: List[Event] ) -> None: """Inject events in proper order""" # Sort events by timestamp if available  sorted_events = sorted( events, key=lambda e: getattr(e, 'timestamp', 0) ) for event in sorted_events: await svc.append_event(session=session, event=event) 
Enter fullscreen mode Exit fullscreen mode

Performance Considerations

1. Batch Operations

For multiple events, consider batch operations:

async def batch_event_injection( svc: DatabaseSessionService, session: Session, events: List[Event] ) -> None: """Inject multiple events efficiently""" # Process events in batches to avoid overwhelming the system  batch_size = 10 for i in range(0, len(events), batch_size): batch = events[i:i + batch_size] # Process batch concurrently  tasks = [ svc.append_event(session=session, event=event) for event in batch ] await asyncio.gather(*tasks, return_exceptions=True) 
Enter fullscreen mode Exit fullscreen mode

2. Event Filtering

Filter events during extraction to improve performance:

def filter_recent_events(events: List[Event], hours: int = 24) -> List[Event]: """Filter events to only recent ones""" cutoff_time = time.time() - (hours * 3600) return [ event for event in events if getattr(event, 'timestamp', 0) > cutoff_time ] 
Enter fullscreen mode Exit fullscreen mode

3. Memory Management

For large session histories, implement pagination:

async def get_paginated_events( svc: DatabaseSessionService, user_id: str, session_id: str, page: int = 0, page_size: int = 100 ) -> List[Event]: """Get paginated session events""" session = await svc.get_session( app_name="my_ai_app", user_id=user_id, session_id=session_id ) all_events = session.events start_idx = page * page_size end_idx = start_idx + page_size return all_events[start_idx:end_idx] 
Enter fullscreen mode Exit fullscreen mode

Conclusion

Mastering ADK DatabaseSessionService event injection and extraction is crucial for building production-ready conversational AI systems. The patterns covered in this guide provide a solid foundation for:

  • System Integration: Seamlessly integrating with external systems
  • Real-time Processing: Handling live events and notifications
  • State Management: Maintaining conversation context
  • Error Handling: Building robust, fault-tolerant systems

Key Takeaways

  1. Choose the right injection pattern for your use case
  2. Implement robust error handling to prevent system failures
  3. Use unique invocation IDs to avoid duplicate events
  4. Consider performance implications for large-scale deployments
  5. Validate sessions before injection operations

Next Steps

  • Experiment with the patterns in your own projects
  • Consider implementing monitoring for event injection/extraction
  • Explore advanced patterns like event streaming and real-time processing
  • Share your experiences and learnings with the community

The ADK DatabaseSessionService is a powerful tool when used correctly. These patterns will help you build sophisticated conversational AI systems that can handle real-world complexity while maintaining reliability and performance.


References

This guide is based on the official Google ADK documentation and source code. Here are the key references used:

Official Documentation

Source Code References

Key Concepts Referenced

  • Event Structure: Based on the Event class extending LlmResponse with ADK-specific fields
  • Event Flow: Processing flow from generation to persistence as documented in ADK docs
  • State Management: Using EventActions.state_delta for session state updates
  • Session Persistence: How DatabaseSessionService.append_event() handles event storage
  • Event Identification: Methods for identifying event types as described in ADK documentation

Additional Resources


Have you implemented similar patterns in your projects? Share your experiences and any additional insights in the comments below!

Top comments (4)

Collapse
 
sammychinedu2ky profile image
sammychinedu2ky

Quite elaborate 😎

Collapse
 
vicradon profile image
Osinachi Chukwujama

Detailed and well written. In what cases might we use Threadpool executor instead of simple batch processing as part of the performance consideration?

Collapse
 
kalio profile image
Kalio Princewill

This puts perspective to an issue I had recently, thanks

Collapse
 
parag_nandy_roy profile image
Parag Nandy Roy

This is a goldmine for anyone diving into ADK...