- Notifications
You must be signed in to change notification settings - Fork 740
fix: drain all pending notification before writing the response to avoid missing notifications #670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Added drain loop (lines 447-467) to catch pending notifications after HandleMessage completes - Fixes synchronization bug: close(done) now executes BEFORE mu.Unlock() to properly signal goroutine exit - Prevents notifications from being lost when they arrive during response processing - Added TestStreamableHTTP_DrainNotifications to validate drain loop functionality - Ensures thread-safe handling of concurrent notification writes The drain loop provides a non-blocking check for pending notifications in the channel and drains them before sending the final response, ensuring all notifications are included in the SSE stream response.
WalkthroughModified notification handling in Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes
Possibly related PRs
Suggested reviewers
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
server/streamable_http_test.go (2)
2488-2509: Remove or utilize the unused variable.The
drainLoopCalledchannel is declared (line 2492) but never written to and only has a no-op read at line 2578. Either remove this variable or implement the intended verification logic.Apply this diff to remove the unused variable:
func TestStreamableHTTP_DrainNotifications(t *testing.T) { t.Run("drain pending notifications after response is computed", func(t *testing.T) { mcpServer := NewMCPServer("test-mcp-server", "1.0") -drainLoopCalled := make(chan int, 1) - // Add a tool that sends notifications rapidly (faster than the goroutine can process) // This forces notifications to queue up in the channel, testing the drain loop mcpServer.AddTool(mcp.Tool{And remove the no-op read:
if !strings.Contains(responseStr, "event: message") { t.Errorf("Expected SSE event format in response") } - -_ = drainLoopCalled }) }
2494-2566: Avoid hard-coded line number references in comments.The comments reference specific line numbers in
streamable_http.go(lines 2501, 2502, 2561), which will become outdated as the code evolves. Consider referencing by function name or using relative descriptions instead.For example:
-// The concurrent goroutine (line 394-434 in streamable_http.go) may not process all of them -// before we hit the drain loop at line 448-468 +// The concurrent goroutine in handlePost may not process all of them +// before we hit the drain loop after response computation-// Without the drain loop, we'd get fewer notifications -// With the drain loop, we catch the pending ones at line 448-468 +// Without the drain loop, we'd get fewer notifications +// With the drain loop, we catch the pending ones in the drain phase
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
server/streamable_http.go(1 hunks)server/streamable_http_test.go(1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Order imports: standard library first, then third-party, then local packages (goimports enforces this)
Follow Go naming conventions: exported identifiers in PascalCase; unexported in camelCase; acronyms uppercase (HTTP, JSON, MCP)
Error handling: return sentinel errors, wrap with fmt.Errorf("context: %w", err), and check with errors.Is/As
Prefer explicit types and strongly-typed structs; avoid using any except where protocol flexibility is required (e.g., Arguments any)
All exported types and functions must have GoDoc comments starting with the identifier name; avoid inline comments unless necessary
Functions that are handlers or long-running must accept context.Context as the first parameter
Ensure thread safety for shared state using sync.Mutex and document thread-safety requirements in comments
For JSON: use json struct tags with omitempty for optional fields; use json.RawMessage for flexible/deferred parsing
Files:
server/streamable_http_test.goserver/streamable_http.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Testing: use testify/assert and testify/require
Write table-driven tests using a tests := []struct{ name, ... } pattern
Go test files must end with _test.go
Files:
server/streamable_http_test.go
🧠 Learnings (1)
📚 Learning: 2025-08-14T16:35:34.100Z
Learnt from: JBUinfo Repo: mark3labs/mcp-go PR: 548 File: server/stdio.go:239-300 Timestamp: 2025-08-14T16:35:34.100Z Learning: In server/stdio.go, the s.mu mutex protects access to the writer field, but does not protect the actual write operations to stdout. Concurrent write operations to the same stdout stream need to be synchronized using the same mutex to prevent byte interleaving and JSON corruption. Applied to files:
server/streamable_http.go
🧬 Code graph analysis (1)
server/streamable_http_test.go (5)
server/server.go (1)
ServerFromContext(82-87)mcp/tools.go (3)
Tool(557-578)CallToolRequest(54-58)CallToolResult(40-51)mcp/utils.go (1)
NewToolResultText(271-280)server/streamable_http.go (1)
NewTestStreamableHTTPServer(1407-1411)server/constants.go (1)
HeaderKeySessionID(5-5)
🔇 Additional comments (2)
server/streamable_http.go (2)
447-471: LGTM! Drain loop implementation prevents missed notifications.The drain loop correctly ensures all pending notifications are consumed before closing the
donechannel and unlocking the mutex. The non-blocking select with default prevents indefinite blocking, and the SSE upgrade logic is consistent with the concurrent goroutine's behavior.The sequence of operations is correct:
- Lock mutex (line 445)
- Drain notifications (lines 447-467)
- Close done channel (line 470)
- Unlock mutex (line 471)
This prevents the race condition where the response could be written before pending notifications are processed.
Based on learnings, the mutex protecting write operations is consistent with the pattern used elsewhere in the codebase for synchronizing concurrent writes.
388-514: Document thread-safety requirements in code comments and verify the drain loop holds the mutex.The synchronization pattern is fundamentally sound: the mutex protects all ResponseWriter accesses, the goroutine respects the done channel check, and the drain loop closes done while holding the lock. However, the code lacks documentation of thread-safety requirements as specified in the coding guidelines. The mutex Unlock happens before the subsequent Lock returns, ensuring the goroutine's pending writes complete before the drain loop begins.
Two items need attention:
Add thread-safety documentation: Document that the session's shared state (ResponseWriter w, upgradedHeader boolean, and notificationChannel) requires mutex coordination and that this pattern ensures concurrent goroutines cannot write to the ResponseWriter simultaneously.
Verify mutex coverage: Confirm that all code paths accessing ResponseWriter fields (Header.Set, WriteHeader) are protected by mu, including edge cases after unlock at line 471.
| @coderabbitai generate docstrings |
✅ Actions performedInitiated docstring generation; will generate only if new commits exist. |
| Caution Docstrings generation - FAILED No docstrings were generated. |
| @pottekkat can you please review |
| @ezynda3 do we know when do we get the new version of the client containing this fix? |
Description
Scenario Without Drain Loop:
Timeline:────────
T1: Tool sends notification → channels → [notif1]
T2: Goroutine receives notif1 (locks mutex)
T3: Tool finishes
T4: Main handler tries to write response BUT: Goroutine still has the mutex (writing notif1)
T5: Goroutine finishes, releases mutex
T6: Main handler writes response
T7: close(done) - too late, response already sent!
The last notification (or notifications queued right before response) could get lost because:
The goroutine might be busy processing one notification
New notifications arrive in the channel
Main handler takes over before goroutine loops back
With Drain Loop:
Timeline:────────
T1: Tool sends notifications rapidly
T2: Goroutine processes some, some queue up
T3: Tool finishes
T4: Main handler locks mutex (goroutine blocked waiting for lock)
T5: Drain loop catches all pending notifications
T6: close(done) → goroutine exits
T7: Unlock → goroutine can exit cleanly
T8: No race condition, all notifications included
Fixes #<issue_number> (if applicable)
Type of Change
Checklist
MCP Spec Compliance
Additional Information
Summary by CodeRabbit
New Features
Bug Fixes
Tests
✏️ Tip: You can customize this high-level summary in your review settings.