Temporal provides durable execution for your agent workflows, enabling automatic retries, pause/resume capabilities, and time-travel debugging. Perfect for production deployments.
## Why Temporal?

mcp-agent supports both `asyncio` and `temporal` execution engines. While asyncio works well for development and simple workflows, Temporal is recommended for production deployments because it provides:

- **Durable Execution**: Workflows survive failures, restarts, and infrastructure issues
- **Automatic Retries**: Failed activities are automatically retried with configurable policies
- **Pause & Resume**: Workflows can be paused indefinitely and resumed with new data
- **Observability**: Complete workflow history and time-travel debugging via the Temporal UI
- **Scalability**: Distribute workflow execution across multiple workers
- **Long-Running Workflows**: Support for workflows that run for days, weeks, or months
## Quick Start

### Install Temporal CLI

Install the Temporal CLI for local development:

```bash
# macOS
brew install temporal

# Linux/WSL
curl -sSf https://temporal.download/cli.sh | sh

# Windows
# Download from https://github.com/temporalio/cli/releases
```
### Start Temporal Server

Run a local Temporal server for development:

```bash
temporal server start-dev
```

This starts:

- Temporal Server on `localhost:7233`
- Web UI on http://localhost:8233
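To confirm the server is reachable, you can list workflow executions (the list will be empty on a fresh server):

```bash
temporal workflow list
```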
### Configure mcp-agent

Update your `mcp_agent.config.yaml`:

```yaml
execution_engine: temporal

# Optional: preload modules that register @workflow_task activities
workflow_task_modules:
  - my_package.custom_temporal_tasks

# Optional: override retry behaviour for specific workflow tasks/activities
workflow_task_retry_policies:
  my_package.custom_temporal_tasks.my_activity:
    maximum_attempts: 1

temporal:
  host: localhost
  port: 7233
  namespace: default
  task_queue: mcp-agent
  max_concurrent_activities: 10
```
mcp-agent preloads its built-in LLM providers automatically. Add extra modules when you register custom `@workflow_task` activities outside the core packages so the worker can discover them before starting. Entries are standard Python import paths.

The optional `workflow_task_retry_policies` mapping lets you tune Temporal retry behaviour per activity. Keys may be exact activity names, prefix wildcards like `prefix*`, or a bare `*`. For provider SDKs, common non-retryable error types include:

- **OpenAI/Azure OpenAI**: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`
- **Anthropic**: `AuthenticationError`, `PermissionDeniedError`, `BadRequestError`, `NotFoundError`, `UnprocessableEntityError`
- **Azure AI Inference**: `HttpResponseError` (400/401/403/404/422)
- **Google GenAI**: `InvalidArgument`, `FailedPrecondition`, `PermissionDenied`, `NotFound`, `Unauthenticated`

mcp-agent raises a `WorkflowApplicationError` for these cases so Temporal (or the asyncio executor) avoids retry loops even when the Temporal SDK is not installed locally.
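For example, a sketch of a policy map using these matching forms (the module and activity names are hypothetical):

```yaml
workflow_task_retry_policies:
  # Exact activity name: never retry
  my_package.tasks.charge_card:
    maximum_attempts: 1
  # Prefix wildcard: applies to every activity under my_package.io
  my_package.io*:
    maximum_attempts: 5
  # Catch-all default for all other workflow tasks
  "*":
    maximum_attempts: 3
```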
### Create Worker

Create a worker to process workflows:

```python
import asyncio

from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.executor.temporal import create_temporal_worker_for_app

app = MCPApp(name="my_agent")

# Define your workflows here
@app.workflow
class MyWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        return WorkflowResult(value=f"Processed: {input}")

async def main():
    async with create_temporal_worker_for_app(app) as worker:
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```
### Run Workflow

Execute your workflow:

```python
import asyncio

from mcp_agent.app import MCPApp

app = MCPApp(name="my_agent")

async def main():
    async with app.run() as agent_app:
        executor = agent_app.executor

        # Start workflow
        handle = await executor.start_workflow("MyWorkflow", "Hello Temporal!")

        # Wait for result
        result = await handle.result()
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```
## Temporal Architecture

### Core Components

Temporal’s architecture provides robust workflow orchestration through several key components:

- **Temporal Server**: Manages workflow state, persists event history, and coordinates execution
- **Workers**: Execute workflow and activity code, polling the server for tasks
- **Event Store**: Immutable log of all workflow events, enabling replay and fault tolerance
- **Task Queues**: Distribute work between server and workers, enabling load balancing
### Benefits of Temporal Architecture

**Durability & Fault Tolerance**: Temporal’s event sourcing model ensures that every workflow step is persisted. If a worker crashes, another worker can pick up where it left off by replaying the event history.

```python
# This workflow will survive any infrastructure failure
@app.workflow
class ResilientWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # Step 1: Process data (checkpointed)
        result1 = await self.process_step_1(data)

        # Step 2: Validate results (checkpointed)
        result2 = await self.validate_step_2(result1)

        # Step 3: Finalize (checkpointed)
        # If the worker crashes here, another worker resumes from this point
        result3 = await self.finalize_step_3(result2)

        return WorkflowResult(value=result3)
```
**Automatic Retries & Exponential Backoff**: Temporal handles activity failures with configurable retry policies:

```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

@app.workflow
class RetryWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Configure the retry policy for this activity
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=5),
            backoff_coefficient=2.0,
            maximum_attempts=10,
            non_retryable_error_types=["ValidationError"],
        )

        # This will automatically retry on failure
        result = await workflow.execute_activity(
            self.unreliable_activity,
            input,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=retry_policy,
        )
        return WorkflowResult(value=result)

    async def unreliable_activity(self, data: str) -> str:
        """Activity that might fail and needs retries."""
        # Simulate an unreliable external API call
        agent = Agent(name="api_caller", server_names=["http"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process via API: {data}")
```
### Activity vs Workflow Distinction

**Workflows** are orchestration logic that must be deterministic:

- No direct I/O operations
- No random number generation without seeds
- No current-time checks (use `workflow.now()`)
- Pure coordination and decision making

**Activities** handle non-deterministic operations:

- External API calls
- Database operations
- File I/O
- Any side effects

```python
from datetime import timedelta

from temporalio import workflow

@app.workflow
class ProperWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, input: dict) -> WorkflowResult[dict]:
        # ✅ Workflow: pure orchestration logic
        if input.get("requires_validation"):
            # ✅ Call an activity for external operations
            validated = await workflow.execute_activity(
                self.validate_data_activity,
                input,
                start_to_close_timeout=timedelta(minutes=2),
            )

            # ✅ Workflow: decision making based on results
            if validated.get("is_valid"):
                return await self.process_valid_data(validated)
            else:
                return await self.handle_invalid_data(validated)

        return WorkflowResult(value=input)

    async def validate_data_activity(self, data: dict) -> dict:
        """✅ Activity: non-deterministic operations are allowed here."""
        agent = Agent(name="validator", server_names=["database", "api"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # ✅ External I/O operations belong in activities
            validation_result = await llm.generate_str(
                f"Validate this data against external service: {data}"
            )

            return {
                "is_valid": "valid" in validation_result.lower(),
                "result": validation_result,
            }
```
## Advanced Workflow Features

### Signal and Query Handlers

Signals allow external systems to communicate with running workflows:

```python
from typing import Optional

from temporalio import workflow

@app.workflow
class ApprovalWorkflow(Workflow[dict]):
    def __init__(self):
        self.approval_status: Optional[str] = None
        self.approval_comments: Optional[str] = None

    @workflow.signal
    async def approve_signal(self, comments: str):
        """Signal handler for approval."""
        self.approval_status = "approved"
        self.approval_comments = comments

    @workflow.signal
    async def reject_signal(self, reason: str):
        """Signal handler for rejection."""
        self.approval_status = "rejected"
        self.approval_comments = reason

    @workflow.query
    def get_status(self) -> dict:
        """Query handler to check current status."""
        return {
            "status": self.approval_status,
            "comments": self.approval_comments,
        }

    @app.workflow_run
    async def run(self, document: dict) -> WorkflowResult[dict]:
        # Process the initial document
        agent = Agent(name="processor", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Process document: {document}")

        # Wait for an approval signal (can wait indefinitely)
        await workflow.wait_condition(lambda: self.approval_status is not None)

        if self.approval_status == "approved":
            # Continue with the approved workflow
            async with agent:
                finalized = await llm.generate_str(
                    f"Finalize approved document: {processed}. "
                    f"Comments: {self.approval_comments}"
                )
            return WorkflowResult(value={
                "status": "completed",
                "document": finalized,
                "approval_comments": self.approval_comments,
            })
        else:
            # Handle rejection
            return WorkflowResult(
                value=None,
                error=f"Document rejected: {self.approval_comments}",
            )

# Send signals from external code
async def send_approval():
    async with app.run() as agent_app:
        executor = agent_app.executor

        # Send an approval signal to the running workflow
        await executor.signal_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "approve_signal",
            "Document looks good after review!",
        )

        # Query workflow status
        status = await executor.query_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "get_status",
        )
        print(f"Workflow status: {status}")
```
### Workflow Versioning

Handle workflow updates without breaking running instances. (Note: the `get_version` API shown here follows Temporal's Go/Java SDKs; the Python SDK expresses the same pattern with `workflow.patched()` and `workflow.deprecate_patch()`, so treat this as a sketch of the versioning pattern.)

```python
@app.workflow
class VersionedWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # Use versioning for backward compatibility
        version = workflow.get_version("data_processing_logic", 1, 3)

        if version == 1:
            # Original processing logic
            result = await self.process_v1(data)
        elif version == 2:
            # Enhanced processing with validation
            validated = await self.validate_data(data)
            result = await self.process_v2(validated)
        else:  # version == 3
            # Latest version with advanced features
            validated = await self.validate_data_v2(data)
            enriched = await self.enrich_data(validated)
            result = await self.process_v3(enriched)

        # Common post-processing (no versioning needed)
        final_result = await self.post_process(result)
        return WorkflowResult(value=final_result)

    async def process_v1(self, data: dict) -> str:
        """Original processing logic."""
        agent = Agent(name="processor_v1", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v1: {data}")

    async def process_v2(self, data: dict) -> str:
        """Enhanced processing with validation."""
        agent = Agent(name="processor_v2", server_names=["filesystem", "validation"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v2 with validation: {data}")

    async def process_v3(self, data: dict) -> str:
        """Latest version with advanced features."""
        agent = Agent(name="processor_v3", server_names=["filesystem", "validation", "ml"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v3 with ML enhancement: {data}")
```
### Workflow Timeouts and Cancellation

Configure comprehensive timeout policies. Inside workflows, asyncio timers are backed by durable Temporal timers, so `asyncio.wait_for` provides a replay-safe workflow-level timeout; heartbeats in activity code use `activity.heartbeat`:

```python
import asyncio
from datetime import timedelta

from temporalio import activity, workflow

@app.workflow
class TimeoutWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        try:
            # Workflow-level timeout: caps the whole run at 2 hours
            result = await asyncio.wait_for(
                self.run_steps(data),
                timeout=timedelta(hours=2).total_seconds(),
            )
            return WorkflowResult(value=result)
        except asyncio.TimeoutError:
            # Handle timeout gracefully
            return WorkflowResult(value=None, error="Workflow timed out after 2 hours")
        except asyncio.CancelledError:
            # Handle cancellation
            return WorkflowResult(value=None, error="Workflow was cancelled")

    async def run_steps(self, data: dict) -> dict:
        # Step 1: Quick processing (30 seconds max)
        result1 = await workflow.execute_activity(
            self.quick_process,
            data,
            start_to_close_timeout=timedelta(seconds=30),
        )

        # Step 2: Medium processing (5 minutes max)
        result2 = await workflow.execute_activity(
            self.medium_process,
            result1,
            start_to_close_timeout=timedelta(minutes=5),
            heartbeat_timeout=timedelta(seconds=30),  # For long-running activities
        )

        # Step 3: Long processing (1 hour max)
        return await workflow.execute_activity(
            self.long_process,
            result2,
            start_to_close_timeout=timedelta(hours=1),
            schedule_to_close_timeout=timedelta(hours=1, minutes=30),
        )

    async def long_process(self, data: dict) -> dict:
        """Long-running activity with heartbeats."""
        agent = Agent(name="long_processor", server_names=["ml", "database"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            for i in range(60):  # Simulate a 1-hour process
                # Heartbeat each iteration so Temporal knows the activity is alive
                activity.heartbeat(f"Processing step {i + 1}/60")

                # Do some processing
                partial_result = await llm.generate_str(f"Process chunk {i}: {data}")

                # Wait one minute between chunks
                await asyncio.sleep(60)

        return {"processed": True, "data": data}
```
## Core Concepts

### Workflow Definition

Temporal workflows are defined the same way as asyncio workflows:

```python
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="temporal_agent")

@app.workflow
class DurableWorkflow(Workflow[dict]):
    """A durable workflow that can survive failures."""

    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        # This workflow is durable - it will resume from
        # where it left off if the worker crashes
        agent = Agent(
            name="analyst",
            instruction="Analyze the provided data thoroughly.",
            server_names=["fetch", "filesystem"],
        )

        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # Each step is automatically checkpointed
            step1 = await llm.generate_str(f"Analyze: {request['data']}")
            step2 = await llm.generate_str(f"Summarize findings: {step1}")
            step3 = await llm.generate_str(f"Generate report: {step2}")

        return WorkflowResult(value={
            "analysis": step1,
            "summary": step2,
            "report": step3,
        })
```
### Signals for Human-in-the-Loop

Implement workflows that wait for human input:

```python
from mcp_agent.executor.temporal import Signal

@app.workflow
class ApprovalWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[str]:
        # Process document with AI
        agent = Agent(
            name="processor",
            instruction="Process and improve the document.",
            server_names=["filesystem"],
        )

        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Improve this document: {document}")

        # Wait for human approval
        print(f"Waiting for approval. Workflow ID: {self.id}, Run ID: {self.run_id}")
        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id, run_id=self.run_id)
        )

        # Continue after approval
        print("Approval received! Finalizing document...")
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            finalized = await llm.generate_str(f"Finalize approved document: {processed}")

        return WorkflowResult(value=finalized)
```
Send signals from external code:

```python
# Send approval signal
await app.context.executor.signal_bus.send_signal(
    Signal(
        name="approve",
        workflow_id="ApprovalWorkflow",
        run_id="run_abc123",
        payload={"approved_by": "john.doe", "comments": "Looks good!"},
    )
)
```
### Long-Running Workflows

Handle workflows that run for extended periods:

```python
import asyncio
from datetime import timedelta

@app.workflow
class MonitoringWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, config: dict) -> WorkflowResult[dict]:
        monitoring_results = []

        # Run for 30 days, checking every hour
        for day in range(30):
            for hour in range(24):
                # Durable sleep - survives restarts
                await asyncio.sleep(3600)  # 1 hour

                # Check system status
                agent = Agent(
                    name="monitor",
                    instruction="Check system health and report issues.",
                    server_names=["fetch"],
                )

                async with agent:
                    llm = await agent.attach_llm(OpenAIAugmentedLLM)
                    status = await llm.generate_str(f"Check status of: {config['systems']}")

                monitoring_results.append({
                    "day": day,
                    "hour": hour,
                    "status": status,
                })

                # Alert if issues found
                if "critical" in status.lower():
                    await self.send_alert(status)

        return WorkflowResult(value={"monitoring_complete": monitoring_results})
```
## Advanced Patterns

### Parallel Agent Execution

Run multiple agents in parallel with Temporal:

```python
import asyncio

@app.workflow
class ParallelAnalysisWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[dict]:
        # Define parallel tasks
        async def analyze_sentiment():
            agent = Agent(name="sentiment", instruction="Analyze sentiment.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Analyze sentiment: {document}")

        async def extract_entities():
            agent = Agent(name="entities", instruction="Extract entities.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Extract entities: {document}")

        async def summarize():
            agent = Agent(name="summarizer", instruction="Summarize content.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Summarize: {document}")

        # Execute in parallel - Temporal handles orchestration
        sentiment, entities, summary = await asyncio.gather(
            analyze_sentiment(),
            extract_entities(),
            summarize(),
        )

        return WorkflowResult(value={
            "sentiment": sentiment,
            "entities": entities,
            "summary": summary,
        })
```
### Workflow Composition

Compose complex workflows from simpler ones:

```python
@app.workflow
class DataPipelineWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
        # Step 1: Data extraction workflow
        extraction = DataExtractionWorkflow()
        data = await extraction.run(source)

        # Step 2: Data validation workflow
        validation = DataValidationWorkflow()
        validated = await validation.run(data.value)

        # Step 3: Data processing workflow
        processing = DataProcessingWorkflow()
        processed = await processing.run(validated.value)

        # Step 4: Report generation workflow
        reporting = ReportGenerationWorkflow()
        report = await reporting.run(processed.value)

        return WorkflowResult(value={
            "data": data.value,
            "validation": validated.value,
            "processed": processed.value,
            "report": report.value,
        })
```
### Error Handling with Compensations

Implement the saga pattern for distributed transactions:

```python
@app.workflow
class OrderProcessingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, order: dict) -> WorkflowResult[dict]:
        compensations = []

        try:
            # Step 1: Reserve inventory
            inventory_agent = Agent(name="inventory", server_names=["database"])
            async with inventory_agent:
                llm = await inventory_agent.attach_llm(OpenAIAugmentedLLM)
                reservation = await llm.generate_str(f"Reserve items: {order['items']}")
                compensations.append(("inventory", reservation))

            # Step 2: Process payment
            payment_agent = Agent(name="payment", server_names=["payment_api"])
            async with payment_agent:
                llm = await payment_agent.attach_llm(OpenAIAugmentedLLM)
                payment = await llm.generate_str(f"Process payment: {order['total']}")
                compensations.append(("payment", payment))

            # Step 3: Ship order
            shipping_agent = Agent(name="shipping", server_names=["shipping_api"])
            async with shipping_agent:
                llm = await shipping_agent.attach_llm(OpenAIAugmentedLLM)
                shipment = await llm.generate_str(f"Ship to: {order['address']}")

            return WorkflowResult(value={
                "success": True,
                "reservation": reservation,
                "payment": payment,
                "shipment": shipment,
            })

        except Exception as e:
            # Run compensations in reverse order
            for service, data in reversed(compensations):
                await self.compensate(service, data)

            return WorkflowResult(
                value=None,
                error=f"Order failed: {e}. Compensations executed.",
            )

    async def compensate(self, service: str, data: str):
        """Execute compensation for a failed step."""
        agent = Agent(name=f"{service}_compensation")
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            await llm.generate_str(f"Compensate {service}: {data}")
```
## Production Deployment

### Infrastructure Requirements

**Minimum Production Setup:**

- Temporal Server cluster (3+ nodes for HA)
- PostgreSQL/MySQL database with replication
- Elasticsearch for visibility (optional but recommended)
- Load balancer for the Temporal frontend
- Monitoring stack (Prometheus, Grafana)

**Resource Planning:**

```yaml
# Example Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-server
  template:
    metadata:
      labels:
        app: temporal-server  # Must match the selector above
    spec:
      containers:
        - name: temporal
          image: temporalio/auto-setup:latest
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            - name: DB
              value: postgresql
            - name: POSTGRES_SEEDS
              value: postgres-primary:5432
            - name: DYNAMIC_CONFIG_FILE_PATH
              value: /etc/temporal/config/dynamicconfig.yaml
```
### High Availability Configuration

Configure Temporal for production resilience:

```yaml
# temporal-server-config.yaml
persistence:
  defaultStore: default
  visibilityStore: visibility
  numHistoryShards: 512
  datastores:
    default:
      sql:
        pluginName: postgres
        databaseName: temporal
        connectAddr: postgres-cluster:5432
        connectProtocol: tcp
        maxConns: 20
        maxIdleConns: 20
        maxConnLifetime: 1h
    visibility:
      sql:
        pluginName: postgres
        databaseName: temporal_visibility
        connectAddr: postgres-cluster:5432

global:
  membership:
    maxJoinDuration: 30s
    broadcastAddress: 0.0.0.0
  pprof:
    port: 7936

services:
  frontend:
    rpc:
      grpcPort: 7233
      membershipPort: 6933
      bindOnLocalHost: false
  history:
    rpc:
      grpcPort: 7234
      membershipPort: 6934
      bindOnLocalHost: false
  matching:
    rpc:
      grpcPort: 7235
      membershipPort: 6935
      bindOnLocalHost: false
  worker:
    rpc:
      grpcPort: 7236
      membershipPort: 6936
      bindOnLocalHost: false

clusterMetadata:
  enableGlobalNamespace: true
  failoverVersionIncrement: 10
  masterClusterName: primary
  currentClusterName: primary
  clusterInformation:
    primary:
      enabled: true
      initialFailoverVersion: 0
      rpcName: frontend
      rpcAddress: 0.0.0.0:7233
```
### Temporal Cloud

For production, use Temporal Cloud:

```yaml
execution_engine: temporal

temporal:
  host: your-namespace.tmprl.cloud
  port: 7233
  namespace: your-namespace
  task_queue: mcp-agent-production
  tls:
    client_cert_path: /path/to/client.crt
    client_key_path: /path/to/client.key
    ca_cert_path: /path/to/ca.crt
    server_name: your-namespace.tmprl.cloud
  data_converter:
    encryption_key: ${TEMPORAL_ENCRYPTION_KEY}
    codec: aes256gcm
  retry_policy:
    initial_interval: 1
    maximum_interval: 100
    backoff_coefficient: 2
    maximum_attempts: 50
  auth:
    api_key: ${TEMPORAL_API_KEY}
    namespace: your-namespace
```
### Security Best Practices

**Data Encryption:**

```python
from cryptography.fernet import Fernet
from temporalio.client import Client
# Note: in the current Temporal Python SDK, payload encryption is implemented
# with a custom PayloadCodec attached to a DataConverter; the converter names
# below illustrate that pattern
from temporalio.converter import CompositeConverter, EncryptionConverter

# Generate an encryption key (store securely)
encryption_key = Fernet.generate_key()

# Create an encrypted client
client = await Client.connect(
    "your-namespace.tmprl.cloud:7233",
    namespace="your-namespace",
    data_converter=CompositeConverter(
        EncryptionConverter(
            encryption_key,
            compress=True,  # Enable compression
        )
    ),
    tls=True,
)
```
**Access Control:**

```yaml
# RBAC configuration for Temporal namespaces
namespaces:
  production:
    retention: "30d"
    archival:
      history:
        state: "enabled"
        uri: "s3://temporal-history-archive"
      visibility:
        state: "enabled"
        uri: "s3://temporal-visibility-archive"
    authorization:
      default_role: "worker"
      roles:
        admin:
          permissions:
            - "namespace:*"
            - "workflow:*"
            - "activity:*"
        worker:
          permissions:
            - "workflow:execute"
            - "activity:execute"
        monitor:
          permissions:
            - "workflow:read"
            - "namespace:read"
```
**Network Security:**

```yaml
# Network policies for Kubernetes
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: temporal-network-policy
spec:
  podSelector:
    matchLabels:
      app: temporal
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: mcp-agent-worker
      ports:
        - protocol: TCP
          port: 7233
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: postgres
      ports:
        - protocol: TCP
          port: 5432
```
### Worker Scaling

Scale workers for production workloads:

```python
# worker.py for production
import asyncio
from concurrent.futures import ThreadPoolExecutor

from mcp_agent.executor.temporal import create_temporal_worker_for_app

from my_agent_module import app  # your MCPApp instance (module name is illustrative)

async def main():
    # Create a worker with production settings
    worker = await create_temporal_worker_for_app(
        app,
        task_queue="mcp-agent-production",
        max_concurrent_activities=50,
        max_concurrent_workflows=20,
        max_cached_workflows=100,
        activity_executor=ThreadPoolExecutor(max_workers=100),
    )

    # Run the worker
    await worker.run()

if __name__ == "__main__":
    # Run multiple worker instances for scaling
    asyncio.run(main())
```
### Monitoring and Observability

Monitor workflows with the Temporal UI and custom metrics:

```python
from temporalio import workflow
from temporalio.runtime import PrometheusConfig, Runtime, TelemetryConfig

# Configure Prometheus metrics
runtime = Runtime(
    telemetry=TelemetryConfig(
        metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
    )
)

# Track custom metrics in workflows
@app.workflow
class MetricWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Use workflow.now() rather than time.time(): wall-clock reads
        # are non-deterministic inside workflows
        start_time = workflow.now()

        # Your workflow logic
        result = await self.process(input)

        # Record metrics
        duration = (workflow.now() - start_time).total_seconds()
        app.context.metrics.record(
            "workflow_duration",
            duration,
            {"workflow": "MetricWorkflow", "status": "success"},
        )

        return WorkflowResult(value=result)
```
## Debugging

### Temporal Web UI

Access the Temporal Web UI at http://localhost:8233 to:

- View all workflow executions
- Inspect workflow history step-by-step
- See pending activities and their retry attempts
- Send signals and queries to running workflows
- Download workflow history for offline debugging
- Monitor worker health and task queues

### Workflow Replay

Debug production issues by replaying workflow history:

```python
import json

from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

async def debug_workflow():
    # Download history from the Temporal UI or API
    with open("workflow_history.json") as f:
        history = json.load(f)

    # Create a replayer with your workflow definitions
    replayer = Replayer(workflows=[MyWorkflow])

    # Replay the workflow to debug
    try:
        await replayer.replay_workflow(
            # The workflow ID here is informational for the replayer
            WorkflowHistory.from_json("my-workflow-id", history)
        )
        print("Replay successful - workflow logic is correct")
    except Exception as e:
        print(f"Replay failed - logic error: {e}")
```
## Testing with Time Skipping

Test long-running workflows efficiently:

```python
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@pytest.mark.asyncio
async def test_long_running_workflow():
    # Start a test environment with time skipping
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Create a worker
        worker = Worker(
            env.client,
            task_queue="test-queue",
            workflows=[MonitoringWorkflow],
        )

        async with worker:
            # Start the workflow
            handle = await env.client.start_workflow(
                MonitoringWorkflow.run,
                {"systems": ["api", "database"]},
                id="test-monitoring",
                task_queue="test-queue",
            )

            # Time automatically advances during sleeps -
            # 30 days completes almost instantly in tests
            result = await handle.result()
            assert len(result["monitoring_complete"]) == 720  # 30 days * 24 hours
```
## Migration Guide

### From Asyncio to Temporal

Your workflow code remains largely the same. Here's what changes:

**Before - `mcp_agent.config.yaml`:**

```yaml
execution_engine: asyncio
logger:
  transports: [console]
  level: info
```

**After - `mcp_agent.config.yaml`:**
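Point the engine at your server, mirroring the Quick Start configuration (logger settings carry over unchanged):

```yaml
execution_engine: temporal
logger:
  transports: [console]
  level: info
temporal:
  host: localhost
  port: 7233
  namespace: default
  task_queue: mcp-agent
```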
### Running Workflows

**Before - Asyncio:**

```python
async with app.run():
    workflow = MyWorkflow()
    result = await workflow.run("input")
    print(result.value)
```

**After - Temporal:**
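The Temporal version starts the workflow through the executor and awaits its handle, as in the Quick Start example:

```python
async with app.run() as agent_app:
    executor = agent_app.executor
    handle = await executor.start_workflow("MyWorkflow", "input")
    result = await handle.result()
    print(result)
```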
## Best Practices

**Workflows must be deterministic.** Avoid:

- Random number generation without seeds
- Current-time checks (use `workflow.now()`)
- Direct I/O operations (use activities)
- Non-deterministic data structures

Replay-safe alternatives are sketched below.
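A minimal sketch of the replay-safe alternatives, using the standard `temporalio` workflow helpers (the workflow class itself is illustrative):

```python
from temporalio import workflow

@app.workflow
class DeterministicWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        now = workflow.now()           # Replay-safe current time
        rng = workflow.random()        # Replay-safe random.Random instance
        request_id = workflow.uuid4()  # Replay-safe UUID

        sample = rng.random()  # Deterministic across replays
        return WorkflowResult(value=f"{request_id} at {now}: sampled {sample:.3f}")
```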
Configure timeouts for workflows and activities:

```python
handle = await executor.start_workflow(
    "MyWorkflow",
    input_data,
    execution_timeout=timedelta(hours=1),
    run_timeout=timedelta(minutes=30),
    task_timeout=timedelta(minutes=5),
)
```
Set meaningful workflow IDs for idempotency:

```python
workflow_id = f"process-document-{document_id}"

handle = await executor.start_workflow(
    "DocumentWorkflow",
    document,
    workflow_id=workflow_id,
    id_reuse_policy="allow_duplicate_failed_only",
)
```
Version your workflows for safe updates:

```python
@app.workflow
class VersionedWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        version = workflow.get_version("processing_logic", 1, 2)

        if version == 1:
            # Old logic
            result = await self.process_v1(input)
        else:
            # New logic
            result = await self.process_v2(input)

        return WorkflowResult(value=result)
```
## Common Patterns

### Polling External Systems

```python
@app.workflow
class PollingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, job_id: str) -> WorkflowResult[dict]:
        max_attempts = 100

        for attempt in range(max_attempts):
            # Check job status
            agent = Agent(name="checker", server_names=["api"])
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                status = await llm.generate_str(f"Check job status: {job_id}")

            if "completed" in status:
                return WorkflowResult(value={"status": "completed", "result": status})

            if "failed" in status:
                return WorkflowResult(value=None, error=f"Job failed: {status}")

            # Wait before the next poll (durable timer)
            await asyncio.sleep(60)  # 1 minute

        return WorkflowResult(value=None, error="Job timed out")
```
### Scheduled Workflows

```python
@app.workflow
class ScheduledWorkflow(Workflow[None]):
    @app.workflow_run
    async def run(self, schedule: dict) -> WorkflowResult[None]:
        """Run daily at the specified time."""
        while True:
            # Wait until the next scheduled time (durable timer;
            # workflow.now() is the deterministic current time)
            next_run = self.calculate_next_run(schedule)
            await asyncio.sleep((next_run - workflow.now()).total_seconds())

            # Execute the scheduled task
            await self.execute_scheduled_task()

            # Continue as new to prevent unbounded history growth
            workflow.continue_as_new(schedule)
```
## Examples

Explore the complete Temporal examples that ship with mcp-agent.

## Next Steps