OpenTelemetry enables comprehensive monitoring of Celery applications by automatically collecting telemetry data including task execution times, worker performance, queue depths, and error rates. By integrating OpenTelemetry, you can capture distributed traces across your task pipeline and export this data to observability backends for analysis and visualization.
What is Celery?
Celery is a distributed task queue for Python that allows you to run asynchronous and scheduled tasks. It's built on message passing and can operate with multiple brokers like Redis, RabbitMQ, and Amazon SQS. Celery is commonly used for background processing, periodic tasks, and distributed computing.
Celery consists of several components: producers (clients that send tasks), brokers (message transport), workers (processes that execute tasks), and result backends (stores for task results). This architecture makes it ideal for scaling applications horizontally and handling time-consuming operations without blocking user requests.
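To make that concrete, here is a minimal sketch (assuming a Redis broker on localhost; the task and names are hypothetical) of how a producer enqueues work that a worker later executes:

```python
from celery import Celery

# Minimal app; the broker URL assumes a local Redis instance
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def send_welcome_email(user_id):
    # Runs in a worker process, not in the caller
    return f"email sent to user {user_id}"

# Producer side: enqueue the task and get an AsyncResult handle
# without blocking the calling process
async_result = send_welcome_email.delay(42)
```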
What is OpenTelemetry?
OpenTelemetry is an open-source observability framework that aims to standardize and simplify the collection, processing, and export of telemetry data from applications and systems.
OpenTelemetry supports multiple programming languages and platforms, making it suitable for a wide range of applications and environments. For detailed Python instrumentation, see the OpenTelemetry Python guide.
OpenTelemetry enables developers to instrument their code and collect telemetry data, which can then be exported to various OpenTelemetry backends or observability platforms for analysis and visualization. Using the OpenTelemetry Collector, you can centralize telemetry data collection, perform data transformations, and route data to multiple observability backends simultaneously.
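As a minimal, Celery-agnostic sketch of that workflow, instrumenting code boils down to configuring a tracer provider, creating spans around work, and handing finished spans to an exporter (here the console exporter that ships with the SDK):

```python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

# Configure a tracer provider that prints finished spans to stdout
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

# Any work wrapped in a span becomes telemetry an exporter can ship to a backend
with tracer.start_as_current_span("demo-operation") as span:
    span.set_attribute("demo.attribute", "value")
```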
Installation
To instrument a Celery application with OpenTelemetry, install the required packages:
```bash
pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-celery
```
Additional Instrumentation
Depending on your broker and result backend, you may also want to install additional instrumentation:
```bash
# For Redis broker/backend
pip install opentelemetry-instrumentation-redis

# For RabbitMQ (if using kombu)
pip install opentelemetry-instrumentation-kombu

# For database result backends
pip install opentelemetry-instrumentation-psycopg2  # PostgreSQL
pip install opentelemetry-instrumentation-sqlite3   # SQLite
```
Exporter Installation
To export telemetry data to observability backends, install an appropriate exporter:
```bash
# For OTLP (recommended)
pip install opentelemetry-exporter-otlp

# For OTLP over HTTP only
pip install opentelemetry-exporter-otlp-proto-http

# Console output for development/testing uses ConsoleSpanExporter,
# which ships with opentelemetry-sdk (no extra package needed)
```
Basic Instrumentation
Automatic Instrumentation
The simplest way to instrument Celery is using the worker process initialization hook. This ensures proper initialization in Celery's prefork worker model:
```python
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app first
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    # Configure OpenTelemetry
    resource = Resource(attributes={
        SERVICE_NAME: "celery-worker"
    })
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery
    CeleryInstrumentor().instrument()

@app.task
def add(x, y):
    return x + y
```
Manual Instrumentation
For more control over tracing, you can manually instrument specific tasks:
```python
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app first
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery
    CeleryInstrumentor().instrument()

# Module-level tracer; the API returns a proxy that picks up
# the provider set later in the worker hook
tracer = trace.get_tracer(__name__)

@app.task
def process_data(data_id):
    with tracer.start_as_current_span("process_data") as span:
        # Add custom attributes
        span.set_attribute("data.id", data_id)
        span.set_attribute("worker.name", "data_processor")

        # Your task logic here
        result = expensive_computation(data_id)

        # Record result information
        span.set_attribute("result.size", len(result))
        span.set_attribute("task.status", "completed")

        return result
```
Worker Configuration
Worker Startup with Instrumentation
Create a worker startup script that initializes OpenTelemetry properly:
```python
# worker.py
import os

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def initialize_tracing(*args, **kwargs):
    """Initialize OpenTelemetry tracing for Celery worker"""
    # Create resource with worker information
    resource = Resource(attributes={
        SERVICE_NAME: "celery-worker",
        "service.version": "1.0.0",
        "deployment.environment": os.environ.get("ENVIRONMENT", "development"),
        "worker.hostname": os.environ.get("HOSTNAME", "unknown"),
    })

    # Configure tracer provider
    provider = TracerProvider(resource=resource)

    # Configure OTLP exporter
    otlp_exporter = OTLPSpanExporter(
        endpoint="http://localhost:4317",
        insecure=True,
    )
    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery
    CeleryInstrumentor().instrument()

@app.task
def example_task(data):
    # Your task logic here
    return f"Processed: {data}"
```
Worker Execution
Start the worker with the instrumented configuration:
```bash
# Start worker with instrumentation
celery -A worker worker --loglevel=info

# For production with multiple workers
celery -A worker worker --loglevel=info --concurrency=4
```
Producer (Client) Instrumentation
Instrument the client code that sends tasks to Celery:
```python
# client.py
from celery import Celery
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Initialize OpenTelemetry for producer
resource = Resource(attributes={
    SERVICE_NAME: "celery-producer"
})
provider = TracerProvider(resource=resource)
otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Instrument Celery
CeleryInstrumentor().instrument()

# Create Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

def send_task():
    # This will be traced automatically
    result = app.send_task('tasks.process_data', args=[123])
    return result
```
Advanced Configuration
Custom Span Attributes
Add custom attributes to the spans created by the Celery instrumentation for better observability. Inside a bound task you can fetch the active span with `trace.get_current_span()` and enrich it with metadata from `self.request`:
```python
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery
    CeleryInstrumentor().instrument()

@app.task(bind=True)
def annotated_task(self, data):
    """Add attributes to the span created by CeleryInstrumentor for this task"""
    span = trace.get_current_span()

    # Add task-specific attributes
    span.set_attribute("celery.task.queue", self.request.delivery_info.get('routing_key', 'default'))
    span.set_attribute("celery.task.retries", self.request.retries)
    span.set_attribute("celery.task.eta", str(self.request.eta) if self.request.eta else "immediate")

    # Add worker information
    span.set_attribute("celery.worker.hostname", self.request.hostname)

    # Add custom business logic attributes
    correlation_id = getattr(self.request, 'correlation_id', None)
    if correlation_id:
        span.set_attribute("business.correlation_id", correlation_id)

    # Your task logic here
    return f"Processed: {data}"
```
Error Handling and Exception Tracking
Enhance error tracking in Celery tasks:
```python
from celery import Celery
from celery.signals import worker_process_init
from celery.exceptions import Retry
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app
app = Celery('tasks', broker='redis://localhost:6379/0')

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    CeleryInstrumentor().instrument()

@app.task(bind=True)
def risky_task(self, data):
    span = trace.get_current_span()

    try:
        # Your task logic here; process_risky_operation and RetryableError
        # are placeholders for your own code and exception types
        result = process_risky_operation(data)

        # Set success attributes
        span.set_attribute("task.result.success", True)
        span.set_attribute("task.result.items_processed", len(result))

        return result

    except RetryableError as e:
        # Handle retryable errors
        span.record_exception(e)
        span.set_attribute("task.retry.attempt", self.request.retries + 1)
        span.set_attribute("task.retry.reason", str(e))

        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=60, max_retries=3)

    except Exception as e:
        # Handle permanent failures
        span.record_exception(e)
        span.set_status(Status(StatusCode.ERROR, str(e)))
        span.set_attribute("task.error.type", type(e).__name__)
        span.set_attribute("task.error.fatal", True)
        raise
```
Task Result Tracking
Track task results and completion status:
```python
from celery.signals import task_success, task_failure, task_retry
from opentelemetry import trace

@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Handle successful task completion"""
    span = trace.get_current_span()
    span.set_attribute("celery.task.status", "success")
    span.set_attribute("celery.task.result.type", type(result).__name__)

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
    """Handle task failures"""
    span = trace.get_current_span()
    span.set_attribute("celery.task.status", "failure")
    span.set_attribute("celery.task.exception", str(exception))
    span.record_exception(exception)

@task_retry.connect
def task_retry_handler(sender=None, task_id=None, reason=None, einfo=None, **kwargs):
    """Handle task retries"""
    span = trace.get_current_span()
    span.set_attribute("celery.task.status", "retry")
    span.set_attribute("celery.task.retry.reason", str(reason))
```
Broker-Specific Configuration
Redis Configuration
For Redis broker, add Redis instrumentation:
```python
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app with Redis configuration
app = Celery('tasks')
app.conf.update(
    broker_url='redis://localhost:6379/0',
    result_backend='redis://localhost:6379/0',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Redis
    RedisInstrumentor().instrument()

    # Instrument Celery
    CeleryInstrumentor().instrument()
```
RabbitMQ Configuration
For RabbitMQ broker, configure with appropriate instrumentation:
```python
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource

# Create Celery app with RabbitMQ configuration
app = Celery('tasks')
app.conf.update(
    broker_url='pyamqp://guest@localhost//',
    result_backend='rpc://',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
)

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    """Initialize OpenTelemetry in each worker process"""
    resource = Resource(attributes={SERVICE_NAME: "celery-worker"})
    provider = TracerProvider(resource=resource)
    processor = BatchSpanProcessor(ConsoleSpanExporter())
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

    # Instrument Celery
    CeleryInstrumentor().instrument()
```
Monitoring and Metrics
Custom Metrics Collection
Collect custom metrics alongside traces:
```python
import time

from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter

# Configure metrics
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True),
    export_interval_millis=30000,
)
metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader]))

# Create meter and instruments
meter = metrics.get_meter("celery_metrics")

task_duration_histogram = meter.create_histogram(
    "celery.task.duration",
    description="Duration of Celery task execution",
    unit="ms",
)

task_counter = meter.create_counter(
    "celery.tasks.total",
    description="Total number of Celery tasks",
)

# `app` is the Celery app created in the earlier worker examples
@app.task
def monitored_task(data):
    start_time = time.time()

    try:
        # Your task logic
        result = process_data(data)

        # Record metrics
        duration = (time.time() - start_time) * 1000
        task_duration_histogram.record(duration, {"task_name": "monitored_task", "status": "success"})
        task_counter.add(1, {"task_name": "monitored_task", "status": "success"})

        return result
    except Exception:
        # Record failure metrics
        duration = (time.time() - start_time) * 1000
        task_duration_histogram.record(duration, {"task_name": "monitored_task", "status": "error"})
        task_counter.add(1, {"task_name": "monitored_task", "status": "error"})
        raise
```
Production Deployment
Environment Configuration
Use environment variables for production configuration:
```python
import os

from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

def configure_tracing():
    """Configure OpenTelemetry based on environment variables"""
    # OTLP endpoint configuration
    otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317")

    # Service configuration
    service_name = os.environ.get("OTEL_SERVICE_NAME", "celery-worker")
    service_version = os.environ.get("OTEL_SERVICE_VERSION", "1.0.0")
    environment = os.environ.get("DEPLOYMENT_ENVIRONMENT", "production")

    # Resource attributes
    resource = Resource(attributes={
        SERVICE_NAME: service_name,
        "service.version": service_version,
        "deployment.environment": environment,
    })

    # Configure exporter
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    processor = BatchSpanProcessor(exporter)

    provider = TracerProvider(resource=resource)
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

# Call configure_tracing() from the worker_process_init hook shown earlier
```
Docker Configuration
Example Dockerfile for containerized Celery workers:
```dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

# Set OpenTelemetry environment variables
ENV OTEL_SERVICE_NAME=celery-worker
ENV OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317

# Start worker with instrumentation
CMD ["celery", "-A", "worker", "worker", "--loglevel=info"]
```
What is Uptrace?
Uptrace is an OpenTelemetry APM that supports distributed tracing, metrics, and logs. You can use it to monitor applications and troubleshoot issues. You can also compare it with other top APM tools for task queue monitoring.
Uptrace comes with an intuitive query builder, rich dashboards, alerting rules with notifications, and integrations for most languages and frameworks.
Uptrace can process billions of spans and metrics on a single server and allows you to monitor your applications at 10x lower cost.
In just a few minutes, you can try Uptrace by visiting the cloud demo (no login required) or running it locally with Docker. The source code is available on GitHub.
uptrace-python with Celery
For simplified configuration, you can use the uptrace-python wrapper:
Installation
```bash
pip install uptrace
```
Configuration
```python
import uptrace
from opentelemetry.instrumentation.celery import CeleryInstrumentor

# Configure OpenTelemetry with Uptrace
uptrace.configure_opentelemetry(
    # Set DSN or use UPTRACE_DSN environment variable
    dsn="<your-uptrace-dsn>",
    service_name="celery-worker",
    service_version="1.0.0",
    deployment_environment="production",
)

# Instrument Celery
CeleryInstrumentor().instrument()

# Your Celery app
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
```
Environment Variables
```bash
# Uptrace configuration
export UPTRACE_DSN="https://<token>@uptrace.dev/<project_id>"

# OpenTelemetry configuration
export OTEL_SERVICE_NAME="celery-worker"
export OTEL_SERVICE_VERSION="1.0.0"

# Start worker
celery -A tasks worker --loglevel=info
```
Troubleshooting
Common Issues
- Double instrumentation: Ensure you only call `CeleryInstrumentor().instrument()` once per worker process (see the sketch after this list)
- Missing broker traces: Install the appropriate broker instrumentation (Redis/RabbitMQ)
- Worker startup issues: Initialize OpenTelemetry using the `worker_process_init` hook, not at module level
- Spans not appearing: Check that exporters are configured correctly and the tracer provider is set
- High overhead: Adjust sampling rates and batch processor settings
- Fork-safety issues: Use the worker process initialization hook to avoid sharing connections between processes
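For the double-instrumentation item, a small guard can make the initialization hook idempotent. This sketch assumes the `is_instrumented_by_opentelemetry` property exposed by the base instrumentor class; `instrument()` itself also skips re-instrumentation with a warning, so the check is simply belt-and-braces:

```python
from celery.signals import worker_process_init
from opentelemetry.instrumentation.celery import CeleryInstrumentor

@worker_process_init.connect(weak=False)
def init_tracing_once(*args, **kwargs):
    instrumentor = CeleryInstrumentor()
    # Skip if this worker process has already been instrumented
    if not instrumentor.is_instrumented_by_opentelemetry:
        instrumentor.instrument()
```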
Debug Configuration
Enable debug logging to troubleshoot instrumentation:
```python
import logging

# Enable OpenTelemetry debug logging
logging.getLogger("opentelemetry").setLevel(logging.DEBUG)

# Enable Celery debug logging
logging.getLogger("celery").setLevel(logging.DEBUG)

# Configure root logger
logging.basicConfig(level=logging.DEBUG)
```
Performance Optimization
Configure appropriate settings for production:
```python
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Configure sampling (sample 10% of traces)
sampler = TraceIdRatioBased(0.1)
provider = TracerProvider(sampler=sampler)

# Optimize batch processor settings
exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True)
processor = BatchSpanProcessor(
    exporter,
    max_queue_size=2048,
    schedule_delay_millis=5000,
    max_export_batch_size=512,
)
provider.add_span_processor(processor)
```
What's next?
By integrating OpenTelemetry with Celery, you gain valuable insights into your distributed task processing pipeline. You can monitor task performance, track queue depths, identify bottlenecks, and troubleshoot issues across your asynchronous workflow.
The telemetry data collected helps you:
- Monitor task execution times and success rates
- Track worker performance and resource utilization
- Identify bottlenecks in task processing pipelines
- Debug failed tasks and retry patterns
- Optimize queue management and worker scaling
- Understand task dependencies and data flow
- Improve overall system reliability and performance
