DEV Community

Cover image for OpenTelemetry Celery Instrumentation Guide
Alexandr Bandurchin for Uptrace

Posted on • Originally published at uptrace.dev

OpenTelemetry Celery Instrumentation Guide

OpenTelemetry enables comprehensive monitoring of Celery applications by automatically collecting telemetry data including task execution times, worker performance, queue depths, and error rates. By integrating OpenTelemetry, you can capture distributed traces across your task pipeline and export this data to observability backends for analysis and visualization.

What is Celery?

Celery is a distributed task queue for Python that allows you to run asynchronous and scheduled tasks. It's built on message passing and can operate with multiple brokers like Redis, RabbitMQ, and Amazon SQS. Celery is commonly used for background processing, periodic tasks, and distributed computing.

Celery consists of several components: producers (clients that send tasks), brokers (message transport), workers (processes that execute tasks), and result backends (stores for task results). This architecture makes it ideal for scaling applications horizontally and handling time-consuming operations without blocking user requests.

What is OpenTelemetry?

OpenTelemetry is an open-source observability framework that aims to standardize and simplify the collection, processing, and export of telemetry data from applications and systems.

OpenTelemetry supports multiple programming languages and platforms, making it suitable for a wide range of applications and environments. For detailed Python instrumentation, see the OpenTelemetry Python guide.

OpenTelemetry enables developers to instrument their code and collect telemetry data, which can then be exported to various OpenTelemetry backends or observability platforms for analysis and visualization. Using the OpenTelemetry Collector, you can centralize telemetry data collection, perform data transformations, and route data to multiple observability backends simultaneously.

Installation

To instrument a Celery application with OpenTelemetry, install the required packages:

pip install opentelemetry-api opentelemetry-sdk opentelemetry-instrumentation-celery 
Enter fullscreen mode Exit fullscreen mode

Additional Instrumentation

Depending on your broker and result backend, you may also want to install additional instrumentation:

# For Redis broker/backend pip install opentelemetry-instrumentation-redis # For RabbitMQ (if using kombu) pip install opentelemetry-instrumentation-kombu # For database result backends pip install opentelemetry-instrumentation-psycopg2 # PostgreSQL pip install opentelemetry-instrumentation-sqlite3 # SQLite 
Enter fullscreen mode Exit fullscreen mode

Exporter Installation

To export telemetry data to observability backends, install an appropriate exporter:

# For OTLP (recommended) pip install opentelemetry-exporter-otlp # For console output (development/testing) pip install opentelemetry-exporter-otlp-proto-http 
Enter fullscreen mode Exit fullscreen mode

Basic Instrumentation

Automatic Instrumentation

The simplest way to instrument Celery is using the worker process initialization hook. This ensures proper initialization in Celery's prefork worker model:

from celery import Celery from celery.signals import worker_process_init from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ( BatchSpanProcessor, ConsoleSpanExporter, ) from opentelemetry.sdk.resources import SERVICE_NAME, Resource # Create Celery app first app = Celery('tasks', broker='redis://localhost:6379/0') @worker_process_init.connect(weak=False) def init_celery_tracing(*args, **kwargs): """Initialize OpenTelemetry in each worker process""" # Configure OpenTelemetry  resource = Resource(attributes={ SERVICE_NAME: "celery-worker" }) provider = TracerProvider(resource=resource) processor = BatchSpanProcessor(ConsoleSpanExporter()) provider.add_span_processor(processor) trace.set_tracer_provider(provider) # Instrument Celery  CeleryInstrumentor().instrument() @app.task def add(x, y): return x + y 
Enter fullscreen mode Exit fullscreen mode

Manual Instrumentation

For more control over tracing, you can manually instrument specific tasks:

from celery import Celery from celery.signals import worker_process_init from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource # Create Celery app first app = Celery('tasks', broker='redis://localhost:6379/0') @worker_process_init.connect(weak=False) def init_celery_tracing(*args, **kwargs): """Initialize OpenTelemetry in each worker process""" resource = Resource(attributes={SERVICE_NAME: "celery-worker"}) provider = TracerProvider(resource=resource) processor = BatchSpanProcessor(ConsoleSpanExporter()) provider.add_span_processor(processor) trace.set_tracer_provider(provider) # Instrument Celery  CeleryInstrumentor().instrument() # Get tracer after initialization tracer = trace.get_tracer(__name__) @app.task def process_data(data_id): with tracer.start_as_current_span("process_data") as span: # Add custom attributes  span.set_attribute("data.id", data_id) span.set_attribute("worker.name", "data_processor") # Your task logic here  result = expensive_computation(data_id) # Record result information  span.set_attribute("result.size", len(result)) span.set_attribute("task.status", "completed") return result 
Enter fullscreen mode Exit fullscreen mode

Worker Configuration

Worker Startup with Instrumentation

Create a worker startup script that initializes OpenTelemetry properly:

# worker.py import os from celery import Celery from celery.signals import worker_process_init from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource # Create Celery app app = Celery('tasks', broker='redis://localhost:6379/0') @worker_process_init.connect(weak=False) def initialize_tracing(*args, **kwargs): """Initialize OpenTelemetry tracing for Celery worker""" # Create resource with worker information  resource = Resource(attributes={ SERVICE_NAME: "celery-worker", "service.version": "1.0.0", "deployment.environment": os.environ.get("ENVIRONMENT", "development"), "worker.hostname": os.environ.get("HOSTNAME", "unknown"), }) # Configure tracer provider  provider = TracerProvider(resource=resource) # Configure OTLP exporter  otlp_exporter = OTLPSpanExporter( endpoint="http://localhost:4317", insecure=True, ) processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(processor) trace.set_tracer_provider(provider) # Instrument Celery  CeleryInstrumentor().instrument() @app.task def example_task(data): # Your task logic here  return f"Processed: {data}" 
Enter fullscreen mode Exit fullscreen mode

Worker Execution

Start the worker with the instrumented configuration:

# Start worker with instrumentation celery -A worker worker --loglevel=info # For production with multiple workers celery -A worker worker --loglevel=info --concurrency=4 
Enter fullscreen mode Exit fullscreen mode

Producer (Client) Instrumentation

Instrument the client code that sends tasks to Celery:

# client.py from celery import Celery from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource # Initialize OpenTelemetry for producer resource = Resource(attributes={ SERVICE_NAME: "celery-producer" }) provider = TracerProvider(resource=resource) otlp_exporter = OTLPSpanExporter(endpoint="http://localhost:4317", insecure=True) processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(processor) trace.set_tracer_provider(provider) # Instrument Celery CeleryInstrumentor().instrument() # Create Celery app app = Celery('tasks', broker='redis://localhost:6379/0') def send_task(): # This will be traced automatically  result = app.send_task('tasks.process_data', args=[123]) return result 
Enter fullscreen mode Exit fullscreen mode

Advanced Configuration

Custom Span Attributes

Add custom attributes to Celery spans for better observability:

from celery import Celery from celery.signals import worker_process_init from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource # Create Celery app app = Celery('tasks', broker='redis://localhost:6379/0') @worker_process_init.connect(weak=False) def init_celery_tracing(*args, **kwargs): """Initialize OpenTelemetry in each worker process""" resource = Resource(attributes={SERVICE_NAME: "celery-worker"}) provider = TracerProvider(resource=resource) processor = BatchSpanProcessor(ConsoleSpanExporter()) provider.add_span_processor(processor) trace.set_tracer_provider(provider) def custom_span_processor(span, task): """Custom function to add attributes to Celery spans""" # Add task-specific attributes  span.set_attribute("celery.task.queue", task.request.delivery_info.get('routing_key', 'default')) span.set_attribute("celery.task.retries", task.request.retries) span.set_attribute("celery.task.eta", str(task.request.eta) if task.request.eta else "immediate") # Add worker information  span.set_attribute("celery.worker.hostname", task.request.hostname) # Add custom business logic attributes  if hasattr(task.request, 'correlation_id'): span.set_attribute("business.correlation_id", task.request.correlation_id) # Configure instrumentation with custom processor  CeleryInstrumentor().instrument( span_name_callback=lambda task: f"celery.task.{task.name}", span_processor_callback=custom_span_processor ) 
Enter fullscreen mode Exit fullscreen mode

Error Handling and Exception Tracking

Enhance error tracking in Celery tasks:

from celery import Celery from celery.signals import worker_process_init from celery.exceptions import Retry from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry import trace from opentelemetry.trace import Status, StatusCode from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource # Create Celery app app = Celery('tasks', broker='redis://localhost:6379/0') @worker_process_init.connect(weak=False) def init_celery_tracing(*args, **kwargs): """Initialize OpenTelemetry in each worker process""" resource = Resource(attributes={SERVICE_NAME: "celery-worker"}) provider = TracerProvider(resource=resource) processor = BatchSpanProcessor(ConsoleSpanExporter()) provider.add_span_processor(processor) trace.set_tracer_provider(provider) CeleryInstrumentor().instrument() @app.task(bind=True) def risky_task(self, data): span = trace.get_current_span() try: # Your task logic here  result = process_risky_operation(data) # Set success attributes  span.set_attribute("task.result.success", True) span.set_attribute("task.result.items_processed", len(result)) return result except RetryableError as e: # Handle retryable errors  span.record_exception(e) span.set_attribute("task.retry.attempt", self.request.retries + 1) span.set_attribute("task.retry.reason", str(e)) # Retry with exponential backoff  raise self.retry(exc=e, countdown=60, max_retries=3) except Exception as e: # Handle permanent failures  span.record_exception(e) span.set_status(Status(StatusCode.ERROR, str(e))) span.set_attribute("task.error.type", type(e).__name__) span.set_attribute("task.error.fatal", True) raise 
Enter fullscreen mode Exit fullscreen mode

Task Result Tracking

Track task results and completion status:

from celery.signals import task_success, task_failure, task_retry from opentelemetry import trace @task_success.connect def task_success_handler(sender=None, result=None, **kwargs): """Handle successful task completion""" span = trace.get_current_span() span.set_attribute("celery.task.status", "success") span.set_attribute("celery.task.result.type", type(result).__name__) @task_failure.connect def task_failure_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs): """Handle task failures""" span = trace.get_current_span() span.set_attribute("celery.task.status", "failure") span.set_attribute("celery.task.exception", str(exception)) span.record_exception(exception) @task_retry.connect def task_retry_handler(sender=None, task_id=None, reason=None, einfo=None, **kwargs): """Handle task retries""" span = trace.get_current_span() span.set_attribute("celery.task.status", "retry") span.set_attribute("celery.task.retry.reason", str(reason)) 
Enter fullscreen mode Exit fullscreen mode

Broker-Specific Configuration

Redis Configuration

For Redis broker, add Redis instrumentation:

from celery import Celery from celery.signals import worker_process_init from opentelemetry.instrumentation.redis import RedisInstrumentor from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource # Create Celery app with Redis configuration app = Celery('tasks') app.conf.update( broker_url='redis://localhost:6379/0', result_backend='redis://localhost:6379/0', task_serializer='json', accept_content=['json'], result_serializer='json', timezone='UTC', enable_utc=True, ) @worker_process_init.connect(weak=False) def init_celery_tracing(*args, **kwargs): """Initialize OpenTelemetry in each worker process""" resource = Resource(attributes={SERVICE_NAME: "celery-worker"}) provider = TracerProvider(resource=resource) processor = BatchSpanProcessor(ConsoleSpanExporter()) provider.add_span_processor(processor) trace.set_tracer_provider(provider) # Instrument Redis  RedisInstrumentor().instrument() # Instrument Celery  CeleryInstrumentor().instrument() 
Enter fullscreen mode Exit fullscreen mode

RabbitMQ Configuration

For RabbitMQ broker, configure with appropriate instrumentation:

from celery import Celery from celery.signals import worker_process_init from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource # Create Celery app with RabbitMQ configuration app = Celery('tasks') app.conf.update( broker_url='pyamqp://guest@localhost//', result_backend='rpc://', task_serializer='json', accept_content=['json'], result_serializer='json', ) @worker_process_init.connect(weak=False) def init_celery_tracing(*args, **kwargs): """Initialize OpenTelemetry in each worker process""" resource = Resource(attributes={SERVICE_NAME: "celery-worker"}) provider = TracerProvider(resource=resource) processor = BatchSpanProcessor(ConsoleSpanExporter()) provider.add_span_processor(processor) trace.set_tracer_provider(provider) # Instrument Celery (includes kombu instrumentation)  CeleryInstrumentor().instrument() 
Enter fullscreen mode Exit fullscreen mode

Monitoring and Metrics

Custom Metrics Collection

Collect custom metrics alongside traces:

from opentelemetry import metrics from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter # Configure metrics metric_reader = PeriodicExportingMetricReader( OTLPMetricExporter(endpoint="http://localhost:4317", insecure=True), export_interval_millis=30000, ) metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader])) # Create meter and instruments meter = metrics.get_meter("celery_metrics") task_duration_histogram = meter.create_histogram( "celery.task.duration", description="Duration of Celery task execution", unit="ms" ) task_counter = meter.create_counter( "celery.tasks.total", description="Total number of Celery tasks" ) @app.task def monitored_task(data): start_time = time.time() try: # Your task logic  result = process_data(data) # Record metrics  duration = (time.time() - start_time) * 1000 task_duration_histogram.record(duration, {"task_name": "monitored_task", "status": "success"}) task_counter.add(1, {"task_name": "monitored_task", "status": "success"}) return result except Exception as e: # Record failure metrics  duration = (time.time() - start_time) * 1000 task_duration_histogram.record(duration, {"task_name": "monitored_task", "status": "error"}) task_counter.add(1, {"task_name": "monitored_task", "status": "error"}) raise 
Enter fullscreen mode Exit fullscreen mode

Production Deployment

Environment Configuration

Use environment variables for production configuration:

import os from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter def configure_tracing(): """Configure OpenTelemetry based on environment variables""" # OTLP endpoint configuration  otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317") # Service configuration  service_name = os.environ.get("OTEL_SERVICE_NAME", "celery-worker") service_version = os.environ.get("OTEL_SERVICE_VERSION", "1.0.0") environment = os.environ.get("DEPLOYMENT_ENVIRONMENT", "production") # Resource attributes  resource = Resource(attributes={ SERVICE_NAME: service_name, "service.version": service_version, "deployment.environment": environment, }) # Configure exporter  exporter = OTLPSpanExporter(endpoint=otlp_endpoint) processor = BatchSpanProcessor(exporter) provider = TracerProvider(resource=resource) provider.add_span_processor(processor) trace.set_tracer_provider(provider) 
Enter fullscreen mode Exit fullscreen mode

Docker Configuration

Example Dockerfile for containerized Celery workers:

FROM python:3.11-slim WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . # Set OpenTelemetry environment variables ENV OTEL_SERVICE_NAME=celery-worker ENV OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger:4317 # Start worker with instrumentation CMD ["celery", "-A", "worker", "worker", "--loglevel=info"] 
Enter fullscreen mode Exit fullscreen mode

What is Uptrace?

Uptrace is a OpenTelemetry APM that supports distributed tracing, metrics, and logs. You can use it to monitor applications and troubleshoot issues. Compare with other top APM tools for task queue monitoring.

Uptrace Overview

Uptrace comes with an intuitive query builder, rich dashboards, alerting rules with notifications, and integrations for most languages and frameworks.

Uptrace can process billions of spans and metrics on a single server and allows you to monitor your applications at 10x lower cost.

In just a few minutes, you can try Uptrace by visiting the cloud demo (no login required) or running it locally with Docker. The source code is available on GitHub.

uptrace-python with Celery

For simplified configuration, you can use the uptrace-python wrapper:

Installation

pip install uptrace 
Enter fullscreen mode Exit fullscreen mode

Configuration

import uptrace from opentelemetry.instrumentation.celery import CeleryInstrumentor # Configure OpenTelemetry with Uptrace uptrace.configure_opentelemetry( # Set DSN or use UPTRACE_DSN environment variable  dsn="<your-uptrace-dsn>", service_name="celery-worker", service_version="1.0.0", deployment_environment="production", ) # Instrument Celery CeleryInstrumentor().instrument() # Your Celery app from celery import Celery app = Celery('tasks', broker='redis://localhost:6379/0') 
Enter fullscreen mode Exit fullscreen mode

Environment Variables

# Uptrace configuration export UPTRACE_DSN="https://<token>@uptrace.dev/<project_id>" # OpenTelemetry configuration export OTEL_SERVICE_NAME="celery-worker" export OTEL_SERVICE_VERSION="1.0.0" # Start worker celery -A tasks worker --loglevel=info 
Enter fullscreen mode Exit fullscreen mode

Troubleshooting

Common Issues

  1. Double instrumentation: Ensure you only call CeleryInstrumentor().instrument() once per worker process
  2. Missing broker traces: Install appropriate broker instrumentation (Redis/RabbitMQ)
  3. Worker startup issues: Initialize OpenTelemetry using worker_process_init hook, not at module level
  4. Span not appearing: Check that exporters are configured correctly and tracer provider is set
  5. High overhead: Adjust sampling rates and batch processor settings
  6. Fork-safety issues: Use worker process initialization hook to avoid sharing connections between processes

Debug Configuration

Enable debug logging to troubleshoot instrumentation:

import logging # Enable OpenTelemetry debug logging logging.getLogger("opentelemetry").setLevel(logging.DEBUG) # Enable Celery debug logging logging.getLogger("celery").setLevel(logging.DEBUG) # Configure root logger logging.basicConfig(level=logging.DEBUG) 
Enter fullscreen mode Exit fullscreen mode

Performance Optimization

Configure appropriate settings for production:

from opentelemetry.sdk.trace.export import BatchSpanProcessor from opentelemetry.sdk.trace.sampling import TraceIdRatioBased # Configure sampling (sample 10% of traces) sampler = TraceIdRatioBased(0.1) provider = TracerProvider(sampler=sampler) # Optimize batch processor processor = BatchSpanProcessor( exporter, max_queue_size=2048, schedule_delay_millis=5000, max_export_batch_size=512, ) 
Enter fullscreen mode Exit fullscreen mode

What's next?

By integrating OpenTelemetry with Celery, you gain valuable insights into your distributed task processing pipeline. You can monitor task performance, track queue depths, identify bottlenecks, and troubleshoot issues across your asynchronous workflow.

The telemetry data collected helps you:

  • Monitor task execution times and success rates
  • Track worker performance and resource utilization
  • Identify bottlenecks in task processing pipelines
  • Debug failed tasks and retry patterns
  • Optimize queue management and worker scaling
  • Understand task dependencies and data flow
  • Improve overall system reliability and performance

Top comments (0)