Celery

This guide demonstrates how to auto-instrument traces, metrics, and logs for Celery with OpenTelemetry and export them to a collector using the Python OTEL SDK.

Note: This guide provides a concise overview based on the official OpenTelemetry documentation. For complete information, please consult the official OpenTelemetry documentation.

Setup

opentelemetry-api defines the API interfaces for tracing, metrics, and logging, and opentelemetry-sdk provides the implementation of these APIs. Install the following packages directly, or add them to requirements.txt and install from there.

opentelemetry-api
opentelemetry-sdk
opentelemetry-exporter-otlp-proto-http
opentelemetry-instrumentation-celery
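
As a quick sanity check after installation, the following minimal sketch reports which of the packages listed above are missing from the current environment. It uses only the standard library; the helper name find_missing is hypothetical and not part of OpenTelemetry.

```python
from importlib import metadata

# Distribution names taken from the package list above.
REQUIRED_PACKAGES = [
    "opentelemetry-api",
    "opentelemetry-sdk",
    "opentelemetry-exporter-otlp-proto-http",
    "opentelemetry-instrumentation-celery",
]


def find_missing(packages):
    """Return the names in `packages` that are not installed (hypothetical helper)."""
    missing = []
    for name in packages:
        try:
            metadata.version(name)
        except metadata.PackageNotFoundError:
            missing.append(name)
    return missing


if __name__ == "__main__":
    missing = find_missing(REQUIRED_PACKAGES)
    if missing:
        print("Missing packages:", ", ".join(missing))
    else:
        print("All required OpenTelemetry packages are installed.")
```

Running the script in the same virtual environment as the Celery worker shows whether the worker will be able to import the instrumentation.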

Traces

Traces give us the big picture of what happens when a request is made to an application. Whether your application is a monolith with a single database or a sophisticated mesh of services, traces are essential to understanding the full “path” a request takes in your application.

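Auto Instrumentation of Traces

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    # Configure the SDK inside each worker process so the exporter's
    # background thread is created after the fork.
    provider = TracerProvider(resource=Resource(attributes={SERVICE_NAME: "celery"}))
    # Assumption: a collector listens locally on the default OTLP/HTTP port.
    exporter = OTLPSpanExporter(endpoint="http://localhost:4318/v1/traces")
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    CeleryInstrumentor().instrument()

app = Celery("tasks", broker="amqp://localhost")

@app.task
def add(x, y):
    return x + y

add.delay(42, 50)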

Trace data will now be sent to the OTEL Collector.

Official Traces Documentation

Adding Custom Instrumentation

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry.context import get_current
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.propagate import extract, inject
from opentelemetry.trace import get_tracer

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()

app = Celery("tasks", broker="amqp://localhost")
tracer = get_tracer(__name__)

@app.task
def add(x, y, carrier=None):
    # Extract the context from the incoming carrier,
    # falling back to the current context if none was sent
    if carrier:
        ctx = extract(carrier)
    else:
        ctx = get_current()

    with tracer.start_as_current_span("add", context=ctx):
        return x + y

def do_work():
    # Inject the current trace context into a dict that travels with the task
    carrier = {}
    inject(carrier)
    add.delay(1, 2, carrier)

do_work()
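
To make the carrier less opaque: with the default W3C Trace Context propagator, inject() writes a traceparent entry into the dict. The sketch below parses such a value using only the standard library, to illustrate what travels alongside the task arguments. The helper parse_traceparent is hypothetical, it checks the format only, and the sample value is the example from the W3C Trace Context specification.

```python
import re

# Format: version - trace id (32 hex) - parent span id (16 hex) - flags (2 hex)
TRACEPARENT_RE = re.compile(
    r"^(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-"
    r"(?P<span_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})$"
)


def parse_traceparent(carrier):
    """Return the trace fields found in `carrier`, or None if absent or malformed."""
    value = carrier.get("traceparent")
    if value is None:
        return None
    match = TRACEPARENT_RE.match(value)
    if match is None:
        return None
    fields = match.groupdict()
    # Bit 0 of the flags byte marks the trace as sampled.
    fields["sampled"] = bool(int(fields["flags"], 16) & 0x01)
    return fields


# Example value from the W3C Trace Context specification.
example_carrier = {
    "traceparent": "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
}
print(parse_traceparent(example_carrier))
```

In the worker, extract(carrier) performs this decoding and returns a context object, so application code normally never parses the header itself.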

Metrics

A metric is a measurement of a service captured at runtime. The moment of capturing a measurement is known as a metric event, which consists not only of the measurement itself, but also the time at which it was captured and associated metadata.
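
The three parts of a metric event can be pictured as a small record. The following is an illustrative sketch only: MetricEvent and record are hypothetical names, not types from the OpenTelemetry SDK, and the metric name and attributes are made up for the example.

```python
import time
from dataclasses import dataclass, field


@dataclass(frozen=True)
class MetricEvent:
    """Illustrative shape of a metric event (not an SDK type)."""
    name: str            # what was measured
    value: float         # the measurement itself
    timestamp_ns: int    # when it was captured
    attributes: dict = field(default_factory=dict)  # associated metadata


def record(name, value, **attributes):
    """Capture a measurement together with the current time and metadata."""
    return MetricEvent(name, value, time.time_ns(), dict(attributes))


event = record("task.duration.seconds", 0.42, task_name="tasks.add", status="SUCCESS")
print(event)
```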

Auto Instrumentation of Metrics

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource, SERVICE_NAME

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()

app = Celery("tasks", broker="amqp://localhost")

# Export metrics to the collector over OTLP/HTTP once per second
resource = Resource(attributes={SERVICE_NAME: "celery"})
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(endpoint="http://localhost:4318/v1/metrics"),
    export_interval_millis=1000,
)
metrics.set_meter_provider(
    MeterProvider(resource=resource, metric_readers=[metric_reader])
)

@app.task
def add(x, y):
    return x + y

add.delay(42, 50)

Metrics will now be exported to the OTEL Collector.

Official Metrics Documentation

Logs

A log is a timestamped text record, either structured (recommended) or unstructured, with optional metadata.
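
To illustrate what "structured" means here, the sketch below renders a standard-library log record as a JSON object with a timestamp and metadata. It is independent of OpenTelemetry; JsonFormatter, the logger name, and the task_id field are hypothetical names chosen for the example.

```python
import io
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object with a timestamp and metadata."""

    def format(self, record):
        payload = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Hypothetical metadata field supplied through `extra=`.
        task_id = getattr(record, "task_id", None)
        if task_id is not None:
            payload["task_id"] = task_id
        return json.dumps(payload)


# Write to an in-memory stream so the output can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("structured_example")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info("task finished", extra={"task_id": "abc123"})
entry = json.loads(stream.getvalue())
print(entry["message"], entry["task_id"])
```

Because every field is a named key, a backend can filter on level or task_id without parsing free text, which is why structured records are the recommended form.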

Auto Instrumentation of Logs

import logging

from celery import Celery
from celery.signals import worker_process_init
from opentelemetry import _logs
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource, SERVICE_NAME

@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    CeleryInstrumentor().instrument()

app = Celery("tasks", broker="amqp://localhost")

# Resource identifying this service in the exported logs
resource = Resource(attributes={SERVICE_NAME: "celery"})
provider = LoggerProvider(resource=resource)
_logs.set_logger_provider(provider)

log_exporter = OTLPLogExporter(endpoint="http://localhost:4318/v1/logs")
provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))

# Bridge standard-library logging records into OpenTelemetry
otel_handler = LoggingHandler(level=logging.INFO, logger_provider=provider)

root_logger = logging.getLogger()
root_logger.addHandler(otel_handler)

# Also attach the handler to the Celery-related loggers
for name in [
    "celery",
    "celery.app.trace",
    "celery.worker",
    "kombu",
    "amqp",
]:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.addHandler(otel_handler)

@app.task
def add(x, y):
    return x + y

add.delay(42, 50)

Logs will now be exported to the OTEL Collector.

Official Logs Documentation