- Notifications
You must be signed in to change notification settings - Fork 31
Open
Labels
enhancementNew feature or requestNew feature or request
Description
Is your feature request related to a problem? Please describe.
Hello!
There is a question about hearbeats, for example the next code would raise to logs next messages
Code:
package main import ( "context" "fmt" "os" "os/signal" "strconv" "time" "github.com/google/uuid" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" ) func CheckErr(err error) { if err != nil { fmt.Printf("%s ", err) os.Exit(1) } } func main() { ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) defer stop() env, err := stream.NewEnvironment( stream.NewEnvironmentOptions(). SetHost("localhost"). SetPort(5552). SetUser("guest"). SetPassword("guest").SetRequestedHeartbeat(time.Second * 1)) CheckErr(err) streamName := uuid.New().String() err = env.DeclareStream(streamName, &stream.StreamOptions{ MaxLengthBytes: stream.ByteCapacity{}.MB(500), }, ) CheckErr(err) consName := "consumer_" producer, err := env.NewProducer(streamName, nil) CheckErr(err) go func() { for i := 0; i < 1000; i++ { err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i)))) CheckErr(err) time.Sleep(5 * time.Second) } }() handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) { err := consumerContext.Consumer.StoreCustomOffset(consumerContext.Consumer.GetOffset()) if err != nil { CheckErr(err) } fmt.Println(string(message.GetData())) } cons, err := env.NewConsumer(streamName, handleMessages, stream. NewConsumerOptions(). SetManualCommit(). SetOffset(stream.OffsetSpecification{}.LastConsumed()).SetConsumerName(consName), ) CheckErr(err) defer cons.Close() <-ctx.Done() }go run main.go hello_world_0 hello_world_1 hello_world_2 hello_world_3 2024/06/24 18:24:35 [warn] - Missing heart beat: 1 2024/06/24 18:24:35 [warn] - Missing heart beat: 1 2024/06/24 18:24:35 [warn] - Missing heart beat: 1 2024/06/24 18:24:35 [warn] - Missing heart beat: 1 2024/06/24 18:24:35 [warn] - Missing heart beat: 1 hello_world_4 hello_world_5 hello_world_6 hello_world_7 2024/06/24 18:24:55 [warn] - Missing heart beat: 2 2024/06/24 18:24:55 [warn] - Too many heartbeat missing: 2 2024/06/24 18:24:55 [warn] - Missing heart beat: 2 2024/06/24 18:24:55 [warn] - Too many heartbeat missing: 2 2024/06/24 18:24:55 [error] - Producer BatchSend error during flush: write tcp 127.0.0.1:37404->127.0.0.1:5552: use of closed network connection producer id: 0 closed exit status 1 Im a bit confused, because the demo rabbit is alive and connection seems too - messages still consuming.
Is it a correct behaviour?
Seems the heartBeat uses a hardcoded heartbeat intervals and the heartbeat condition fails
Describe the solution you'd like
Seems hearbeat should be called every interval specified by user, but I may be wrong
Describe alternatives you've considered
No response
Additional context
No response
Metadata
Metadata
Assignees
Labels
enhancementNew feature or requestNew feature or request