- Notifications
You must be signed in to change notification settings - Fork 4k
Closed
Description
Describe the bug
Stream does not support both a plain SAC consumer and a super stream consumer simultaneously.
The rabbit_stream_sac_coordinator crashes with this stack:
```
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> handle_leader err {'EXIT',
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {function_clause,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{rabbit_stream_sac_coordinator,evaluate_active_consumer,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{group,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{consumer,<0.6910.0>,0,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> <<"192.168.65.1:31201 -> 172.17.0.2:5552">>,true},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {consumer,<0.6910.0>,1,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> <<"192.168.65.1:31201 -> 172.17.0.2:5552">>,false}],
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> -1}],
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{file,"rabbit_stream_sac_coordinator.erl"},{line,721}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {rabbit_stream_sac_coordinator,do_register_consumer,8,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{file,"rabbit_stream_sac_coordinator.erl"},{line,545}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {rabbit_stream_coordinator,apply,3,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{file,"rabbit_stream_coordinator.erl"},{line,565}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {ra_server,apply_with,2,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{file,"src/ra_server.erl"},{line,2609}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {ra_log,fold,5,[{file,"src/ra_log.erl"},{line,371}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {ra_server,apply_to,5,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{file,"src/ra_server.erl"},{line,2549}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {ra_server,handle_leader,2,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{file,"src/ra_server.erl"},{line,606}]},
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> {ra_server_proc,handle_leader,2,
2025-04-30 09:15:15.127324+00:00 [error] <0.2746.0> [{file,"src/ra_server_proc.erl"},{line,1127}]}]}}
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> ** State machine rabbit_stream_coordinator terminating
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> ** Last event = {info,{ra_log_event,{written,{341,341,1}}}}
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> ** When server state = [{id,{rabbit_stream_coordinator,rabbit@02688c1643de}},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {opt,terminate},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {raft_state,leader},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {leader_last_seen,undefined},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {num_pending_commands,0},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {num_low_priority_commands,0},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {num_pending_applied_notifications,0},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {election_timeout_set,false},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {ra_server_state,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> #{id =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> {rabbit_stream_coordinator,rabbit@02688c1643de},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> machine =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> #{num_monitors => 3,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> single_active_consumer =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> #{groups =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> #{{<<"/">>,<<"super-stream-issue-easy-0">>,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> <<"my-group">>} =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> #{num_consumers => 1,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> partition_index => -1}},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> num_groups => 1},
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> num_streams => 2,
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> streams =>
2025-04-30 09:15:15.129684+00:00 [error] <0.2746.0> #{"__super-stream-issue-0_1746003194949165113" =>
```
Reproduction steps
Per conversation with @kjnilsson
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.Reliable;

namespace Start;

/// <summary>
/// Minimal reproduction: registers a plain single-active-consumer (SAC) on one
/// partition stream and a super stream SAC consumer that share the same
/// reference ("my-group").
/// </summary>
public class SuperStreamIssueEasy
{
    /// <summary>
    /// Declares a direct exchange and one stream queue bound to it with routing
    /// key "0", then creates the two consumers: first the plain SAC consumer on
    /// the partition stream, then the super stream SAC consumer on the exchange
    /// name.
    /// </summary>
    /// <returns>A task that completes once both consumers have been created.</returns>
    public static async Task Start()
    {
        Console.WriteLine("Super Stream Issue");

        const string streamSystemName = "super-stream-issue-easy";
        // Name of the single partition stream, shared by the declare, the bind and the plain consumer.
        var partitionStream = $"{streamSystemName}-0";

        var poolConfig = new ConnectionPoolConfig { ConsumersPerConnection = 200 };
        var systemConfig = new StreamSystemConfig { ConnectionPoolConfig = poolConfig };
        var system = await StreamSystem.Create(systemConfig);

        // Topology is declared over AMQP 0-9-1.
        var connectionFactory = new ConnectionFactory();
        var amqpConnection = connectionFactory.CreateConnection();

        var exchangeChannel = amqpConnection.CreateModel();
        exchangeChannel.ExchangeDeclare(streamSystemName, "direct", true, false, null);

        var queueChannel = amqpConnection.CreateModel();
        var queueArguments = new Dictionary<string, object> { ["x-queue-type"] = "stream" };
        queueChannel.QueueDeclare(partitionStream, true, false, false, queueArguments);
        queueChannel.QueueBind(partitionStream, streamSystemName, "0");

        // Plain SAC consumer attached directly to the partition stream.
        var partitionConfig = new ConsumerConfig(system, partitionStream)
        {
            OffsetSpec = new OffsetTypeFirst(),
            IsSuperStream = false,
            IsSingleActiveConsumer = true,
            Reference = "my-group",
            ConsumerUpdateListener = (_, _, _) =>
                Task.FromResult<IOffsetType>(new OffsetTypeNext()),
            MessageHandler = async (_, _, _, message) =>
            {
                Console.WriteLine(
                    $"body: {Encoding.UTF8.GetString(message.Data.Contents)}");
                await Task.CompletedTask;
            }
        };
        var consumer = await Consumer.Create(partitionConfig);

        // Super stream SAC consumer using the same reference as the plain consumer.
        var superStreamConfig = new ConsumerConfig(system, streamSystemName)
        {
            OffsetSpec = new OffsetTypeFirst(),
            IsSuperStream = true,
            IsSingleActiveConsumer = true,
            Reference = "my-group",
            ConsumerUpdateListener = (_, _, _) =>
                Task.FromResult<IOffsetType>(new OffsetTypeNext()),
            MessageHandler = async (_, _, _, message) =>
            {
                Console.WriteLine(
                    $"body: {Encoding.UTF8.GetString(message.Data.Contents)}");
                await Task.CompletedTask;
            }
        };
        var consumerSuper = await Consumer.Create(superStreamConfig);
    }
}

Expected behavior
Stream should support both a plain SAC consumer and a super stream consumer simultaneously.
Additional context
Same issue with RabbitMQ 3.13.x and 4.1.x