
Bug Report: ReliableSuperStreamProducer loses partition producers during reconnection #453

@l3lcss

Describe the bug

Summary

When multiple partitions of a SuperStream close simultaneously (either because they share a failing connection or because of concurrent connection losses), the ReliableSuperStreamProducer fails to reconnect all affected partitions, leading to "Producer not found in the SuperStream Producer" errors.

Root Cause

The bug is in pkg/ha/ha_super_stream_publisher.go:72-97 in the handleNotifyClose function. The goroutine only reads one event from the channelClose channel and then exits, instead of looping to handle all partition close events.

Buggy Code (current)

```go
func (r *ReliableSuperStreamProducer) handleNotifyClose(channelClose chan stream.PPartitionClose) {
	go func() {
		cPartitionClose := <-channelClose // ← Only reads ONE event, then goroutine exits
		if strings.EqualFold(cPartitionClose.Event.Reason, stream.SocketClosed) ||
			strings.EqualFold(cPartitionClose.Event.Reason, stream.MetaDataUpdate) ||
			strings.EqualFold(cPartitionClose.Event.Reason, stream.ZombieConsumer) {
			r.setStatus(StatusReconnecting)
			logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", r.getInfo(), cPartitionClose.Event.Reason)
			err, reconnected := retry(1, r, cPartitionClose.Partition)
			if err != nil {
				logs.LogInfo("[Reliable] - %s won't be reconnected. Error: %s", r.getInfo(), err)
			}
			if reconnected {
				r.setStatus(StatusOpen)
			} else {
				r.setStatus(StatusClosed)
			}
		} else {
			logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", r.getInfo(), cPartitionClose.Event.Reason)
			r.setStatus(StatusClosed)
		}
		r.reconnectionSignal.L.Lock()
		r.reconnectionSignal.Broadcast()
		r.reconnectionSignal.L.Unlock()
	}()
}
```

Why This Causes the Bug

When multiple partitions close:

  1. Multiple events are sent to the channelClose channel
  2. Only the first event is processed by the goroutine
  3. The goroutine exits after processing that single event
  4. The remaining events are lost
  5. Affected partitions are never reconnected (a standalone demonstration of this channel pattern follows)
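
The failure mode is plain Go channel semantics: a goroutine that performs a single receive and then returns leaves every later send unread. A standalone sketch (illustrative names only, not library code):

```go
package main

import "fmt"

func main() {
	events := make(chan string, 2)
	done := make(chan struct{})

	// Mirrors the buggy handler: a single receive, then the goroutine exits.
	go func() {
		e := <-events
		fmt.Println("handled:", e)
		close(done)
	}()

	events <- "partition-0 closed"
	events <- "partition-1 closed" // buffered, but never received

	<-done
	fmt.Println("events still pending:", len(events)) // prints 1
}
```

Run as-is, this prints "handled: partition-0 closed" and then reports one event still pending, which is exactly the fate of the second partition's close notification.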

Reproduction steps

Reproduction Scenario

  1. Create a SuperStream with multiple partitions (e.g., partition-0, partition-1)
  2. Partitions share the same TCP connection (default behavior with MaxProducersPerClient > 1)
  3. Trigger a metadata update for one partition
  4. Connection disruption triggers Client.Close(), which removes ALL producers on that client
  5. Two events are sent to chSuperStreamPartitionClose: one for partition-0, one for partition-1
  6. Only the first event is processed and reconnected
  7. The second partition is never reconnected
  8. Subsequent Send() calls fail with an ErrProducerNotFound error (an environment-setup sketch follows below)
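
A minimal environment-setup sketch for step 2 is below. SetMaxProducersPerClient is the knob that makes both partition producers share one TCP connection; the super-stream producer creation and the fault injection are left as comments, since their exact signatures aren't quoted in this report and should be checked against the v1.6.1 API.

```go
package main

import (
	"log"

	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func main() {
	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetMaxProducersPerClient(2), // partition-0 and partition-1 share one TCP connection
	)
	if err != nil {
		log.Fatal(err)
	}
	defer env.Close()

	// 1. Declare a two-partition super stream and open a
	//    ReliableSuperStreamProducer on it (constructor elided).
	// 2. Force a metadata update on one partition, e.g. by deleting and
	//    recreating its underlying stream on the broker.
	// 3. Both partition producers close; only one reconnects, and the next
	//    Send() to the other partition fails with ErrProducerNotFound.
}
```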

Expected behavior

When multiple partitions close simultaneously, the ReliableSuperStreamProducer should:

  1. Process all partition close events - The handleNotifyClose goroutine should continue running and process every event sent to the channelClose channel, not just the first one (see the fix sketch after this section)
  2. Reconnect all affected partitions - Each partition that experiences an unexpected closure (SocketClosed, MetaDataUpdate, or ZombieConsumer) should trigger an independent reconnection attempt
  3. Maintain partition producer availability - After reconnection completes, all partitions should be available for sending messages
  4. Log partition-specific information - Each close/reconnection event should identify which partition is affected for debugging purposes
  5. Resume normal operations - Subsequent Send() calls should succeed without ErrProducerNotFound errors

The system should gracefully handle the scenario where:

  • Multiple partitions share the same TCP connection (MaxProducersPerClient > 1)
  • A connection failure affects all producers on that connection
  • All affected partitions need to be reconnected, not just the first one
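
A minimal sketch of one possible fix, assuming the helpers shown in the buggy snippet (retry, setStatus, reconnectionSignal): replace the single receive with a for range loop over channelClose, so the goroutine lives until the library closes the channel. The partition name added to the warning log is an addition here, covering point 4 above.

```go
func (r *ReliableSuperStreamProducer) handleNotifyClose(channelClose chan stream.PPartitionClose) {
	go func() {
		// Loop until channelClose is closed, so every partition close
		// event is handled, not just the first one.
		for cPartitionClose := range channelClose {
			if strings.EqualFold(cPartitionClose.Event.Reason, stream.SocketClosed) ||
				strings.EqualFold(cPartitionClose.Event.Reason, stream.MetaDataUpdate) ||
				strings.EqualFold(cPartitionClose.Event.Reason, stream.ZombieConsumer) {
				r.setStatus(StatusReconnecting)
				logs.LogWarn("[Reliable] - %s partition %s closed unexpectedly %s.. Reconnecting..",
					r.getInfo(), cPartitionClose.Partition, cPartitionClose.Event.Reason)
				err, reconnected := retry(1, r, cPartitionClose.Partition)
				if err != nil {
					logs.LogInfo("[Reliable] - %s won't be reconnected. Error: %s", r.getInfo(), err)
				}
				if reconnected {
					r.setStatus(StatusOpen)
				} else {
					r.setStatus(StatusClosed)
				}
			} else {
				logs.LogInfo("[Reliable] - %s closed normally. Reason: %s", r.getInfo(), cPartitionClose.Event.Reason)
				r.setStatus(StatusClosed)
			}
			r.reconnectionSignal.L.Lock()
			r.reconnectionSignal.Broadcast()
			r.reconnectionSignal.L.Unlock()
		}
	}()
}
```

With for range, each buffered close event triggers its own reconnection attempt, and the goroutine exits cleanly once the library closes channelClose.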

Additional context

Environment

  • Library version: rabbitmq-stream-go-client v1.6.1
  • Affected files:
    • pkg/ha/ha_super_stream_publisher.go (lines 72-97)

Additional Notes

The same pattern may exist in ReliableSuperStreamConsumer and should be reviewed for similar issues.
