Temporal provides durable execution for your agent workflows, enabling automatic retries, pause/resume capabilities, and time-travel debugging. Perfect for production deployments.

Why Temporal?

mcp-agent supports both asyncio and temporal execution engines. While asyncio works great for development and simple workflows, Temporal is recommended for production deployments because it provides:

Durable Execution

Workflows survive failures, restarts, and infrastructure issues

Automatic Retries

Failed activities are automatically retried with configurable policies

Pause & Resume

Workflows can be paused indefinitely and resumed with new data

Observability

Complete workflow history and time-travel debugging via Temporal UI

Scalability

Distribute workflow execution across multiple workers

Long-Running Workflows

Support for workflows that run for days, weeks, or months

Quick Start

1. Install Temporal CLI

Install the Temporal CLI for local development:
# macOS
brew install temporal

# Linux/WSL
curl -sSf https://temporal.download/cli.sh | sh

# Windows
# Download from https://github.com/temporalio/cli/releases
2. Start Temporal Server

Run a local Temporal server for development:
temporal server start-dev 
This starts:
  • Temporal Server on localhost:7233
  • Web UI on http://localhost:8233
3. Configure mcp-agent

Update your mcp_agent.config.yaml:
execution_engine: temporal

# Optional: preload modules that register @workflow_task activities
workflow_task_modules:
  - my_package.custom_temporal_tasks

# Optional: override retry behaviour for specific workflow tasks/activities
workflow_task_retry_policies:
  my_package.custom_temporal_tasks.my_activity:
    maximum_attempts: 1

temporal:
  host: localhost
  port: 7233
  namespace: default
  task_queue: mcp-agent
  max_concurrent_activities: 10
mcp-agent preloads its built-in LLM providers automatically. Add entries to workflow_task_modules when you register custom @workflow_task activities outside the core packages, so the worker can discover them before it starts. Entries are standard Python import paths.

The optional workflow_task_retry_policies mapping lets you tune Temporal retry behaviour per activity. Keys can be exact names, wildcards such as prefix*, or * to match every activity.

For provider SDKs, common non-retryable error types include:
  • OpenAI/Azure OpenAI: AuthenticationError, PermissionDeniedError, BadRequestError, NotFoundError, UnprocessableEntityError.
  • Anthropic: AuthenticationError, PermissionDeniedError, BadRequestError, NotFoundError, UnprocessableEntityError.
  • Azure AI Inference: HttpResponseError (400/401/403/404/422).
  • Google GenAI: InvalidArgument, FailedPrecondition, PermissionDenied, NotFound, Unauthenticated.
mcp-agent raises a WorkflowApplicationError for these cases so that Temporal (or the asyncio executor) avoids retry loops, even when the Temporal SDK is not installed locally.
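To make the exact-name, prefix*, and * keys concrete, here is a minimal sketch of how such a mapping could be resolved for one activity. The precedence order (exact name, then the longest matching wildcard, then *) and the resolve_retry_policy helper are assumptions made for illustration, not mcp-agent's documented lookup logic.

```python
from fnmatch import fnmatchcase
from typing import Optional


def resolve_retry_policy(activity_name: str, policies: dict) -> Optional[dict]:
    """Pick a policy: exact name first, then longest wildcard, then '*'."""
    if activity_name in policies:
        return policies[activity_name]
    wildcard_patterns = [p for p in policies if "*" in p and p != "*"]
    # Assumption: a longer pattern is more specific than a shorter one.
    for pattern in sorted(wildcard_patterns, key=len, reverse=True):
        if fnmatchcase(activity_name, pattern):
            return policies[pattern]
    return policies.get("*")


# Hypothetical mapping mirroring the YAML structure above
policies = {
    "my_package.custom_temporal_tasks.my_activity": {"maximum_attempts": 1},
    "my_package.*": {"maximum_attempts": 3},
    "*": {"maximum_attempts": 5},
}
```

With this mapping, my_activity gets a single attempt, any other activity under my_package gets three, and everything else falls back to five.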
4. Create Worker

Create a worker to process workflows:
worker.py
import asyncio
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.executor.temporal import create_temporal_worker_for_app

app = MCPApp(name="my_agent")

# Define your workflows here
@app.workflow
class MyWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        return WorkflowResult(value=f"Processed: {input}")

async def main():
    async with create_temporal_worker_for_app(app) as worker:
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
5. Run Workflow

Execute your workflow:
main.py
import asyncio
from mcp_agent.app import MCPApp

app = MCPApp(name="my_agent")

async def main():
    async with app.run() as agent_app:
        executor = agent_app.executor

        # Start workflow
        handle = await executor.start_workflow(
            "MyWorkflow",
            "Hello Temporal!"
        )

        # Wait for result
        result = await handle.result()
        print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Temporal Architecture

Core Components

Temporal’s architecture provides robust workflow orchestration through several key components:

Temporal Server

Manages workflow state, persists event history, and coordinates execution

Workers

Execute workflow and activity code, poll for tasks from the server

Event Store

Immutable log of all workflow events, enabling replay and fault tolerance

Task Queues

Distribute work between server and workers, enabling load balancing

Benefits of Temporal Architecture

Durability & Fault Tolerance: Temporal’s event sourcing model ensures that every workflow step is persisted. If a worker crashes, another worker can pick up where it left off by replaying the event history.
# This workflow will survive any infrastructure failure
@app.workflow
class ResilientWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # Step 1: Process data (checkpointed)
        result1 = await self.process_step_1(data)

        # Step 2: Validate results (checkpointed)
        result2 = await self.validate_step_2(result1)

        # Step 3: Finalize (checkpointed)
        # If worker crashes here, it will resume from this point
        result3 = await self.finalize_step_3(result2)

        return WorkflowResult(value=result3)
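The replay idea can be illustrated without Temporal at all. The sketch below is a toy model, not Temporal's implementation: a hypothetical ReplayingExecutor keeps a list of completed step results (the "history"), and a restarted run reads those results back instead of re-executing the steps.

```python
from typing import Any, Callable, List


class ReplayingExecutor:
    """Toy event-sourced executor: completed step results come from history."""

    def __init__(self, history: List[Any]) -> None:
        self.history = history  # persisted results of completed steps
        self._cursor = 0        # index of the next step in this run
        self.executed = 0       # steps actually executed (not replayed)

    def step(self, fn: Callable[[], Any]) -> Any:
        if self._cursor < len(self.history):
            result = self.history[self._cursor]  # replay: reuse recorded result
        else:
            result = fn()                        # first execution: run and persist
            self.history.append(result)
            self.executed += 1
        self._cursor += 1
        return result


def pipeline(ex: ReplayingExecutor, crash_after_first: bool = False) -> str:
    a = ex.step(lambda: "processed")
    if crash_after_first:
        raise RuntimeError("worker crashed")  # simulated infrastructure failure
    b = ex.step(lambda: a + "+validated")
    return ex.step(lambda: b + "+finalized")
```

Running pipeline once with crash_after_first=True and then again with a fresh executor over the same history list completes the work while executing only the two steps that had not been recorded.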
Automatic Retries & Exponential Backoff: Temporal handles activity failures with configurable retry policies:
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

@app.workflow
class RetryWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Configure retry policy for this activity
        retry_policy = RetryPolicy(
            initial_interval=timedelta(seconds=1),
            maximum_interval=timedelta(minutes=5),
            backoff_coefficient=2.0,
            maximum_attempts=10,
            non_retryable_error_types=["ValidationError"]
        )

        # This will automatically retry on failure
        result = await workflow.execute_activity(
            self.unreliable_activity,
            input,
            start_to_close_timeout=timedelta(minutes=5),
            retry_policy=retry_policy
        )

        return WorkflowResult(value=result)

    async def unreliable_activity(self, data: str) -> str:
        """Activity that might fail and needs retries."""
        # Simulate unreliable external API call
        agent = Agent(name="api_caller", server_names=["http"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process via API: {data}")
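To see what a policy like this means in wall-clock terms, the following sketch computes the wait before each retry from the four parameters. It is a plain arithmetic model of exponential backoff with a cap, and retry_delays is a hypothetical helper, not a Temporal API.

```python
from datetime import timedelta
from typing import List


def retry_delays(
    initial: timedelta,
    coefficient: float,
    maximum: timedelta,
    max_attempts: int,
) -> List[timedelta]:
    """Delay before each retry. Attempt 1 runs immediately, so there are
    max_attempts - 1 delays, each capped at `maximum`."""
    delays: List[timedelta] = []
    delay = min(initial, maximum)
    for _ in range(max_attempts - 1):
        delays.append(delay)
        delay = min(delay * coefficient, maximum)
    return delays


delays = retry_delays(
    initial=timedelta(seconds=1),
    coefficient=2.0,
    maximum=timedelta(minutes=5),
    max_attempts=10,
)
total_wait = sum(delays, timedelta())
```

For the values used in the example (1 second initial interval, coefficient 2.0, 5 minute cap, 10 attempts) the cap is never reached: the nine waits double from 1 second up to 256 seconds, 511 seconds in total, not counting the time each attempt itself takes.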

Activity vs Workflow Distinction

Workflows are orchestration logic that must be deterministic:
  • No direct I/O operations
  • No random number generation without seeds
  • No current time checks (use workflow.now())
  • Pure coordination and decision making
Activities handle non-deterministic operations:
  • External API calls
  • Database operations
  • File I/O
  • Any side effects
@app.workflow
class ProperWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, input: dict) -> WorkflowResult[dict]:
        # ✅ Workflow: Pure orchestration logic
        if input.get("requires_validation"):
            # ✅ Call activity for external operations
            validated = await workflow.execute_activity(
                self.validate_data_activity,
                input,
                start_to_close_timeout=timedelta(minutes=2)
            )

            # ✅ Workflow: Decision making based on results
            if validated.get("is_valid"):
                return await self.process_valid_data(validated)
            else:
                return await self.handle_invalid_data(validated)

        return WorkflowResult(value=input)

    async def validate_data_activity(self, data: dict) -> dict:
        """✅ Activity: Non-deterministic operations allowed here."""
        agent = Agent(name="validator", server_names=["database", "api"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # ✅ External I/O operations in activities
            validation_result = await llm.generate_str(
                f"Validate this data against external service: {data}"
            )

            return {"is_valid": "valid" in validation_result.lower(), "result": validation_result}
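Why determinism matters is easiest to see by comparing two runs of the same orchestration logic. The toy check below is an illustration only (Temporal's real replay compares recorded history events): it runs a workflow function twice and compares the commands it would schedule. The _fake_clock counter is a stand-in for reading wall-clock time, and all names are hypothetical.

```python
import itertools
import random
from typing import Callable, List

WorkflowFn = Callable[[List[str]], None]

# Stand-in for wall-clock time: returns a different value on every read.
_fake_clock = itertools.count()


def seeded_workflow(commands: List[str]) -> None:
    rng = random.Random(42)  # fixed seed: identical sequence on every run
    branch = "A" if rng.random() < 0.5 else "B"
    commands.append(f"activity:{branch}")


def clock_dependent_workflow(commands: List[str]) -> None:
    # The branch depends on a value that changes between runs.
    branch = "A" if next(_fake_clock) % 2 == 0 else "B"
    commands.append(f"activity:{branch}")


def is_replay_safe(workflow_fn: WorkflowFn) -> bool:
    """Run twice and compare the scheduled commands, as a replay would."""
    first: List[str] = []
    second: List[str] = []
    workflow_fn(first)
    workflow_fn(second)
    return first == second
```

The seeded version schedules the same activity on both runs, while the clock-dependent version diverges, which is the kind of mismatch that makes a replay fail.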

Advanced Workflow Features

Signal and Query Handlers

Signals allow external systems to communicate with running workflows:
from temporalio import workflow
from typing import Optional

@app.workflow
class ApprovalWorkflow(Workflow[dict]):
    def __init__(self):
        self.approval_status: Optional[str] = None
        self.approval_comments: Optional[str] = None

    @workflow.signal
    async def approve_signal(self, comments: str):
        """Signal handler for approval."""
        self.approval_status = "approved"
        self.approval_comments = comments

    @workflow.signal
    async def reject_signal(self, reason: str):
        """Signal handler for rejection."""
        self.approval_status = "rejected"
        self.approval_comments = reason

    @workflow.query
    def get_status(self) -> dict:
        """Query handler to check current status."""
        return {
            "status": self.approval_status,
            "comments": self.approval_comments
        }

    @app.workflow_run
    async def run(self, document: dict) -> WorkflowResult[dict]:
        # Process initial document
        agent = Agent(name="processor", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Process document: {document}")

        # Wait for approval signal (can wait indefinitely)
        await workflow.wait_condition(lambda: self.approval_status is not None)

        if self.approval_status == "approved":
            # Continue with approved workflow
            async with agent:
                finalized = await llm.generate_str(
                    f"Finalize approved document: {processed}. Comments: {self.approval_comments}"
                )

            return WorkflowResult(value={
                "status": "completed",
                "document": finalized,
                "approval_comments": self.approval_comments
            })
        else:
            # Handle rejection
            return WorkflowResult(
                value=None,
                error=f"Document rejected: {self.approval_comments}"
            )

# Send signals from external code
async def send_approval():
    async with app.run() as agent_app:
        executor = agent_app.executor

        # Send approval signal to running workflow
        await executor.signal_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "approve_signal",
            "Document looks good after review!"
        )

        # Query workflow status
        status = await executor.query_workflow(
            "ApprovalWorkflow",
            "workflow-123",
            "get_status"
        )
        print(f"Workflow status: {status}")
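The signal, query, and wait pattern has a close analogue in plain asyncio, which may help when reasoning about it. The sketch below is an in-process illustration only, with hypothetical names: a signal mutates state and wakes the waiter, and a query reads state without changing it. Unlike a Temporal workflow, nothing here survives a process restart.

```python
import asyncio
from typing import Optional


class ApprovalState:
    """In-memory stand-in for workflow state driven by signals and queries."""

    def __init__(self) -> None:
        self.status: Optional[str] = None
        self.comments: Optional[str] = None
        self._changed = asyncio.Event()

    def signal(self, status: str, comments: str) -> None:
        # Like a signal handler: mutate state, then wake any waiter.
        self.status, self.comments = status, comments
        self._changed.set()

    def query(self) -> dict:
        # Like a query handler: read-only view of current state.
        return {"status": self.status, "comments": self.comments}

    async def wait_for_decision(self) -> Optional[str]:
        await self._changed.wait()
        return self.status


async def demo() -> dict:
    state = ApprovalState()
    waiter = asyncio.create_task(state.wait_for_decision())
    await asyncio.sleep(0)  # let the waiter start and block
    before = state.query()
    state.signal("approved", "Looks good")
    decision = await waiter
    return {"before": before, "decision": decision, "after": state.query()}
```

Calling asyncio.run(demo()) shows the status as unset before the signal and as approved afterwards.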

Workflow Versioning

Handle workflow updates without breaking running instances:
from temporalio import workflow

@app.workflow
class VersionedWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        # The Temporal Python SDK versions workflow code with patch markers
        # (workflow.patched) rather than numeric versions. Check newest first.
        if workflow.patched("data_processing_v3"):
            # Latest version with advanced features
            validated = await self.validate_data_v2(data)
            enriched = await self.enrich_data(validated)
            result = await self.process_v3(enriched)
        elif workflow.patched("data_processing_v2"):
            # Enhanced processing with validation
            validated = await self.validate_data(data)
            result = await self.process_v2(validated)
        else:
            # Original processing logic (runs started before any patch)
            result = await self.process_v1(data)

        # Common post-processing (no versioning needed)
        final_result = await self.post_process(result)

        return WorkflowResult(value=final_result)

    async def process_v1(self, data: dict) -> str:
        """Original processing logic."""
        agent = Agent(name="processor_v1", server_names=["filesystem"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v1: {data}")

    async def process_v2(self, data: dict) -> str:
        """Enhanced processing with validation."""
        agent = Agent(name="processor_v2", server_names=["filesystem", "validation"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v2 with validation: {data}")

    async def process_v3(self, data: dict) -> str:
        """Latest version with advanced features."""
        agent = Agent(name="processor_v3", server_names=["filesystem", "validation", "ml"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            return await llm.generate_str(f"Process v3 with ML enhancement: {data}")
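Versioning is safe for running instances because the branch decision is itself written to history. The toy model below sketches that mechanism under simplifying assumptions (a dict stands in for history markers, and VersionMarkers is a hypothetical class, not an SDK type): a new execution records the latest version, while a replay of an older history reuses whatever was recorded, or the default if no marker exists.

```python
from typing import Dict


class VersionMarkers:
    """Toy model: the first execution records its version; replays reuse it."""

    def __init__(self, recorded: Dict[str, int]) -> None:
        self.recorded = recorded  # markers already stored in history

    def resolve(
        self, change_id: str, default_version: int, latest: int, replaying: bool
    ) -> int:
        if change_id in self.recorded:
            return self.recorded[change_id]
        if replaying:
            # History predates the change: no marker was ever written.
            return default_version
        self.recorded[change_id] = latest  # new execution: record and use latest
        return latest


old_run = VersionMarkers({})
new_run = VersionMarkers({})
mid_run = VersionMarkers({"data_processing_logic": 2})

old_version = old_run.resolve("data_processing_logic", 1, 3, replaying=True)
new_version = new_run.resolve("data_processing_logic", 1, 3, replaying=False)
mid_version = mid_run.resolve("data_processing_logic", 1, 3, replaying=True)
```

A workflow started before the change keeps running version 1, one started while version 2 was current stays on 2, and only new executions pick up version 3.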

Workflow Timeouts and Cancellation

Configure comprehensive timeout policies:
import asyncio
from datetime import timedelta

from temporalio import activity, workflow

@app.workflow
class TimeoutWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, data: dict) -> WorkflowResult[dict]:
        try:
            # Workflow-level deadline covering all three steps
            result = await asyncio.wait_for(
                self.run_steps(data),
                timeout=timedelta(hours=2).total_seconds(),
            )
            return WorkflowResult(value=result)

        except asyncio.TimeoutError:
            # Handle timeout gracefully
            return WorkflowResult(
                value=None,
                error="Workflow timed out after 2 hours"
            )
        except asyncio.CancelledError:
            # Handle cancellation
            return WorkflowResult(
                value=None,
                error="Workflow was cancelled"
            )

    async def run_steps(self, data: dict) -> dict:
        # Step 1: Quick processing (30 seconds max)
        result1 = await workflow.execute_activity(
            self.quick_process,
            data,
            start_to_close_timeout=timedelta(seconds=30)
        )

        # Step 2: Medium processing (5 minutes max)
        result2 = await workflow.execute_activity(
            self.medium_process,
            result1,
            start_to_close_timeout=timedelta(minutes=5),
            heartbeat_timeout=timedelta(seconds=30)  # For long-running activities
        )

        # Step 3: Long processing (1 hour max per attempt, 1.5 hours including retries)
        return await workflow.execute_activity(
            self.long_process,
            result2,
            start_to_close_timeout=timedelta(hours=1),
            schedule_to_close_timeout=timedelta(hours=1, minutes=30)
        )

    async def long_process(self, data: dict) -> dict:
        """Long-running activity with heartbeat."""
        agent = Agent(name="long_processor", server_names=["ml", "database"])
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # Send heartbeats for long operations
            for i in range(60):  # Simulate 1-hour process
                # Heartbeats are sent from activity code, once per minute here
                activity.heartbeat(f"Processing step {i+1}/60")

                # Do some processing
                partial_result = await llm.generate_str(
                    f"Process chunk {i}: {data}"
                )

                # Sleep for 1 minute (simulated)
                await asyncio.sleep(60)

            return {"processed": True, "data": data}
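The "catch the timeout and return an error result" shape can be tried out with plain asyncio before wiring it into a workflow. This is a minimal sketch with hypothetical helper names. It uses asyncio.wait_for to bound a coroutine and converts a timeout into a result dictionary instead of letting the exception propagate.

```python
import asyncio
from typing import Any, Awaitable


async def run_with_deadline(work: Awaitable[Any], seconds: float) -> dict:
    """Await `work`, returning an error entry instead of raising on timeout."""
    try:
        value = await asyncio.wait_for(work, timeout=seconds)
        return {"value": value, "error": None}
    except asyncio.TimeoutError:
        return {"value": None, "error": f"timed out after {seconds}s"}


async def slow_step(delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for real processing
    return "done"


fast = asyncio.run(run_with_deadline(slow_step(0.01), seconds=1.0))
slow = asyncio.run(run_with_deadline(slow_step(1.0), seconds=0.01))
```

The first call finishes within its deadline and carries a value. The second is cancelled by wait_for and carries only the error message.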

Core Concepts

Workflow Definition

Temporal workflows are defined the same way as asyncio workflows:
from mcp_agent.app import MCPApp
from mcp_agent.executor.workflow import Workflow, WorkflowResult
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

app = MCPApp(name="temporal_agent")

@app.workflow
class DurableWorkflow(Workflow[dict]):
    """A durable workflow that can survive failures."""

    @app.workflow_run
    async def run(self, request: dict) -> WorkflowResult[dict]:
        # This workflow is durable - it will resume from
        # where it left off if the worker crashes

        agent = Agent(
            name="analyst",
            instruction="Analyze the provided data thoroughly.",
            server_names=["fetch", "filesystem"]
        )

        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)

            # Each step is automatically checkpointed
            step1 = await llm.generate_str(f"Analyze: {request['data']}")
            step2 = await llm.generate_str(f"Summarize findings: {step1}")
            step3 = await llm.generate_str(f"Generate report: {step2}")

        return WorkflowResult(value={
            "analysis": step1,
            "summary": step2,
            "report": step3
        })

Signals for Human-in-the-Loop

Implement workflows that wait for human input:
from mcp_agent.executor.temporal import Signal

@app.workflow
class ApprovalWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[str]:
        # Process document with AI
        agent = Agent(
            name="processor",
            instruction="Process and improve the document.",
            server_names=["filesystem"]
        )

        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            processed = await llm.generate_str(f"Improve this document: {document}")

        # Wait for human approval
        print(f"Waiting for approval. Workflow ID: {self.id}, Run ID: {self.run_id}")

        await app.context.executor.signal_bus.wait_for_signal(
            Signal(name="approve", workflow_id=self.id, run_id=self.run_id)
        )

        # Continue after approval
        print("Approval received! Finalizing document...")

        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            finalized = await llm.generate_str(f"Finalize approved document: {processed}")

        return WorkflowResult(value=finalized)
Send signals from external code:
# Send approval signal
await app.context.executor.signal_bus.send_signal(
    Signal(
        name="approve",
        workflow_id="ApprovalWorkflow",
        run_id="run_abc123",
        payload={"approved_by": "john.doe", "comments": "Looks good!"}
    )
)

Long-Running Workflows

Handle workflows that run for extended periods:
import asyncio
from datetime import timedelta

@app.workflow
class MonitoringWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, config: dict) -> WorkflowResult[dict]:
        monitoring_results = []

        # Run for 30 days, checking every hour
        for day in range(30):
            for hour in range(24):
                # Durable sleep - survives restarts
                await asyncio.sleep(3600)  # 1 hour

                # Check system status
                agent = Agent(
                    name="monitor",
                    instruction="Check system health and report issues.",
                    server_names=["fetch"]
                )

                async with agent:
                    llm = await agent.attach_llm(OpenAIAugmentedLLM)
                    status = await llm.generate_str(f"Check status of: {config['systems']}")

                monitoring_results.append({
                    "day": day,
                    "hour": hour,
                    "status": status
                })

                # Alert if issues found
                if "critical" in status.lower():
                    await self.send_alert(status)

        return WorkflowResult(value={"monitoring_complete": monitoring_results})

Advanced Patterns

Parallel Agent Execution

Run multiple agents in parallel with Temporal:
import asyncio

@app.workflow
class ParallelAnalysisWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, document: str) -> WorkflowResult[dict]:
        # Define parallel tasks
        async def analyze_sentiment():
            agent = Agent(name="sentiment", instruction="Analyze sentiment.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Analyze sentiment: {document}")

        async def extract_entities():
            agent = Agent(name="entities", instruction="Extract entities.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Extract entities: {document}")

        async def summarize():
            agent = Agent(name="summarizer", instruction="Summarize content.")
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                return await llm.generate_str(f"Summarize: {document}")

        # Execute in parallel - Temporal handles orchestration
        sentiment, entities, summary = await asyncio.gather(
            analyze_sentiment(),
            extract_entities(),
            summarize()
        )

        return WorkflowResult(value={
            "sentiment": sentiment,
            "entities": entities,
            "summary": summary
        })

Workflow Composition

Compose complex workflows from simpler ones:
@app.workflow
class DataPipelineWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, source: str) -> WorkflowResult[dict]:
        # Step 1: Data extraction workflow
        extraction = DataExtractionWorkflow()
        data = await extraction.run(source)

        # Step 2: Data validation workflow
        validation = DataValidationWorkflow()
        validated = await validation.run(data.value)

        # Step 3: Data processing workflow
        processing = DataProcessingWorkflow()
        processed = await processing.run(validated.value)

        # Step 4: Report generation workflow
        reporting = ReportGenerationWorkflow()
        report = await reporting.run(processed.value)

        return WorkflowResult(value={
            "data": data.value,
            "validation": validated.value,
            "processed": processed.value,
            "report": report.value
        })

Error Handling with Compensations

Implement saga pattern for distributed transactions:
@app.workflow
class OrderProcessingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, order: dict) -> WorkflowResult[dict]:
        compensations = []

        try:
            # Step 1: Reserve inventory
            inventory_agent = Agent(name="inventory", server_names=["database"])
            async with inventory_agent:
                llm = await inventory_agent.attach_llm(OpenAIAugmentedLLM)
                reservation = await llm.generate_str(f"Reserve items: {order['items']}")
                compensations.append(("inventory", reservation))

            # Step 2: Process payment
            payment_agent = Agent(name="payment", server_names=["payment_api"])
            async with payment_agent:
                llm = await payment_agent.attach_llm(OpenAIAugmentedLLM)
                payment = await llm.generate_str(f"Process payment: {order['total']}")
                compensations.append(("payment", payment))

            # Step 3: Ship order
            shipping_agent = Agent(name="shipping", server_names=["shipping_api"])
            async with shipping_agent:
                llm = await shipping_agent.attach_llm(OpenAIAugmentedLLM)
                shipment = await llm.generate_str(f"Ship to: {order['address']}")

            return WorkflowResult(value={
                "success": True,
                "reservation": reservation,
                "payment": payment,
                "shipment": shipment
            })

        except Exception as e:
            # Run compensations in reverse order
            for service, data in reversed(compensations):
                await self.compensate(service, data)

            return WorkflowResult(
                value=None,
                error=f"Order failed: {e}. Compensations executed."
            )

    async def compensate(self, service: str, data: str):
        """Execute compensation for failed step."""
        agent = Agent(name=f"{service}_compensation")
        async with agent:
            llm = await agent.attach_llm(OpenAIAugmentedLLM)
            await llm.generate_str(f"Compensate {service}: {data}")

Production Deployment

Infrastructure Requirements

Minimum Production Setup:
  • Temporal Server cluster (3+ nodes for HA)
  • PostgreSQL/MySQL database with replication
  • Elasticsearch for visibility (optional but recommended)
  • Load balancer for Temporal frontend
  • Monitoring stack (Prometheus, Grafana)
Resource Planning:
# Example Kubernetes deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: temporal-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: temporal-server
  template:
    metadata:
      labels:
        app: temporal-server  # must match spec.selector.matchLabels
    spec:
      containers:
        - name: temporal
          image: temporalio/auto-setup:latest
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            - name: DB
              value: postgresql
            - name: POSTGRES_SEEDS
              value: postgres-primary:5432
            - name: DYNAMIC_CONFIG_FILE_PATH
              value: /etc/temporal/config/dynamicconfig.yaml

High Availability Configuration

Configure Temporal for production resilience:
# temporal-server-config.yaml
persistence:
  defaultStore: default
  visibilityStore: visibility
  numHistoryShards: 512
  datastores:
    default:
      sql:
        pluginName: postgres
        databaseName: temporal
        connectAddr: postgres-cluster:5432
        connectProtocol: tcp
        maxConns: 20
        maxIdleConns: 20
        maxConnLifetime: 1h
    visibility:
      sql:
        pluginName: postgres
        databaseName: temporal_visibility
        connectAddr: postgres-cluster:5432

global:
  membership:
    maxJoinDuration: 30s
    broadcastAddress: 0.0.0.0
  pprof:
    port: 7936

services:
  frontend:
    rpc:
      grpcPort: 7233
      membershipPort: 6933
      bindOnLocalHost: false
  history:
    rpc:
      grpcPort: 7234
      membershipPort: 6934
      bindOnLocalHost: false
  matching:
    rpc:
      grpcPort: 7235
      membershipPort: 6935
      bindOnLocalHost: false
  worker:
    rpc:
      grpcPort: 7236
      membershipPort: 6936
      bindOnLocalHost: false

clusterMetadata:
  enableGlobalNamespace: true
  failoverVersionIncrement: 10
  masterClusterName: primary
  currentClusterName: primary
  clusterInformation:
    primary:
      enabled: true
      initialFailoverVersion: 0
      rpcName: frontend
      rpcAddress: 0.0.0.0:7233

Temporal Cloud

For production, use Temporal Cloud:
mcp_agent.config.yaml
execution_engine: temporal

temporal:
  host: your-namespace.tmprl.cloud
  port: 7233
  namespace: your-namespace
  task_queue: mcp-agent-production
  tls:
    client_cert_path: /path/to/client.crt
    client_key_path: /path/to/client.key
    ca_cert_path: /path/to/ca.crt
    server_name: your-namespace.tmprl.cloud
  data_converter:
    encryption_key: ${TEMPORAL_ENCRYPTION_KEY}
    codec: aes256gcm
  retry_policy:
    initial_interval: 1
    maximum_interval: 100
    backoff_coefficient: 2
    maximum_attempts: 50
  auth:
    api_key: ${TEMPORAL_API_KEY}
    namespace: your-namespace

Security Best Practices

Data Encryption:
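import dataclasses
import os
from typing import List, Sequence

from cryptography.fernet import Fernet
from temporalio.api.common.v1 import Payload
from temporalio.client import Client
from temporalio.converter import PayloadCodec, default

class FernetCodec(PayloadCodec):
    """Encrypts every payload before it leaves the process (illustrative sketch)."""

    def __init__(self, key: bytes) -> None:
        self._fernet = Fernet(key)

    async def encode(self, payloads: Sequence[Payload]) -> List[Payload]:
        return [
            Payload(
                metadata={"encoding": b"binary/encrypted"},
                data=self._fernet.encrypt(p.SerializeToString()),
            )
            for p in payloads
        ]

    async def decode(self, payloads: Sequence[Payload]) -> List[Payload]:
        decoded = []
        for p in payloads:
            if p.metadata.get("encoding") != b"binary/encrypted":
                decoded.append(p)  # not encrypted by this codec; pass through
                continue
            decoded.append(Payload.FromString(self._fernet.decrypt(p.data)))
        return decoded

async def connect_encrypted() -> Client:
    # Generate the key once with Fernet.generate_key() and store it securely;
    # every client and worker must load the same key.
    encryption_key = os.environ["TEMPORAL_ENCRYPTION_KEY"].encode()

    # Create encrypted client
    return await Client.connect(
        "your-namespace.tmprl.cloud:7233",
        namespace="your-namespace",
        data_converter=dataclasses.replace(
            default(), payload_codec=FernetCodec(encryption_key)
        ),
        tls=True,
    )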
Access Control:
# RBAC configuration for Temporal namespaces
namespaces:
  production:
    retention: "30d"
    archival:
      history:
        state: "enabled"
        uri: "s3://temporal-history-archive"
      visibility:
        state: "enabled"
        uri: "s3://temporal-visibility-archive"
    authorization:
      default_role: "worker"
      roles:
        admin:
          permissions:
            - "namespace:*"
            - "workflow:*"
            - "activity:*"
        worker:
          permissions:
            - "workflow:execute"
            - "activity:execute"
        monitor:
          permissions:
            - "workflow:read"
            - "namespace:read"
Network Security:
# Network policies for Kubernetes
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: temporal-network-policy
spec:
  podSelector:
    matchLabels:
      app: temporal
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: mcp-agent-worker
      ports:
        - protocol: TCP
          port: 7233
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: postgres
      ports:
        - protocol: TCP
          port: 5432

Worker Scaling

Scale workers for production workloads:
# worker.py for production
import asyncio
from concurrent.futures import ThreadPoolExecutor
from mcp_agent.executor.temporal import create_temporal_worker_for_app

# `app` is the MCPApp instance that defines your workflows (see the Quick Start worker)

async def main():
    # Create worker with production settings
    async with create_temporal_worker_for_app(
        app,
        task_queue="mcp-agent-production",
        max_concurrent_activities=50,
        max_concurrent_workflows=20,
        max_cached_workflows=100,
        activity_executor=ThreadPoolExecutor(max_workers=100),
    ) as worker:
        # Run worker
        await worker.run()

if __name__ == "__main__":
    # Run multiple worker instances for scaling
    asyncio.run(main())

Monitoring and Observability

Monitor workflows with Temporal UI and custom metrics:
# Add custom metrics
from temporalio import workflow
from temporalio.runtime import Runtime, TelemetryConfig, PrometheusConfig

# Configure Prometheus metrics
runtime = Runtime(
    telemetry=TelemetryConfig(
        metrics=PrometheusConfig(bind_address="0.0.0.0:9090")
    )
)

# Track custom metrics in workflows
@app.workflow
class MetricWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # Use workflow.now() rather than time.time() to stay deterministic
        start_time = workflow.now()

        # Your workflow logic
        result = await self.process(input)

        # Record metrics
        duration = (workflow.now() - start_time).total_seconds()
        app.context.metrics.record("workflow_duration", duration, {
            "workflow": "MetricWorkflow",
            "status": "success"
        })

        return WorkflowResult(value=result)

Debugging

Temporal Web UI

Access the Temporal Web UI at http://localhost:8233 to:
  • View all workflow executions
  • Inspect workflow history step-by-step
  • See pending activities and their retry attempts
  • Send signals and queries to running workflows
  • Download workflow history for offline debugging
  • Monitor worker health and task queues

Workflow Replay

Debug production issues by replaying workflow history:
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

async def debug_workflow(workflow_id: str):
    # Download history (JSON) from Temporal UI or API
    with open("workflow_history.json") as f:
        history = WorkflowHistory.from_json(workflow_id, f.read())

    # Create replayer with your workflow definitions
    replayer = Replayer(workflows=[MyWorkflow])

    # Replay workflow to debug
    try:
        await replayer.replay_workflow(history)
        print("Replay successful - workflow logic is correct")
    except Exception as e:
        print(f"Replay failed - logic error: {e}")

Testing with Time Skipping

Test long-running workflows efficiently:
import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

@pytest.mark.asyncio
async def test_long_running_workflow():
    # Start test environment with time skipping
    async with await WorkflowEnvironment.start_time_skipping() as env:
        # Create worker
        worker = Worker(
            env.client,
            task_queue="test-queue",
            workflows=[MonitoringWorkflow],
        )

        async with worker:
            # Start workflow
            handle = await env.client.start_workflow(
                MonitoringWorkflow.run,
                {"systems": ["api", "database"]},
                id="test-monitoring",
                task_queue="test-queue",
            )

            # Time automatically advances during sleep
            # 30 days completes instantly in tests
            result = await handle.result()

            assert len(result["monitoring_complete"]) == 720  # 30 days * 24 hours

Migration Guide

From Asyncio to Temporal

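Your workflow code remains largely the same; the main change is configuration. An asyncio setup looks like this:
execution_engine: asyncio
logger:
  transports: [console]
  level: info
To migrate, set execution_engine: temporal and add the temporal block shown in the Quick Start.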

Running Workflows

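With asyncio, you can instantiate a workflow and run it directly:
async with app.run():
    workflow = MyWorkflow()
    result = await workflow.run("input")
    print(result.value)
With Temporal, start the workflow through the executor instead (executor.start_workflow, as in the Quick Start) and make sure a worker is running.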

Best Practices

Workflows must be deterministic. Avoid:
  • Random number generation without seeds
  • Current time checks (use workflow.now())
  • Direct I/O operations (use activities)
  • Non-deterministic data structures
Configure timeouts for workflows and activities:
handle = await executor.start_workflow(
    "MyWorkflow",
    input_data,
    execution_timeout=timedelta(hours=1),
    run_timeout=timedelta(minutes=30),
    task_timeout=timedelta(minutes=5),
)
Set meaningful workflow IDs for idempotency:
workflow_id = f"process-document-{document_id}"
handle = await executor.start_workflow(
    "DocumentWorkflow",
    document,
    workflow_id=workflow_id,
    id_reuse_policy="allow_duplicate_failed_only",
)
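When no natural key such as a document ID is available, one option is to derive the ID from the request content so that the same logical request always maps to the same workflow ID. The helper below is a hypothetical sketch of that idea using only the standard library. The 16-character digest length is an arbitrary choice for readability.

```python
import hashlib
import json


def workflow_id_for(kind: str, payload: dict) -> str:
    """Same logical request -> same ID, so a duplicate start can be detected."""
    # Canonical JSON: key order and whitespace must not affect the ID.
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:16]
    return f"{kind}-{digest}"


id_a = workflow_id_for("process-document", {"id": 42, "lang": "en"})
id_b = workflow_id_for("process-document", {"lang": "en", "id": 42})
id_c = workflow_id_for("process-document", {"id": 43, "lang": "en"})
```

Here id_a and id_b are identical because the payloads differ only in key order, while id_c differs because the content changed.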
Version your workflows for safe updates:
from temporalio import workflow

@app.workflow
class VersionedWorkflow(Workflow[str]):
    @app.workflow_run
    async def run(self, input: str) -> WorkflowResult[str]:
        # The Temporal Python SDK versions workflow code with patch markers
        if workflow.patched("processing_logic_v2"):
            # New logic
            result = await self.process_v2(input)
        else:
            # Old logic (runs started before the patch was deployed)
            result = await self.process_v1(input)

        return WorkflowResult(value=result)

Common Patterns

Polling External Systems

@app.workflow
class PollingWorkflow(Workflow[dict]):
    @app.workflow_run
    async def run(self, job_id: str) -> WorkflowResult[dict]:
        max_attempts = 100

        for attempt in range(max_attempts):
            # Check job status
            agent = Agent(name="checker", server_names=["api"])
            async with agent:
                llm = await agent.attach_llm(OpenAIAugmentedLLM)
                status = await llm.generate_str(f"Check job status: {job_id}")

            if "completed" in status:
                return WorkflowResult(value={"status": "completed", "result": status})

            if "failed" in status:
                return WorkflowResult(value=None, error=f"Job failed: {status}")

            # Wait before next poll (durable)
            await asyncio.sleep(60)  # 1 minute

        return WorkflowResult(value=None, error="Job timed out")

Scheduled Workflows

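import asyncio

from temporalio import workflow

@app.workflow
class ScheduledWorkflow(Workflow[None]):
    @app.workflow_run
    async def run(self, schedule: dict) -> WorkflowResult[None]:
        """Run daily at specified time."""
        # Wait until next scheduled time (durable timer).
        # calculate_next_run must return a timezone-aware datetime.
        next_run = self.calculate_next_run(schedule)
        delay = (next_run - workflow.now()).total_seconds()
        await asyncio.sleep(max(delay, 0))

        # Execute scheduled task
        await self.execute_scheduled_task()

        # Continue as new to prevent history growth. This ends the current
        # run and starts a fresh one, so no explicit loop is needed.
        workflow.continue_as_new(schedule)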
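The example calls a calculate_next_run helper that it does not define. One possible shape for it is sketched below, with the signature being an assumption: it takes the current time as an argument (so a workflow can pass in a deterministic clock value) and returns the next occurrence of a daily run time strictly after that moment.

```python
from datetime import datetime, time, timedelta, timezone


def calculate_next_run(now: datetime, run_at: time) -> datetime:
    """Next occurrence of `run_at` (in the timezone of `now`), strictly after `now`."""
    candidate = now.replace(
        hour=run_at.hour,
        minute=run_at.minute,
        second=run_at.second,
        microsecond=0,
    )
    if candidate <= now:
        candidate += timedelta(days=1)  # today's slot has passed; use tomorrow's
    return candidate


now = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
later_today = calculate_next_run(now, time(12, 0))
tomorrow = calculate_next_run(now, time(9, 0))
```

Passing the clock in rather than reading it inside the helper keeps the function pure, which makes it easy to test and safe to call from workflow code.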
