Skip to content

Conversation

@archit-harness
Copy link
Contributor

@archit-harness archit-harness commented Dec 16, 2025

Description

Scenario Without Drain Loop:
Timeline:────────
T1: Tool sends notification → channels → [notif1]
T2: Goroutine receives notif1 (locks mutex)
T3: Tool finishes
T4: Main handler tries to write response BUT: Goroutine still has the mutex (writing notif1)
T5: Goroutine finishes, releases mutex
T6: Main handler writes response
T7: close(done) - too late, response already sent!

The last notification (or notifications queued right before response) could get lost because:
The goroutine might be busy processing one notification
New notifications arrive in the channel
Main handler takes over before goroutine loops back

With Drain Loop:
Timeline:────────
T1: Tool sends notifications rapidly
T2: Goroutine processes some, some queue up
T3: Tool finishes
T4: Main handler locks mutex (goroutine blocked waiting for lock)
T5: Drain loop catches all pending notifications
T6: close(done) → goroutine exits
T7: Unlock → goroutine can exit cleanly
T8: No race condition, all notifications included

Fixes #<issue_number> (if applicable)

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • MCP spec compatibility implementation
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Code refactoring (no functional changes)
  • Performance improvement
  • Tests only (no functional changes)
  • Other (please describe):

Checklist

  • My code follows the code style of this project
  • I have performed a self-review of my own code
  • I have added tests that prove my fix is effective or that my feature works
  • I have updated the documentation accordingly

MCP Spec Compliance

  • This PR implements a feature defined in the MCP specification
  • Link to relevant spec section: Link text
  • Implementation follows the specification exactly

Additional Information

Summary by CodeRabbit

  • New Features

    • Enhanced streaming HTTP response handling with improved Server-Sent Events notification delivery and queue processing.
  • Bug Fixes

    • Improved handling of pending notifications to ensure proper delivery coordination.
  • Tests

    • Added test coverage validating notification drain behavior and Server-Sent Events format compliance in streaming sessions.

✏️ Tip: You can customize this high-level summary in your review settings.

- Added drain loop (lines 447-467) to catch pending notifications after HandleMessage completes - Fixes synchronization bug: close(done) now executes BEFORE mu.Unlock() to properly signal goroutine exit - Prevents notifications from being lost when they arrive during response processing - Added TestStreamableHTTP_DrainNotifications to validate drain loop functionality - Ensures thread-safe handling of concurrent notification writes The drain loop provides a non-blocking check for pending notifications in the channel and drains them before sending the final response, ensuring all notifications are included in the SSE stream response.
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 16, 2025

Walkthrough

Modified notification handling in server/streamable_http.go to implement coordinated draining of pending notifications before response completion. Replaces immediate unlock pattern with a drainLoop that upgrades to SSE on first notification, writes each event with flushing, then signals completion. Adds corresponding test to validate draining behavior.

Changes

Cohort / File(s) Summary
Notification drain implementation
server/streamable_http.go
Replaces immediate unlock/defer close(done) pattern with drainLoop that processes pending notifications from session.notificationChannel, upgrades to SSE on first notification, writes SSE events with flush calls, then closes done before unlocking. Adds explicit SSE header initialization and per-message error handling.
Drain notification test
server/streamable_http_test.go
Adds TestStreamableHTTP_DrainNotifications test function to validate that pending notifications are drained and written as SSE events when response is computed, including verification of Server-Sent Events format and content.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • Areas requiring attention:
    • Notification draining logic in the new drainLoop — ensure proper channel synchronization and avoid race conditions
    • SSE header initialization and per-message write/flush sequence — verify correct event formatting and stream handling
    • Interaction between drainLoop, done channel closure, and final response handling — confirm signal ordering prevents deadlocks
    • Test coverage of edge cases (e.g., empty notification channel, SSE write failures during drain)

Possibly related PRs

Suggested reviewers

  • pottekkat
  • dugenkui03

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately summarizes the main change: draining pending notifications before writing the response to prevent missing notifications.
Description check ✅ Passed The description includes a clear problem statement with timeline examples, identifies the bug fix type, but several checklist items remain unchecked and the issue number placeholder is unfilled.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
server/streamable_http_test.go (2)

2488-2509: Remove or utilize the unused variable.

The drainLoopCalled channel is declared (line 2492) but never written to and only has a no-op read at line 2578. Either remove this variable or implement the intended verification logic.

Apply this diff to remove the unused variable:

 func TestStreamableHTTP_DrainNotifications(t *testing.T) {	t.Run("drain pending notifications after response is computed", func(t *testing.T) {	mcpServer := NewMCPServer("test-mcp-server", "1.0") -drainLoopCalled := make(chan int, 1) -	// Add a tool that sends notifications rapidly (faster than the goroutine can process)	// This forces notifications to queue up in the channel, testing the drain loop	mcpServer.AddTool(mcp.Tool{

And remove the no-op read:

	if !strings.Contains(responseStr, "event: message") {	t.Errorf("Expected SSE event format in response")	} - -_ = drainLoopCalled	}) }

2494-2566: Avoid hard-coded line number references in comments.

The comments reference specific line numbers in streamable_http.go (lines 2501, 2502, 2561), which will become outdated as the code evolves. Consider referencing by function name or using relative descriptions instead.

For example:

-// The concurrent goroutine (line 394-434 in streamable_http.go) may not process all of them -// before we hit the drain loop at line 448-468 +// The concurrent goroutine in handlePost may not process all of them +// before we hit the drain loop after response computation
-// Without the drain loop, we'd get fewer notifications -// With the drain loop, we catch the pending ones at line 448-468 +// Without the drain loop, we'd get fewer notifications +// With the drain loop, we catch the pending ones in the drain phase
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 919c4bf and 7851643.

📒 Files selected for processing (2)
  • server/streamable_http.go (1 hunks)
  • server/streamable_http_test.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Order imports: standard library first, then third-party, then local packages (goimports enforces this)
Follow Go naming conventions: exported identifiers in PascalCase; unexported in camelCase; acronyms uppercase (HTTP, JSON, MCP)
Error handling: return sentinel errors, wrap with fmt.Errorf("context: %w", err), and check with errors.Is/As
Prefer explicit types and strongly-typed structs; avoid using any except where protocol flexibility is required (e.g., Arguments any)
All exported types and functions must have GoDoc comments starting with the identifier name; avoid inline comments unless necessary
Functions that are handlers or long-running must accept context.Context as the first parameter
Ensure thread safety for shared state using sync.Mutex and document thread-safety requirements in comments
For JSON: use json struct tags with omitempty for optional fields; use json.RawMessage for flexible/deferred parsing

Files:

  • server/streamable_http_test.go
  • server/streamable_http.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Testing: use testify/assert and testify/require
Write table-driven tests using a tests := []struct{ name, ... } pattern
Go test files must end with _test.go

Files:

  • server/streamable_http_test.go
🧠 Learnings (1)
📚 Learning: 2025-08-14T16:35:34.100Z
Learnt from: JBUinfo Repo: mark3labs/mcp-go PR: 548 File: server/stdio.go:239-300 Timestamp: 2025-08-14T16:35:34.100Z Learning: In server/stdio.go, the s.mu mutex protects access to the writer field, but does not protect the actual write operations to stdout. Concurrent write operations to the same stdout stream need to be synchronized using the same mutex to prevent byte interleaving and JSON corruption. 

Applied to files:

  • server/streamable_http.go
🧬 Code graph analysis (1)
server/streamable_http_test.go (5)
server/server.go (1)
  • ServerFromContext (82-87)
mcp/tools.go (3)
  • Tool (557-578)
  • CallToolRequest (54-58)
  • CallToolResult (40-51)
mcp/utils.go (1)
  • NewToolResultText (271-280)
server/streamable_http.go (1)
  • NewTestStreamableHTTPServer (1407-1411)
server/constants.go (1)
  • HeaderKeySessionID (5-5)
🔇 Additional comments (2)
server/streamable_http.go (2)

447-471: LGTM! Drain loop implementation prevents missed notifications.

The drain loop correctly ensures all pending notifications are consumed before closing the done channel and unlocking the mutex. The non-blocking select with default prevents indefinite blocking, and the SSE upgrade logic is consistent with the concurrent goroutine's behavior.

The sequence of operations is correct:

  1. Lock mutex (line 445)
  2. Drain notifications (lines 447-467)
  3. Close done channel (line 470)
  4. Unlock mutex (line 471)

This prevents the race condition where the response could be written before pending notifications are processed.

Based on learnings, the mutex protecting write operations is consistent with the pattern used elsewhere in the codebase for synchronizing concurrent writes.


388-514: Document thread-safety requirements in code comments and verify the drain loop holds the mutex.

The synchronization pattern is fundamentally sound: the mutex protects all ResponseWriter accesses, the goroutine respects the done channel check, and the drain loop closes done while holding the lock. However, the code lacks documentation of thread-safety requirements as specified in the coding guidelines. The mutex Unlock happens before the subsequent Lock returns, ensuring the goroutine's pending writes complete before the drain loop begins.

Two items need attention:

  1. Add thread-safety documentation: Document that the session's shared state (ResponseWriter w, upgradedHeader boolean, and notificationChannel) requires mutex coordination and that this pattern ensures concurrent goroutines cannot write to the ResponseWriter simultaneously.

  2. Verify mutex coverage: Confirm that all code paths accessing ResponseWriter fields (Header.Set, WriteHeader) are protected by mu, including edge cases after unlock at line 471.

@archit-harness
Copy link
Contributor Author

@coderabbitai generate docstrings

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 16, 2025

✅ Actions performed

Initiated docstring generation; will generate only if new commits exist.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 16, 2025

Caution

Docstrings generation - FAILED

No docstrings were generated.

@archit-harness
Copy link
Contributor Author

@pottekkat can you please review

@ezynda3 ezynda3 merged commit 59b3a29 into mark3labs:main Dec 19, 2025
4 checks passed
@archit-harness
Copy link
Contributor Author

@ezynda3 do we know when do we get the new version of the client containing this fix?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

2 participants