- Notifications
You must be signed in to change notification settings - Fork 3k
fix: reduce checkpointer calls lg events #2063
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
1 Skipped Deployment
|
## Walkthrough The `_stream_events` method in `langgraph_agent.py` was modified to maintain a local mutable dictionary, `current_graph_state`, which tracks the agent's state throughout event streaming. This approach reduces redundant asynchronous state fetches by updating the local state with event outputs and using it as the authoritative state during processing. Additionally, thread-specific state caching and method signature updates optimize state handling and synchronization. ## Changes | File(s) | Change Summary | |----------------------------------------|--------------------------------------------------------------------------------------------------| | sdk-python/copilotkit/langgraph_agent.py | Added `thread_state` cache; updated `prepare_stream` to accept explicit `agent_state`; maintained local mutable `current_graph_state` updated during streaming; modified `_stream_events` to use cached and updated state instead of repeated async fetches; changed `prepare_regenerate_stream` and `get_checkpoint_before_message` signatures to use `thread_id` explicitly. | | sdk-python/pyproject.toml | Updated `copilotkit` SDK version from `0.1.53` to `0.1.54-alpha.3`. | ## Sequence Diagram(s) ```mermaid sequenceDiagram participant Client participant LangGraphAgent participant EventStream Client->>LangGraphAgent: Start _stream_events(initial_state) LangGraphAgent->>LangGraphAgent: Initialize current_graph_state from agent_state loop For each event in EventStream EventStream-->>LangGraphAgent: Send event alt event.type == "on_chain_end" and event.output is dict LangGraphAgent->>LangGraphAgent: Update current_graph_state with event.output end alt Intermediate state emitted LangGraphAgent->>LangGraphAgent: Use emitted intermediate state as updated_state else LangGraphAgent->>LangGraphAgent: Use current_graph_state as updated_state end LangGraphAgent->>Client: Emit event with updated_state LangGraphAgent->>LangGraphAgent: Update current_graph_state with updated_state end Suggested reviewers
|
d8f31d5
to e035299
Compare e035299
to 69c9eee
Compare 48b8478
to 29dda16
Compare 29dda16
to f8f9592
Compare There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
sdk-python/copilotkit/langgraph_agent.py (1)
270-272
: Consider removing explicit None assignments.The explicit assignment of
None
values for"stream"
,"state"
, and"config"
keys is redundant since the dictionary could simply omit these keys when they're not applicable.- "stream": None, - "state": None, - "config": None, + # stream, state, and config are omitted when interrupts are active without resume input
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
sdk-python/copilotkit/langgraph_agent.py
(11 hunks)sdk-python/pyproject.toml
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- sdk-python/pyproject.toml
⏰ Context from checks skipped due to timeout of 90000ms (11)
- GitHub Check: Build Images (coagents-qa-native, ui)
- GitHub Check: Build Images (next-openai, next-openai)
- GitHub Check: Build Images (coagents-qa-native, agent, python, local-deps)
- GitHub Check: Build Images (coagents-qa-text, ui)
- GitHub Check: Build Images (coagents-routing, agent, python, local-deps)
- GitHub Check: Build Images (coagents-qa-text, agent, python, local-deps)
- GitHub Check: Build Images (coagents-routing, ui)
- GitHub Check: Build Images (coagents-research-canvas, agent, python, local-deps)
- GitHub Check: Build Images (coagents-research-canvas, ui)
- GitHub Check: Test (18.x)
- GitHub Check: Test (20.x)
🔇 Additional comments (15)
sdk-python/copilotkit/langgraph_agent.py (15)
168-168
: LGTM: Thread state caching initialization.The addition of
self.thread_state = {}
provides the foundation for caching thread-specific states, which directly supports the PR objective of reducing checkpointer calls.
223-224
: Effective local state initialization from agent state.The initialization of
current_graph_state
fromagent_state.values
and settingstate_input["messages"]
provides a solid foundation for local state tracking, eliminating the need for repeated checkpointer calls during event processing.
227-227
: Correct parameter usage for merge_state function.Using
state_input
instead of the previous state parameter aligns with the new method signature and maintains consistency in state merging logic.
232-232
: Proper local state synchronization.The
current_graph_state.update(state)
ensures the local state dictionary is kept in sync with the merged state, which is crucial for the optimization strategy.
278-278
: Correct return of local state for optimization.Returning
current_graph_state
instead of the merged state ensures the caller receives the locally maintained state, supporting the optimization goals.
290-291
: Improved parameter extraction and method call.The explicit extraction of
thread_id
and passing it directly toget_checkpoint_before_message
improves clarity and aligns with the updated method signature.
338-338
: Strategic state fetch positioning.Moving the
agent_state
fetch beforeprepare_stream
and passing it explicitly eliminates redundant state fetches within the method, directly addressing the PR objective.
340-341
: Clean separation of state parameters.The explicit passing of
state_input
andagent_state
as separate parameters improves method interface clarity and supports the optimization strategy.
369-369
: Proper local state assignment.Assigning
current_graph_state
from the prepared stream response's"state"
establishes the local state tracking mechanism that's central to the optimization.
403-408
: Excellent optimization: Local state updates from events.This is a key optimization that directly addresses the PR objective. By updating
current_graph_state
with output from"on_chain_end"
events, the code avoids expensive checkpointer calls while maintaining state consistency.The condition checks are appropriate to ensure only valid output dictionaries are used to update the state.
451-451
: Core optimization: Use local state instead of checkpointer.This change represents the heart of the optimization - using
current_graph_state
instead of fetching state from the checkpointer. The fallback tomanually_emitted_state
when present is correct logic.
475-475
: Maintain local state consistency.The
current_graph_state.update(updated_state)
ensures the local state dictionary remains synchronized with state changes, which is essential for the optimization to work correctly throughout the event streaming process.
697-697
: Improved method signature with direct thread_id parameter.The method signature change from accepting a config object to directly accepting
thread_id
is cleaner and more explicit about the required parameter, improving code clarity.
213-214
: prepare_stream Signature Verified Across CodebaseConfirmed via ripgrep that the only definition and invocation of
prepare_stream
reside insdk-python/copilotkit/langgraph_agent.py
, and both pass the newstate_input
andagent_state
parameters. No other references exist, so this change poses no breaking impact.
616-619
: Address potential race in thread state cachingThe current implementation in
sdk-python/copilotkit/langgraph_agent.py
correctly caches per-thread_id
state to avoid redundant fetches, but ifget_state
is invoked concurrently for the samethread_id
, both calls may see a missing entry and trigger duplicateawait self.graph.aget_state(config)
calls. While this won’t corrupt the dict, it does waste resources and could surface unexpected behavior under true multi-threaded or highly concurrent async loads.To harden this:
- Introduce an
asyncio.Lock
(or similar) keyed bythread_id
around the “get-or-set” block.- Ensure only one fetch populates
self.thread_state[thread_id]
, with other calls awaiting its completion.Location:
• File: sdk-python/copilotkit/langgraph_agent.py
• Lines: ~616–619
Preview environments destroyed for this pull request. |
Test ResultsStatus: ✅ Passed (View Run) Commit: e93dc17 Duration: 225.0s Total Tests: 20 Pass Rate: 100.0% Failed Tests: 0 🔍 Detailed Resultscoagents-research-canvasCoAgents Research Canvas (UI) - Local Depenencies
coagents-qa-nativeCoAgents QA Native (UI) - Local
coagents-qa-textCoAgents QA Text (UI) - Local
coagents-routingCoAgents Routing (UI) - Local
next-openaiNext OpenAI - Self Hosted
|
5c98467
to 99ff790
Compare ⏭️ Changeset Not RequiredLatest commit: 99ff790 No changes in this PR affected the Changeset is not required for this PR. |
Today, we are querying the checkpointer for the latest state on every event. This is too many calls.
This PR introduces a significant reduction of these calls
Summary by CodeRabbit