95 changes: 72 additions & 23 deletions tests/unit/test_logging.py
@@ -1,9 +1,7 @@
# SPDX-License-Identifier: Apache-2.0

import json
import logging
import logging.config
import threading
import uuid

from unittest import mock
@@ -15,28 +13,69 @@
from warehouse import logging as wlogging


class TestStructlogFormatter:
def test_warehouse_logger_no_renderer(self):
formatter = wlogging.StructlogFormatter()
record = logging.LogRecord(
"warehouse.request", logging.INFO, None, None, "the message", None, None
class TestGunicornAccessLogParsing:
def test_parse_gunicorn_access_log_success(self):
access_log_line = (
"192.168.1.1 - - "
'[11/Aug/2025:21:01:13 +0000] "GET /pypi/b5ee/json HTTP/1.1" 404 24 '
'"-" "dependabot-core/0.325.1 excon/1.2.5 ruby/3.4.5 (x86_64-linux)"'
)

assert formatter.format(record) == "the message"

def test_non_warehouse_logger_renders(self):
formatter = wlogging.StructlogFormatter()
record = logging.LogRecord(
"another.logger", logging.INFO, None, None, "the message", None, None
event_dict = {"logger": "gunicorn.access", "event": access_log_line}

result = wlogging._parse_gunicorn_access_log(None, None, event_dict)

assert result["event"] == "http_request"
assert result["remote_addr"] == "192.168.1.1"
assert result["user"] is None
assert result["timestamp"] == "11/Aug/2025:21:01:13 +0000"
assert result["request"] == "GET /pypi/b5ee/json HTTP/1.1"
assert result["method"] == "GET"
assert result["path"] == "/pypi/b5ee/json"
assert result["protocol"] == "HTTP/1.1"
assert result["status"] == 404
assert result["size"] == 24
assert result["referrer"] is None
assert "dependabot-core" in result["user_agent"]

def test_parse_gunicorn_access_log_with_referrer(self):
access_log_line = (
"192.168.1.1 - - "
'[12/Aug/2025:10:30:45 +0000] "POST /simple/upload HTTP/1.1" 200 500 '
'"https://pypi.org/project/test/" "Mozilla/5.0 (compatible; test)"'
)

assert json.loads(formatter.format(record)) == {
"logger": "another.logger",
"level": "INFO",
"event": "the message",
"thread": threading.get_ident(),
event_dict = {"logger": "gunicorn.access", "event": access_log_line}

result = wlogging._parse_gunicorn_access_log(None, None, event_dict)

assert result["remote_addr"] == "192.168.1.1"
assert result["method"] == "POST"
assert result["path"] == "/simple/upload"
assert result["status"] == 200
assert result["size"] == 500
assert result["referrer"] == "https://pypi.org/project/test/"
assert result["user_agent"] == "Mozilla/5.0 (compatible; test)"

def test_parse_gunicorn_access_log_unparsable(self):
event_dict = {
"logger": "gunicorn.access",
"event": "this is not a valid access log format",
}

result = wlogging._parse_gunicorn_access_log(None, None, event_dict)

# Should return unchanged if unparsable
assert result["event"] == "this is not a valid access log format"

def test_parse_gunicorn_access_log_non_access_log(self):
event_dict = {"logger": "some.other.logger", "event": "Some message"}

result = wlogging._parse_gunicorn_access_log(None, None, event_dict)

# Should return unchanged for non-access logs
assert result == event_dict


def test_create_id(monkeypatch):
uuid4 = pretend.call_recorder(lambda: "a fake uuid")
@@ -82,13 +121,17 @@ def test_includeme(monkeypatch, settings, expected_level):
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"structlog": {"()": "warehouse.logging.StructlogFormatter"}
"structlog_formatter": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": mock.ANY,
"foreign_pre_chain": mock.ANY,
}
},
"handlers": {
"primary": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "structlog",
"formatter": "structlog_formatter",
},
},
"loggers": {
Expand Down Expand Up @@ -119,10 +162,12 @@ def test_includeme(monkeypatch, settings, expected_level):
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
mock.ANY,
mock.ANY,
mock.ANY, # PositionalArgumentsFormatter
mock.ANY, # TimeStamper
mock.ANY, # StackInfoRenderer
structlog.processors.format_exc_info,
wlogging.RENDERER,
mock.ANY, # _add_datadog_context
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=mock.ANY,
wrapper_class=structlog.stdlib.BoundLogger,
@@ -135,6 +180,10 @@ def test_includeme(monkeypatch, settings, expected_level):
)
assert isinstance(
configure.calls[0].kwargs["processors"][4],
structlog.processors.TimeStamper,
)
assert isinstance(
configure.calls[0].kwargs["processors"][5],
structlog.processors.StackInfoRenderer,
)
assert isinstance(
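Aside: the assertions above check that the stdlib logging config now hands records to structlog's ProcessorFormatter. A minimal, self-contained sketch of that mechanism follows; the renderer and pre-chain here are illustrative, not warehouse's exact configuration.

```python
# Sketch: ProcessorFormatter renders records from plain stdlib loggers by
# running them through foreign_pre_chain first, then the final processor.
import logging

import structlog

formatter = structlog.stdlib.ProcessorFormatter(
    # Final renderer applied to every record that reaches this formatter.
    processor=structlog.processors.JSONRenderer(),
    # Processors applied only to records that did not come from structlog.
    foreign_pre_chain=[
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
    ],
)

handler = logging.StreamHandler()
handler.setFormatter(formatter)
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.INFO)

# A plain stdlib logger now emits JSON with logger/level/timestamp keys.
logging.getLogger("gunicorn.error").info("worker booted")
```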
151 changes: 126 additions & 25 deletions warehouse/logging.py
@@ -1,59 +1,158 @@
# SPDX-License-Identifier: Apache-2.0

import logging.config
import threading
import os
import re
import uuid

import structlog

request_logger = structlog.get_logger("warehouse.request")

RENDERER = structlog.processors.JSONRenderer()
# Determine if we're in development mode
DEV_MODE = os.environ.get("WAREHOUSE_ENV") == "development"


class StructlogFormatter(logging.Formatter):
def format(self, record):
# TODO: Figure out a better way of handling this besides just looking
# at the logger name, ideally this would have some way to
# really differentiate between log items which were logged by
# structlog and which were not.
if not record.name.startswith("warehouse."):
# TODO: Is there a better way to handle this? Maybe we can figure
# out a way to pass this through the structlog processors
# instead of manually duplicating the side effects here?
event_dict = {
"logger": record.name,
"level": record.levelname,
"event": record.msg,
"thread": threading.get_ident(),
}
record.msg = RENDERER(None, record.levelname, event_dict)

return super().format(record)
# Choose renderer based on environment
RENDERER: structlog.dev.ConsoleRenderer | structlog.processors.JSONRenderer
if DEV_MODE:
RENDERER = structlog.dev.ConsoleRenderer(colors=True)
else:
RENDERER = structlog.processors.JSONRenderer()


def _create_id(request):
return str(uuid.uuid4())


def _add_datadog_context(logger, method_name, event_dict):
"""Add Datadog trace context if available"""
try:
import ddtrace

span = ddtrace.tracer.current_span()
if span:
event_dict["dd.trace_id"] = str(span.trace_id)
event_dict["dd.span_id"] = str(span.span_id)
event_dict["dd.service"] = span.service
# deployment metadata
event_dict["dd.env"] = os.environ.get("DD_ENV", "development")
event_dict["dd.version"] = os.environ.get("DD_VERSION", "unknown")
except (ImportError, AttributeError):
pass
return event_dict


def _parse_gunicorn_access_log(logger, method_name, event_dict):
"""Parse Gunicorn logs into structlog ((only access logs)."""
if event_dict.get("logger") != "gunicorn.access":
return event_dict

message = event_dict.get("event", "")

# Regex adapted from
# https://albersdevelopment.net/2019/08/15/using-structlog-with-gunicorn/
# Combined log format:
# host - user [time] "request" status size "referer" "user-agent"
pattern = re.compile(
r"(?P<remote_addr>\S+) \S+ (?P<user>\S+) "
r'\[(?P<timestamp>.+?)\] "(?P<request>.+?)" '
r"(?P<status>\d+) (?P<size>\S+) "
r'"(?P<referrer>.*?)" "(?P<user_agent>.*?)"'
)

match = pattern.match(message)
if not match:
return event_dict

fields = match.groupdict()

# Normalize "-" placeholders and cast the numeric fields
fields["user"] = None if fields["user"] == "-" else fields["user"]
fields["status"] = int(fields["status"])
fields["size"] = 0 if fields["size"] == "-" else int(fields["size"])
fields["referrer"] = None if fields["referrer"] == "-" else fields["referrer"]

# Parse "GET /path HTTP/1.1" into separate fields
request_parts = fields["request"].split(" ", 2)
if len(request_parts) >= 2:
fields["method"] = request_parts[0]
fields["path"] = request_parts[1]
if len(request_parts) == 3:
fields["protocol"] = request_parts[2]

event_dict.update(fields)
event_dict["event"] = "http_request"
return event_dict


def configure_celery_logging(logfile: str | None = None, loglevel: int = logging.INFO):
"""Configure unified structlog logging for Celery that handles all log types."""
processors: list = [
structlog.contextvars.merge_contextvars,
structlog.processors.TimeStamper(fmt="iso"),
structlog.stdlib.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
_add_datadog_context,
]
formatter = structlog.stdlib.ProcessorFormatter(
processor=RENDERER,
foreign_pre_chain=processors,
)

handler = logging.FileHandler(logfile) if logfile else logging.StreamHandler()
handler.setFormatter(formatter)

root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
root.setLevel(loglevel)

structlog.configure(
processors=processors
+ [
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
cache_logger_on_first_use=True,
)


def _create_logger(request):
# This has to use **{} instead of just a kwarg because request.id is not
# an allowed kwarg name.
return request_logger.bind(**{"request.id": request.id})


def includeme(config):
# Pre-chain for log records that do not originate from structlog
foreign_pre_chain: list = [
structlog.stdlib.add_log_level,
structlog.stdlib.add_logger_name,
structlog.processors.TimeStamper(fmt="iso"),
_add_datadog_context,
_parse_gunicorn_access_log,
]

# Configure the standard library logging
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": False,
"formatters": {"structlog": {"()": "warehouse.logging.StructlogFormatter"}},
"formatters": {
"structlog_formatter": {
"()": structlog.stdlib.ProcessorFormatter,
"processor": RENDERER,
"foreign_pre_chain": foreign_pre_chain,
}
},
"handlers": {
"primary": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"formatter": "structlog",
"formatter": "structlog_formatter",
},
},
"loggers": {
@@ -88,9 +187,11 @@ def includeme(config):
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
RENDERER,
_add_datadog_context,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
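Aside: the combined-log-format regex above can be exercised standalone. The log line below is fabricated for illustration; note that the named groups come back as strings, and the casting happens in the processor.

```python
# Standalone check of the combined-log-format regex used by
# _parse_gunicorn_access_log; the sample line is made up.
import re

pattern = re.compile(
    r"(?P<remote_addr>\S+) \S+ (?P<user>\S+) "
    r'\[(?P<timestamp>.+?)\] "(?P<request>.+?)" '
    r"(?P<status>\d+) (?P<size>\S+) "
    r'"(?P<referrer>.*?)" "(?P<user_agent>.*?)"'
)

line = (
    "10.0.0.1 - - [11/Aug/2025:21:01:13 +0000] "
    '"GET /simple/ HTTP/1.1" 200 1234 "-" "pip/25.0"'
)
fields = pattern.match(line).groupdict()
assert fields["request"] == "GET /simple/ HTTP/1.1"
assert fields["size"] == "1234"  # still a string; the processor casts it
assert fields["referrer"] == "-"  # the processor maps "-" to None
```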
23 changes: 23 additions & 0 deletions warehouse/tasks.py
@@ -14,9 +14,11 @@
import celery.backends.redis
import pyramid.scripting
import pyramid_retry
import structlog
import transaction
import venusian

from celery import signals
from kombu import Queue
from pyramid.threadlocal import get_current_request

@@ -36,6 +38,23 @@
logger = logging.getLogger(__name__)


# Celery signal handlers for unified structlog configuration
# https://github.com/hynek/structlog/issues/287
# https://www.structlog.org/en/stable/frameworks.html#celery
@signals.after_setup_logger.connect
def on_after_setup_logger(logger, loglevel, logfile, *args, **kwargs):
"""Override Celery's default logging behavior w/ unified structlog configuration."""
from warehouse.logging import configure_celery_logging

configure_celery_logging(logfile, loglevel)


@signals.task_prerun.connect
def on_task_prerun(sender, task_id, task, **_):
"""Bind task metadata to contextvars for all logs within the task."""
structlog.contextvars.bind_contextvars(task_id=task_id, task_name=task.name)


class TLSRedisBackend(celery.backends.redis.RedisBackend):
def _params_from_url(self, url, defaults):
params = super()._params_from_url(url, defaults)
@@ -301,6 +320,10 @@ def includeme(config):
REDBEAT_REDIS_URL=s["celery.scheduler_url"],
# Silence deprecation warning on startup
broker_connection_retry_on_startup=False,
# Disable Celery's logger hijacking for unified structlog control
worker_hijack_root_logger=False,
worker_log_format="%(message)s",
worker_task_log_format="%(message)s",
)
config.registry["celery.app"].Task = WarehouseTask
config.registry["celery.app"].pyramid_config = config
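Aside: the prerun hook depends on structlog's contextvars support. configure_celery_logging puts merge_contextvars first in the chain, and that processor is what copies the bound task metadata onto every event logged inside the task. A Celery-independent sketch, with fabricated task metadata:

```python
# Sketch: anything bound via bind_contextvars shows up on every subsequent
# event, because merge_contextvars runs at the start of the processor chain.
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.JSONRenderer(),
    ]
)

structlog.contextvars.bind_contextvars(
    task_id="abc123",  # fabricated; Celery supplies the real id
    task_name="warehouse.example_task",
)
structlog.get_logger().info("task started")
# -> {"task_id": "abc123", "task_name": "warehouse.example_task",
#     "event": "task started"}

structlog.contextvars.clear_contextvars()  # what a postrun hook would do
```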