Commit 311eae2

[Serve][LLM] Add Pause/Resume Control Plane API for Ray Serve LLM (#59523)

Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
1 parent 4aa3a5b commit 311eae2

File tree

11 files changed: +633 -8 lines changed

python/ray/llm/_internal/serve/core/engine/protocol.py

Lines changed: 24 additions & 0 deletions
@@ -211,6 +211,30 @@ async def is_sleeping(self) -> bool:
         """
         return False
 
+    async def pause(self, **kwargs: Any) -> None:
+        """Pause the engine.
+
+        Args:
+            **kwargs: Engine-specific pause options. Passed through to the engine.
+        """
+        pass
+
+    async def resume(self, **kwargs: Any) -> None:
+        """Resume the engine.
+
+        Args:
+            **kwargs: Engine-specific resume options. Passed through to the engine.
+        """
+        pass
+
+    async def is_paused(self) -> bool:
+        """Check whether the engine is currently paused.
+
+        Returns:
+            True if the engine is paused, False otherwise.
+        """
+        return False
+
     def shutdown(self) -> None:
         """Shuts down the engine"""
         pass

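The protocol defaults are deliberate no-ops so that concrete engines opt in. As a minimal sketch (not part of this commit) of an engine honoring the contract, assuming an asyncio.Event as the pause gate:

import asyncio
from typing import Any


class ToyEngine:
    """Hypothetical engine implementing the pause contract above."""

    def __init__(self) -> None:
        self._running = asyncio.Event()
        self._running.set()  # start in the unpaused state

    async def pause(self, **kwargs: Any) -> None:
        self._running.clear()  # gate new work; kwargs would tune draining

    async def resume(self, **kwargs: Any) -> None:
        self._running.set()

    async def is_paused(self) -> bool:
        return not self._running.is_set()

    async def generate(self, prompt: str) -> str:
        await self._running.wait()  # requests block while paused
        return prompt.upper()  # stand-in for real generation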
python/ray/llm/_internal/serve/core/ingress/dev_ingress.py

Lines changed: 16 additions & 3 deletions
@@ -9,6 +9,9 @@
 POST /sleep: Put engine to sleep (frees GPU memory)
 POST /wakeup: Wake up engine from sleep
 GET /is_sleeping: Check if engine is sleeping
+POST /pause: Pause generation (keeps weights in GPU)
+POST /resume: Resume generation after pause
+GET /is_paused: Check if engine is paused
 POST /reset_prefix_cache: Reset the KV prefix cache
 """
 
@@ -27,6 +30,7 @@
 )
 from ray.llm._internal.serve.core.ingress.mixins import (
     CacheManagerIngressMixin,
+    PausableIngressMixin,
     SleepableIngressMixin,
 )
 from ray.llm._internal.serve.core.server.builder import build_llm_deployment
@@ -39,12 +43,18 @@
 # Endpoint map for DevIngress - includes all default endpoints plus control plane
 DEV_ENDPOINTS = {
     **CacheManagerIngressMixin.ENDPOINTS,
+    **PausableIngressMixin.ENDPOINTS,
     **SleepableIngressMixin.ENDPOINTS,
     **DEFAULT_ENDPOINTS,
 }
 
 
-class DevIngress(OpenAiIngress, SleepableIngressMixin, CacheManagerIngressMixin):
+class DevIngress(
+    OpenAiIngress,
+    SleepableIngressMixin,
+    PausableIngressMixin,
+    CacheManagerIngressMixin,
+):
     """OpenAI-compatible ingress with additional control plane endpoints.
 
     This ingress extends the standard OpenAI endpoints with control plane
@@ -55,6 +65,7 @@ class DevIngress(OpenAiIngress, SleepableIngressMixin, CacheManagerIngressMixin)
 
     Control plane endpoints provided by mixins:
     - SleepableIngressMixin: /sleep, /wakeup, /is_sleeping
+    - PausableIngressMixin: /pause, /resume, /is_paused
     - CacheManagerIngressMixin: /reset_prefix_cache
 
     WARNING: These endpoints are intended for development and trusted
@@ -68,8 +79,10 @@ def build_dev_openai_app(builder_config: Dict) -> Application:
     """Build an OpenAI compatible app with dev/control plane endpoints.
 
     This is similar to build_openai_app but uses DevIngress with
-    additional control plane endpoints (/sleep, /wakeup, /is_sleeping,
-    /reset_prefix_cache).
+    additional control plane endpoints:
+    - /sleep, /wakeup, /is_sleeping (sleep mode - offloads weights to CPU)
+    - /pause, /resume, /is_paused (pause mode - keeps weights in GPU)
+    - /reset_prefix_cache (cache management)
 
     Args:
         builder_config: Configuration conforming to LLMServingArgs.

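For orientation, a hypothetical client session against a served DevIngress app (assuming Serve's default HTTP address http://localhost:8000 and a placeholder model ID; not part of the commit):

import requests

BASE = "http://localhost:8000"
MODEL = "my-model"  # placeholder model ID

# Pause: drain in-flight requests and keep the KV cache warm for resume.
requests.post(
    f"{BASE}/pause",
    json={
        "model": MODEL,
        "options": {"wait_for_inflight_requests": True, "clear_cache": False},
    },
).raise_for_status()

print(requests.get(f"{BASE}/is_paused", params={"model": MODEL}).json())
# -> {"is_paused": true} while paused

requests.post(f"{BASE}/resume", json={"model": MODEL}).raise_for_status()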
python/ray/llm/_internal/serve/core/ingress/mixins/__init__.py

Lines changed: 10 additions & 0 deletions
@@ -7,6 +7,12 @@
     CacheManagerIngressMixin,
     ResetPrefixCacheRequest,
 )
+from ray.llm._internal.serve.core.ingress.mixins.pausable import (
+    IsPausedResponse,
+    PausableIngressMixin,
+    PauseRequest,
+    ResumeRequest,
+)
 from ray.llm._internal.serve.core.ingress.mixins.sleepable import (
     IsSleepingResponse,
     SleepableIngressMixin,
@@ -16,8 +22,12 @@
 
 __all__ = [
     "CacheManagerIngressMixin",
+    "PausableIngressMixin",
     "SleepableIngressMixin",
     "ResetPrefixCacheRequest",
+    "PauseRequest",
+    "ResumeRequest",
+    "IsPausedResponse",
     "SleepRequest",
     "WakeupRequest",
     "IsSleepingResponse",
python/ray/llm/_internal/serve/core/ingress/mixins/pausable.py

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
+"""Pausable ingress mixin.
+
+Provides HTTP endpoints for pause/resume control plane operations.
+"""
+
+from typing import Any, Dict
+
+from fastapi import Query
+from pydantic import BaseModel, Field
+from starlette.responses import Response
+
+from ray.llm._internal.serve.core.ingress.mixins.broadcastable import (
+    ReplicaBroadcastable,
+)
+from ray.llm._internal.serve.observability.logging import get_logger
+
+logger = get_logger(__name__)
+
+
+# --- Pydantic Models ---
+
+
+class PauseRequest(BaseModel):
+    """Request to pause generation on an engine."""
+
+    model: str
+    options: Dict[str, Any] = Field(
+        default_factory=dict,
+        description="Engine-specific pause options (e.g., wait_for_inflight_requests, clear_cache)",
+    )
+
+
+class ResumeRequest(BaseModel):
+    """Request to resume generation on an engine."""
+
+    model: str
+    options: Dict[str, Any] = Field(
+        default_factory=dict,
+        description="Engine-specific resume options",
+    )
+
+
+class IsPausedResponse(BaseModel):
+    """Response indicating whether the engine is paused."""
+
+    is_paused: bool
+
+
+# --- Mixin ---
+
+
+class PausableIngressMixin(ReplicaBroadcastable):
+    """Ingress mixin for /pause, /resume, /is_paused endpoints.
+
+    Adds control plane endpoints for managing engine pause state.
+    Pause mode halts generation/encoding while keeping weights in GPU memory.
+    Unlike sleep mode, pause does not offload weights to CPU.
+    """
+
+    ENDPOINTS = {
+        "pause": lambda app: app.post("/pause"),
+        "resume": lambda app: app.post("/resume"),
+        "is_paused": lambda app: app.get("/is_paused"),
+    }
+
+    async def pause(self, body: PauseRequest) -> Response:
+        """Pause generation on all replicas for the specified model.
+
+        This halts generation/encoding requests while keeping model weights
+        in GPU memory. New requests are blocked until resume is called.
+        Unlike sleep mode, pause does not offload weights to CPU.
+
+        Args:
+            body: Request containing the model ID and engine-specific options.
+                Options may include:
+                - wait_for_inflight_requests (bool): Wait for in-flight requests
+                  to finish before pausing. Default False (abort immediately).
+                - clear_cache (bool): Clear KV cache after draining. Default True.
+
+        Returns:
+            200 OK on success.
+        """
+        logger.info("Pausing model %s with options: %s", body.model, body.options)
+        await self._broadcast_to_replicas(body.model, "pause", kwargs=body.options)
+        return Response(status_code=200)
+
+    async def resume(self, body: ResumeRequest) -> Response:
+        """Resume generation on all replicas for the specified model.
+
+        Args:
+            body: Request containing the model ID and engine-specific options.
+
+        Returns:
+            200 OK on success.
+        """
+        logger.info("Resuming model %s with options: %s", body.model, body.options)
+        await self._broadcast_to_replicas(body.model, "resume", kwargs=body.options)
+        return Response(status_code=200)
+
+    async def is_paused(
+        self, model: str = Query(..., description="The model ID to check")
+    ) -> IsPausedResponse:
+        """Check if the engine is paused for the specified model.
+
+        This checks the pause status across all replicas. Returns True if
+        ANY replica is paused (uses logical OR across replicas).
+
+        Args:
+            model: The model ID to check.
+
+        Returns:
+            IsPausedResponse with is_paused boolean.
+        """
+        results = await self._broadcast_to_replicas(model, "is_paused")
+        is_paused_result = any(results) if results else False
+        return IsPausedResponse(is_paused=is_paused_result)

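One behavior worth noting in is_paused above: per-replica results are OR-ed together, so a single paused replica reports the whole model as paused, and zero replicas reads as not paused. The rule in isolation:

# Hypothetical per-replica values as gathered by _broadcast_to_replicas.
results = [False, True, False]
assert (any(results) if results else False) is True

results = []  # no replicas responded
assert (any(results) if results else False) is False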
python/ray/llm/_internal/serve/core/protocol.py

Lines changed: 21 additions & 0 deletions
@@ -146,6 +146,27 @@ async def is_sleeping(self) -> bool:
             True if the engine is sleeping, False otherwise.
         """
 
+    async def pause(self, **kwargs: Any) -> None:
+        """Pause the engine.
+
+        Args:
+            **kwargs: Engine-specific pause options. Passed through to the engine.
+        """
+
+    async def resume(self, **kwargs: Any) -> None:
+        """Resume the engine.
+
+        Args:
+            **kwargs: Engine-specific resume options. Passed through to the engine.
+        """
+
+    async def is_paused(self) -> bool:
+        """Check whether the engine is currently paused.
+
+        Returns:
+            True if the engine is paused, False otherwise.
+        """
+
     # TODO (Kourosh): This does not belong here.
     async def llm_config(self) -> Optional["LLMConfig"]:
         """Get the LLM config"""

python/ray/llm/_internal/serve/core/server/llm_server.py

Lines changed: 45 additions & 0 deletions
@@ -522,6 +522,51 @@ async def reset_prefix_cache(self) -> None:
             )
             raise e
 
+    async def pause(self, **kwargs: Any) -> None:
+        """Pause generation on the engine.
+
+        This halts generation requests while keeping model weights
+        in GPU memory. New requests are blocked until resume is called.
+
+        Args:
+            **kwargs: Engine-specific pause options. Passed through to the engine.
+        """
+        if self.engine is None:
+            return
+        try:
+            await self.engine.pause(**kwargs)
+        except Exception as e:
+            logger.error("Engine pause failed in LLMServer.pause: %s", e)
+            raise e
+
+    async def resume(self, **kwargs: Any) -> None:
+        """Resume generation on the engine after pause.
+
+        Args:
+            **kwargs: Engine-specific resume options. Passed through to the engine.
+        """
+        if self.engine is None:
+            return
+        try:
+            await self.engine.resume(**kwargs)
+        except Exception as e:
+            logger.error("Engine resume failed in LLMServer.resume: %s", e)
+            raise e
+
+    async def is_paused(self) -> bool:
+        """Check whether the engine is currently paused.
+
+        Returns:
+            True if the engine is paused, False otherwise.
+        """
+        if self.engine is None:
+            return False
+        try:
+            return await self.engine.is_paused()
+        except Exception as e:
+            logger.error("Engine is_paused failed in LLMServer.is_paused: %s", e)
+            raise e
+
     async def start_profile(self) -> None:
         """Start profiling"""
         if self.engine is None:

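Beyond the HTTP endpoints, these LLMServer methods can also be driven through a Ray Serve deployment handle. A hedged sketch (deployment and app names are placeholders); note that a plain handle call routes to a single replica, whereas the ingress endpoints broadcast to every replica of the model:

from ray import serve

async def pause_one_replica() -> None:
    # Placeholder names; use your actual deployment and app names.
    handle = serve.get_deployment_handle("LLMServer", app_name="llm-app")
    await handle.pause.remote(wait_for_inflight_requests=True, clear_cache=False)
    assert await handle.is_paused.remote()
    await handle.resume.remote()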
python/ray/llm/_internal/serve/engines/vllm/vllm_engine.py

Lines changed: 51 additions & 0 deletions
@@ -157,6 +157,20 @@ def validate_tags(cls, v: Any) -> Optional[List[str]]:
         return v
 
 
+class VLLMPauseConfig(BaseModel):
+    """vLLM-specific configuration for pause operation."""
+
+    wait_for_inflight_requests: bool = False
+    """When True, waits for in-flight requests to finish before pausing.
+    When False (default), aborts in-flight requests immediately.
+    """
+
+    clear_cache: bool = True
+    """Whether to clear KV and prefix caches after draining.
+    Set to False to preserve cache for faster resume.
+    """
+
+
 class VLLMEngine(LLMEngine):
     def __init__(
         self,
@@ -583,6 +597,43 @@ async def is_sleeping(self) -> bool:
         assert self._engine_client is not None, "engine_client is not initialized"
         return await self._engine_client.is_sleeping()
 
+    async def pause(self, **kwargs: Any) -> None:
+        """Pause generation on the vLLM engine.
+
+        This halts generation/encoding requests while keeping model weights
+        in GPU memory. New requests are blocked until resume is called.
+
+        Args:
+            **kwargs: Options parsed into VLLMPauseConfig.
+                - wait_for_inflight_requests (bool): Wait for in-flight requests
+                  to finish. Default False.
+                - clear_cache (bool): Clear KV cache after draining. Default True.
+        """
+        assert self._engine_client is not None, "engine_client is not initialized"
+        config = VLLMPauseConfig(**kwargs)
+        await self._engine_client.pause_generation(
+            wait_for_inflight_requests=config.wait_for_inflight_requests,
+            clear_cache=config.clear_cache,
+        )
+
+    async def resume(self, **kwargs: Any) -> None:
+        """Resume generation on the vLLM engine after pause.
+
+        Args:
+            **kwargs: Reserved for future options.
+        """
+        assert self._engine_client is not None, "engine_client is not initialized"
+        await self._engine_client.resume_generation()
+
+    async def is_paused(self) -> bool:
+        """Check whether the vLLM engine is currently paused.
+
+        Returns:
+            True if the engine is paused, False otherwise.
+        """
+        assert self._engine_client is not None, "engine_client is not initialized"
+        return await self._engine_client.is_paused()
+
     async def start_profile(self) -> None:
         assert self._engine_client is not None, "engine_client is not initialized"
         await self._engine_client.start_profile()

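Putting the vLLM pieces together, a hedged drain-then-maintain flow built on the methods above (engine stands for an already-constructed VLLMEngine; do_maintenance is a hypothetical placeholder for whatever the pause is protecting):

async def drain_and_maintain(engine) -> None:
    # Let in-flight requests finish; keep KV/prefix caches for a fast resume.
    await engine.pause(wait_for_inflight_requests=True, clear_cache=False)
    assert await engine.is_paused()
    await do_maintenance()  # hypothetical maintenance step
    await engine.resume()

Since VLLMPauseConfig is a plain pydantic model, unrecognized option keys are ignored under pydantic's default settings rather than rejected.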