Describe the bug
Summary
When multiple partitions of a SuperStream close simultaneously (either from shared connection failure or concurrent connection losses), the ReliableSuperStreamProducer fails to reconnect all affected partitions, leading to "Producer not found in the SuperStream Producer" errors.
Root Cause
The bug is in `pkg/ha/ha_super_stream_publisher.go:72-97`, in the `handleNotifyClose` function. The goroutine reads only one event from the `channelClose` channel and then exits, instead of looping to handle all partition close events.
Buggy Code (current)
```go
func (r *ReliableSuperStreamProducer) handleNotifyClose(channelClose chan stream.PPartitionClose) {
	go func() {
		cPartitionClose := <-channelClose // ← Only reads ONE event, then goroutine exits
		if strings.EqualFold(cPartitionClose.Event.Reason, stream.SocketClosed) ||
			strings.EqualFold(cPartitionClose.Event.Reason, stream.MetaDataUpdate) ||
			strings.EqualFold(cPartitionClose.Event.Reason, stream.ZombieConsumer) {
			r.setStatus(StatusReconnecting)
			logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", r.getInfo(), cPartitionClose.Event.Reason)
			err, reconnected := retry(1, r, cPartitionClose.Partition)
			if err != nil {
				logs.LogInfo("[Reliable] - %s won't be reconnected. Error: %s", r.getInfo(), err)
			}
			if reconnected {
				r.setStatus(StatusOpen)
			} else {
				r.setStatus(StatusClosed)
			}
		} else {
			logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", r.getInfo(), cPartitionClose.Event.Reason)
			r.setStatus(StatusClosed)
		}

		r.reconnectionSignal.L.Lock()
		r.reconnectionSignal.Broadcast()
		r.reconnectionSignal.L.Unlock()
	}()
}
```

Why This Causes the Bug
When multiple partitions close:
- Multiple events are sent to the `channelClose` channel
- Only the first event is processed by the goroutine
- The goroutine exits after processing one event
- Remaining events are lost
- Affected partitions are never reconnected (illustrated below)
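The underlying Go behavior can be demonstrated with plain channels, independent of the library. In this minimal sketch (illustrative values only, not library code), a single receive consumes exactly one value, while a `for ... range` loop keeps receiving until the channel is closed:

```go
package main

import "fmt"

func main() {
	events := make(chan string, 2)
	events <- "partition-0 closed"
	events <- "partition-1 closed"
	close(events)

	// A single receive, as in the buggy goroutine, handles exactly one value.
	fmt.Println("single receive handled:", <-events)
	fmt.Println("events still queued:", len(events)) // prints 1: the second close is never seen

	// A range loop keeps receiving until the channel is closed, so nothing is lost.
	for ev := range events {
		fmt.Println("range loop handled:", ev)
	}
}
```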
Reproduction steps
Reproduction Scenario
- Create a SuperStream with multiple partitions (e.g., partition-0, partition-1)
- Partitions share the same TCP connection (default behavior with `MaxProducersPerClient > 1`)
- Trigger a metadata update for one partition
- Connection disruption triggers `Client.Close()`, which removes ALL producers on that client
- Two events are sent to `chSuperStreamPartitionClose`: one for partition-0, one for partition-1
- Only the first event is processed and reconnected
- The second partition is never reconnected
- Subsequent `Send()` calls fail with an `ErrProducerNotFound` error (a toy model of this fan-out is sketched below)
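To make the fan-out in the middle steps concrete, here is a toy model of the scenario; every type and name in it is hypothetical and only stands in for the library's code. Closing one shared connection emits one close event per partition onto the same notification channel, and a handler that performs a single receive leaves the other partition stranded:

```go
package main

import "fmt"

// partitionClose and fakeClient are hypothetical stand-ins used only to model
// the scenario described above; they are not rabbitmq-stream-go-client types.
type partitionClose struct {
	Partition string
	Reason    string
}

type fakeClient struct {
	partitions []string
	notify     chan partitionClose
}

// shutdown removes every producer hosted on the shared connection and emits
// one close event per partition on the shared notification channel.
func (c *fakeClient) shutdown(reason string) {
	for _, p := range c.partitions {
		c.notify <- partitionClose{Partition: p, Reason: reason}
	}
}

func main() {
	notify := make(chan partitionClose, 10)
	client := &fakeClient{partitions: []string{"invoices-0", "invoices-1"}, notify: notify}

	// Handler that mimics the buggy goroutine: it reads exactly one event and exits.
	done := make(chan struct{})
	go func() {
		ev := <-notify
		fmt.Printf("reconnected %s (reason: %s)\n", ev.Partition, ev.Reason)
		close(done)
	}()

	client.shutdown("socket client closed") // one connection failure, two close events
	<-done

	// One close event is still sitting in the channel; that partition's producer
	// is never recreated, so later sends to it fail.
	fmt.Printf("%d partition close event(s) never handled\n", len(notify))
}
```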
Expected behavior
When multiple partitions close simultaneously, the ReliableSuperStreamProducer should:
- Process all partition close events - The `handleNotifyClose` goroutine should continue running and process every event sent to the `channelClose` channel, not just the first one
- Reconnect all affected partitions - Each partition that experiences an unexpected closure (SocketClosed, MetaDataUpdate, or ZombieConsumer) should trigger an independent reconnection attempt
- Maintain partition producer availability - After reconnection completes, all partitions should be available for sending messages
- Log partition-specific information - Each close/reconnection event should identify which partition is affected for debugging purposes
- Resume normal operations - Subsequent `Send()` calls should succeed without `ErrProducerNotFound` errors
The system should gracefully handle the scenario where:
- Multiple partitions share the same TCP connection (`MaxProducersPerClient > 1`)
- A connection failure affects all producers on that connection
- All affected partitions need to be reconnected, not just the first one (see the loop-based sketch below)
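One possible shape of a fix, sketched against the snippet quoted above and not verified against the rest of the codebase, is to wrap the goroutine body in a `for ... range` loop so it keeps serving close events until the channel is closed. The warning log below also includes `cPartitionClose.Partition` to cover the "log partition-specific information" point; that extra field is a suggestion, not current behavior:

```go
func (r *ReliableSuperStreamProducer) handleNotifyClose(channelClose chan stream.PPartitionClose) {
	go func() {
		// Keep receiving until the channel is closed, so every partition
		// close event is handled, not just the first one.
		for cPartitionClose := range channelClose {
			if strings.EqualFold(cPartitionClose.Event.Reason, stream.SocketClosed) ||
				strings.EqualFold(cPartitionClose.Event.Reason, stream.MetaDataUpdate) ||
				strings.EqualFold(cPartitionClose.Event.Reason, stream.ZombieConsumer) {
				r.setStatus(StatusReconnecting)
				logs.LogWarn("[Reliable] - %s partition %s closed unexpectedly %s.. Reconnecting..",
					r.getInfo(), cPartitionClose.Partition, cPartitionClose.Event.Reason)
				err, reconnected := retry(1, r, cPartitionClose.Partition)
				if err != nil {
					logs.LogInfo("[Reliable] - %s won't be reconnected. Error: %s", r.getInfo(), err)
				}
				if reconnected {
					r.setStatus(StatusOpen)
				} else {
					r.setStatus(StatusClosed)
				}
			} else {
				logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", r.getInfo(), cPartitionClose.Event.Reason)
				r.setStatus(StatusClosed)
			}

			r.reconnectionSignal.L.Lock()
			r.reconnectionSignal.Broadcast()
			r.reconnectionSignal.L.Unlock()
		}
	}()
}
```

How the producer-level status should be reported when only some partitions reconnect successfully is a separate question this sketch leaves open.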
Additional context
Environment
- Library version: rabbitmq-stream-go-client v1.6.1
- Affected files: `pkg/ha/ha_super_stream_publisher.go` (lines 72-97)
Additional Notes
The same pattern may exist in ReliableSuperStreamConsumer and should be reviewed for similar issues.