Celery
This guide demonstrates how to Auto instrument tracing, metrics and logs using OpenTelemetry for Celery and export them to a collector using python OTEL sdk.
Note: This guide provides a concise overview based on the official OpenTelemetry documentation. For complete information, please consult the official OpenTelemetry documentation.
Setup
opentelemetry-api defines the API interfaces for tracing, metrics, and logging
and opentelemetry-sdk provides the implementation for these APIs.
Install the following necessary packages or add it to
requirements.txt
and install it.
opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp-proto-http
opentelemetry-instrumentation-celery
Traces
Traces give us the big picture of what happens when a request is made to an application. Whether your application is a monolith with a single database or a sophisticated mesh of services, traces are essential to understanding the full “path” a request takes in your application.
Auto Instrumentation of Traces
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from celery import Celery
from celery.signals import worker_process_init
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
CeleryInstrumentor().instrument()
app = Celery("tasks", broker="amqp://localhost")
@app.task
def add(x, y):
return x + y
add.delay(42, 50)
Trace data will now be sent to the OTEL Collector.
Adding Custom Instrumentation
from opentelemetry.propagate import inject, extract
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.trace import get_tracer
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.context import get_current
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
CeleryInstrumentor().instrument()
app = Celery("tasks", broker="amqp://localhost")
@app.task
def add(x, y, carrier):
with tracer.start_as_current_span("add", context=ctx):
return x + y
def do_work():
carrier = {}
inject(carrier)
add.delay(1, 2, carrier)
tracer = get_tracer(__name__)
# Extract the context from the incoming carrier
if context:
ctx = extract(context)
else:
ctx = get_current()
do_work()
Metrics
A metric is a measurement of a service captured at runtime. The moment of capturing a measurements is known as a metric event, which consists not only of the measurement itself, but also the time at which it was captured and associated metadata.
Auto Instrumentation of Metrics
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
CeleryInstrumentor().instrument()
app = Celery("tasks", broker="amqp://localhost")
resource = Resource(attributes={SERVICE_NAME: "celery"})
metric_reader = PeriodicExportingMetricReader(
OTLPMetricExporter(endpoint="http://0.0.0.0:4318/v1/metrics"),
export_interval_millis=1000
)
metrics.set_meter_provider(
MeterProvider(resource=resource, metric_readers=[metric_reader])
)
@app.task
def add(x, y):
return x + y
add.delay(42, 50)
Metrics will now be exported to the OTEL Collector.
Official Metrics Documentation
Logs
A log is a timestamped text record, either structured (recommended) or unstructured, with optional metadata.
Auto Instrumentation of Logs
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry import _logs
import logging
from celery import Celery
from celery.signals import worker_process_init
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
CeleryInstrumentor().instrument()
app = Celery("tasks", broker="amqp://localhost")
provider = LoggerProvider(resource=resource)
_logs.set_logger_provider(provider)
log_exporter = OTLPLogExporter(endpoint="http://localhost:4318/v1/logs")
provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
otel_handler = LoggingHandler(level=logging.INFO)
root_logger = logging.getLogger()
root_logger.addHandler(otel_handler)
for name in [
"celery",
"celery.app.trace",
"celery.worker",
"kombu",
"amqp"
]:
logger = logging.getLogger(name)
logger.setLevel(logging.INFO)
logger.addHandler(otel_handler)
@app.task
def add(x, y):
return x + y
add.delay(42, 50)
Logs will now be exported to OTEL Collector.