49 changes: 49 additions & 0 deletions litellm/integrations/prometheus.py
@@ -302,6 +302,26 @@ def __init__(
self.get_labels_for_metric("litellm_deployment_failed_fallbacks"),
)

########################################
# Deployment Queue Depth Metrics
# Track active and queued requests per deployment
########################################
self.litellm_deployment_active_requests = self._gauge_factory(
"litellm_deployment_active_requests",
"Number of requests currently being processed by a deployment (holding semaphore)",
labelnames=self.get_labels_for_metric(
"litellm_deployment_active_requests"
),
)

self.litellm_deployment_queued_requests = self._gauge_factory(
"litellm_deployment_queued_requests",
"Number of requests waiting for a deployment slot (waiting on semaphore)",
labelnames=self.get_labels_for_metric(
"litellm_deployment_queued_requests"
),
)

# Callback Logging Failure Metrics
self.litellm_callback_logging_failures_metric = self._counter_factory(
name="litellm_callback_logging_failures_metric",
@@ -1723,6 +1743,35 @@ def set_deployment_complete_outage(
2, litellm_model_name, model_id, api_base, api_provider
)

def set_deployment_queue_depth_metrics(
self,
queue_stats: List[Dict[str, Any]],
) -> None:
"""
Update Prometheus gauges for deployment queue depth.

Args:
queue_stats: List of dicts from Router.get_deployment_queue_stats()
Each dict contains: model_id, model_name, model_group,
max_concurrent, active, queued
"""
for stat in queue_stats:
_labels = prometheus_label_factory(
supported_enum_labels=self.get_labels_for_metric(
"litellm_deployment_active_requests"
),
enum_values=UserAPIKeyLabelValues(
litellm_model_name=stat.get("model_name", ""),
model_group=stat.get("model_group", ""),
),
)
self.litellm_deployment_active_requests.labels(**_labels).set(
stat.get("active", 0)
)
self.litellm_deployment_queued_requests.labels(**_labels).set(
stat.get("queued", 0)
)

def increment_deployment_cooled_down(
self,
litellm_model_name: str,
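The two gauges above are populated by `set_deployment_queue_depth_metrics`, which expects the output of `Router.get_deployment_queue_stats()` (added in `litellm/router.py` below). A minimal sketch of that wiring, where the polling task, its interval, and the variable names are assumptions and not part of this diff:

```python
import asyncio

from litellm.integrations.prometheus import PrometheusLogger
from litellm.router import Router


async def poll_queue_depth(
    router: Router, prometheus_logger: PrometheusLogger, interval: float = 10.0
) -> None:
    # Hypothetical background task (not in this PR): copy the router's
    # semaphore counters into the two gauges on a fixed cadence.
    while True:
        stats = router.get_deployment_queue_stats()
        prometheus_logger.set_deployment_queue_depth_metrics(stats)
        await asyncio.sleep(interval)
```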
54 changes: 46 additions & 8 deletions litellm/router.py
@@ -8,6 +8,7 @@
# Thank you ! We ❤️ you! - Krrish & Ishaan

import asyncio
from litellm.router_utils.tracked_semaphore import TrackedSemaphore
import copy
import enum
import hashlib
@@ -1515,7 +1516,7 @@ async def _acompletion(
client_type="max_parallel_requests",
)
if rpm_semaphore is not None and isinstance(
rpm_semaphore, asyncio.Semaphore
rpm_semaphore, (asyncio.Semaphore, TrackedSemaphore)
):
async with rpm_semaphore:
"""
@@ -2373,7 +2374,7 @@ async def _aimage_generation(self, prompt: str, model: str, **kwargs):
)

if rpm_semaphore is not None and isinstance(
rpm_semaphore, asyncio.Semaphore
rpm_semaphore, (asyncio.Semaphore, TrackedSemaphore)
):
async with rpm_semaphore:
"""
@@ -2485,7 +2486,7 @@ async def _atranscription(self, file: FileTypes, model: str, **kwargs):
)

if rpm_semaphore is not None and isinstance(
rpm_semaphore, asyncio.Semaphore
rpm_semaphore, (asyncio.Semaphore, TrackedSemaphore)
):
async with rpm_semaphore:
"""
@@ -2779,7 +2780,7 @@ async def _atext_completion(self, model: str, prompt: str, **kwargs):
)

if rpm_semaphore is not None and isinstance(
rpm_semaphore, asyncio.Semaphore
rpm_semaphore, (asyncio.Semaphore, TrackedSemaphore)
):
async with rpm_semaphore:
"""
@@ -2878,7 +2879,7 @@ async def _aadapter_completion(self, adapter_id: str, model: str, **kwargs):
)

if rpm_semaphore is not None and isinstance(
rpm_semaphore, asyncio.Semaphore
rpm_semaphore, (asyncio.Semaphore, TrackedSemaphore)
):
async with rpm_semaphore:
"""
@@ -3049,7 +3050,7 @@ async def _ageneric_api_call_with_fallbacks_helper(
)

if rpm_semaphore is not None and isinstance(
rpm_semaphore, asyncio.Semaphore
rpm_semaphore, (asyncio.Semaphore, TrackedSemaphore)
):
async with rpm_semaphore:
"""
@@ -3286,7 +3287,7 @@ async def _aembedding(self, input: Union[str, List], model: str, **kwargs):
)

if rpm_semaphore is not None and isinstance(
rpm_semaphore, asyncio.Semaphore
rpm_semaphore, (asyncio.Semaphore, TrackedSemaphore)
):
async with rpm_semaphore:
"""
@@ -3553,7 +3554,7 @@ async def _acreate_batch(
)

if rpm_semaphore is not None and isinstance(
rpm_semaphore, asyncio.Semaphore
rpm_semaphore, (asyncio.Semaphore, TrackedSemaphore)
):
async with rpm_semaphore:
"""
@@ -6991,6 +6992,43 @@ def get_model_list(

return returned_models

def get_deployment_queue_stats(self) -> List[Dict[str, Any]]:
"""
Get queue depth statistics for all deployments with max_parallel_requests.

Returns a list of dicts with:
- model_id: The deployment's model ID
- model_name: The deployment's litellm model name
- model_group: The model group name
- max_concurrent: Maximum parallel requests allowed
- active: Number of requests currently being processed
- queued: Number of requests waiting for a slot

Only includes deployments using TrackedSemaphore.
"""
stats = []
for deployment in self.model_list:
model_info = deployment.get("model_info", {})
model_id = model_info.get("id")
if not model_id:
continue

cache_key = f"{model_id}_max_parallel_requests_client"
semaphore = self.cache.get_cache(key=cache_key, local_only=True)

if semaphore is not None and isinstance(semaphore, TrackedSemaphore):
sem_stats = semaphore.stats
stats.append({
"model_id": model_id,
"model_name": deployment.get("litellm_params", {}).get("model", ""),
"model_group": deployment.get("model_name", ""),
"max_concurrent": sem_stats.max_concurrent,
"active": sem_stats.active,
"queued": sem_stats.queued,
})

return stats

def get_model_access_groups(
self,
model_name: Optional[str] = None,
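A usage sketch for the new method. The config shape is an assumption: it relies on `max_parallel_requests` in `litellm_params` so that `set_max_parallel_requests_client` (see `client_initalization_utils.py` below) installs a `TrackedSemaphore` for the deployment at init time:

```python
from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "gpt-4o",  # model group
            "litellm_params": {
                "model": "openai/gpt-4o",
                # assumed config shape: this setting triggers creation of
                # the deployment's TrackedSemaphore during Router init
                "max_parallel_requests": 5,
            },
            "model_info": {"id": "gpt-4o-deployment-1"},
        }
    ]
)

for stat in router.get_deployment_queue_stats():
    print(
        f"{stat['model_group']} ({stat['model_id']}): "
        f"{stat['active']}/{stat['max_concurrent']} active, "
        f"{stat['queued']} queued"
    )
```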
4 changes: 2 additions & 2 deletions litellm/router_utils/client_initalization_utils.py
@@ -1,6 +1,6 @@
import asyncio
from typing import TYPE_CHECKING, Any

from litellm.router_utils.tracked_semaphore import TrackedSemaphore
from litellm.utils import calculate_max_parallel_requests

if TYPE_CHECKING:
@@ -28,7 +28,7 @@ def set_max_parallel_requests_client(
default_max_parallel_requests=litellm_router_instance.default_max_parallel_requests,
)
if calculated_max_parallel_requests:
semaphore = asyncio.Semaphore(calculated_max_parallel_requests)
semaphore = TrackedSemaphore(calculated_max_parallel_requests)
cache_key = f"{model_id}_max_parallel_requests_client"
litellm_router_instance.cache.set_cache(
key=cache_key,
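Because the semaphore is stored in the router's in-memory cache, any consumer can fetch it back under the same key, which is exactly what `get_deployment_queue_stats` does. A minimal sketch, assuming `litellm_router_instance` and `model_id` are already in scope:

```python
from litellm.router_utils.tracked_semaphore import TrackedSemaphore

# key format taken from the diff above; router instance and model_id assumed
cache_key = f"{model_id}_max_parallel_requests_client"
semaphore = litellm_router_instance.cache.get_cache(key=cache_key, local_only=True)
if isinstance(semaphore, TrackedSemaphore):
    print(semaphore.stats)  # e.g. SemaphoreStats(max_concurrent=5, active=0, queued=0)
```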
144 changes: 144 additions & 0 deletions litellm/router_utils/tracked_semaphore.py
@@ -0,0 +1,144 @@
"""
Tracked Semaphore for Queue Depth Metrics.

Provides a semaphore wrapper with public queue depth tracking,
avoiding reliance on private asyncio.Semaphore internals.

Used by LiteLLM Router for max_parallel_requests limiting with
Prometheus metrics support.

GitHub Issue: https://github.com/BerriAI/litellm/issues/17764
"""

import asyncio
from dataclasses import dataclass


@dataclass
class SemaphoreStats:
"""Statistics for a TrackedSemaphore instance."""

max_concurrent: int
active: int
queued: int

@property
def available(self) -> int:
"""Number of available slots."""
return self.max_concurrent - self.active


class TrackedSemaphore:
"""
Asyncio semaphore wrapper with public queue depth tracking.

Unlike asyncio.Semaphore, this class tracks waiting tasks explicitly,
providing public access to queue depth without accessing private internals.

Example:
semaphore = TrackedSemaphore(max_concurrent=3)

async with semaphore:
# Do work with concurrency limit
pass

stats = semaphore.stats
print(f"Active: {stats.active}, Queued: {stats.queued}")
"""

def __init__(self, max_concurrent: int):
"""
Initialize TrackedSemaphore.

Args:
max_concurrent: Maximum number of concurrent acquisitions.
"""
if max_concurrent < 1:
raise ValueError("max_concurrent must be at least 1")

self._semaphore = asyncio.Semaphore(max_concurrent)
self._max_concurrent = max_concurrent
self._active = 0
self._queued = 0
self._lock = asyncio.Lock()

async def acquire(self) -> None:
"""
Acquire the semaphore, waiting if necessary.

Increments queued count while waiting, then increments active count
once acquired.
"""
async with self._lock:
self._queued += 1

try:
await self._semaphore.acquire()
except asyncio.CancelledError:
async with self._lock:
self._queued -= 1
raise

async with self._lock:
self._queued -= 1
self._active += 1

def release(self) -> None:
"""
Release the semaphore, decrementing active count.
"""
# release() is synchronous, so the asyncio.Lock used in acquire()
# cannot be taken here. Within a single-threaded event loop a plain
# decrement is safe: there is no await point between these two
# statements for another task to interleave with.
self._active -= 1
self._semaphore.release()

async def __aenter__(self) -> "TrackedSemaphore":
"""Async context manager entry."""
await self.acquire()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit."""
self.release()

@property
def stats(self) -> SemaphoreStats:
"""
Get current semaphore statistics.

Returns:
SemaphoreStats with max_concurrent, active, and queued counts.
"""
return SemaphoreStats(
max_concurrent=self._max_concurrent,
active=self._active,
queued=self._queued,
)

@property
def max_concurrent(self) -> int:
"""Maximum number of concurrent acquisitions."""
return self._max_concurrent

@property
def active(self) -> int:
"""Number of currently active (acquired) slots."""
return self._active

@property
def queued(self) -> int:
"""Number of tasks waiting to acquire."""
return self._queued

def locked(self) -> bool:
"""
Return True if semaphore cannot be acquired immediately.

Compatible with asyncio.Semaphore interface.
"""
return self._semaphore.locked()
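A standalone sketch of the tracking behavior; the sleep durations are arbitrary, chosen only so the counters are observable mid-flight:

```python
import asyncio

from litellm.router_utils.tracked_semaphore import TrackedSemaphore


async def worker(sem: TrackedSemaphore) -> None:
    async with sem:
        await asyncio.sleep(0.2)  # simulate a request holding a slot


async def main() -> None:
    sem = TrackedSemaphore(max_concurrent=2)
    tasks = [asyncio.create_task(worker(sem)) for _ in range(5)]
    await asyncio.sleep(0.05)  # let all five tasks reach the semaphore
    # two slots held, three tasks waiting
    print(sem.stats)  # SemaphoreStats(max_concurrent=2, active=2, queued=3)
    await asyncio.gather(*tasks)
    print(sem.stats)  # SemaphoreStats(max_concurrent=2, active=0, queued=0)


asyncio.run(main())
```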
12 changes: 12 additions & 0 deletions litellm/types/integrations/prometheus.py
@@ -185,6 +185,8 @@ class UserAPIKeyLabelNames(Enum):
"litellm_redis_daily_spend_update_queue_size",
"litellm_in_memory_spend_update_queue_size",
"litellm_redis_spend_update_queue_size",
"litellm_deployment_active_requests",
"litellm_deployment_queued_requests",
]


@@ -435,6 +437,16 @@ class PrometheusMetricLabels:

litellm_redis_spend_update_queue_size: List[str] = []

litellm_deployment_active_requests = [
UserAPIKeyLabelNames.v1_LITELLM_MODEL_NAME.value,
UserAPIKeyLabelNames.MODEL_GROUP.value,
]

litellm_deployment_queued_requests = [
UserAPIKeyLabelNames.v1_LITELLM_MODEL_NAME.value,
UserAPIKeyLabelNames.MODEL_GROUP.value,
]

@staticmethod
def get_labels(label_name: DEFINED_PROMETHEUS_METRICS) -> List[str]:
default_labels = getattr(PrometheusMetricLabels, label_name)
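The label lists are resolved by metric name through `getattr`, so the two new entries flow through the existing `get_labels` path with no other changes. A quick check; note that whether user-configured custom labels are appended depends on the remainder of `get_labels`, which is truncated above:

```python
from litellm.types.integrations.prometheus import PrometheusMetricLabels

labels = PrometheusMetricLabels.get_labels("litellm_deployment_active_requests")
# Resolves to [UserAPIKeyLabelNames.v1_LITELLM_MODEL_NAME.value,
#              UserAPIKeyLabelNames.MODEL_GROUP.value], possibly plus
# custom labels appended by the truncated tail of get_labels().
print(labels)
```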