
Heartbeats misleading behaviour  #325

@HustonMmmavr

Description


Is your feature request related to a problem? Please describe.

Hello!

I have a question about heartbeats. For example, the following code writes the log messages shown below it.

Code:

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"time"

	"github.com/google/uuid"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)

func CheckErr(err error) {
	if err != nil {
		fmt.Printf("%s ", err)
		os.Exit(1)
	}
}

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
	defer stop()

	env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest").
			SetRequestedHeartbeat(time.Second * 1))
	CheckErr(err)

	streamName := uuid.New().String()
	err = env.DeclareStream(streamName,
		&stream.StreamOptions{
			MaxLengthBytes: stream.ByteCapacity{}.MB(500),
		},
	)
	CheckErr(err)

	consName := "consumer_"

	producer, err := env.NewProducer(streamName, nil)
	CheckErr(err)

	go func() {
		for i := 0; i < 1000; i++ {
			err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
			CheckErr(err)
			time.Sleep(5 * time.Second)
		}
	}()

	handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		err := consumerContext.Consumer.StoreCustomOffset(consumerContext.Consumer.GetOffset())
		if err != nil {
			CheckErr(err)
		}
		fmt.Println(string(message.GetData()))
	}

	cons, err := env.NewConsumer(streamName, handleMessages,
		stream.NewConsumerOptions().
			SetManualCommit().
			SetOffset(stream.OffsetSpecification{}.LastConsumed()).
			SetConsumerName(consName),
	)
	CheckErr(err)
	defer cons.Close()

	<-ctx.Done()
}

Output:

go run main.go
hello_world_0
hello_world_1
hello_world_2
hello_world_3
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
2024/06/24 18:24:35 [warn] - Missing heart beat: 1
hello_world_4
hello_world_5
hello_world_6
hello_world_7
2024/06/24 18:24:55 [warn] - Missing heart beat: 2
2024/06/24 18:24:55 [warn] - Too many heartbeat missing: 2
2024/06/24 18:24:55 [warn] - Missing heart beat: 2
2024/06/24 18:24:55 [warn] - Too many heartbeat missing: 2
2024/06/24 18:24:55 [error] - Producer BatchSend error during flush: write tcp 127.0.0.1:37404->127.0.0.1:5552: use of closed network connection
producer id: 0 closed
exit status 1

I'm a bit confused, because the demo RabbitMQ broker is alive and the connection seems to be alive too: messages are still being consumed.
Is this the correct behaviour?

It seems the heartbeat check uses a hardcoded interval rather than the one passed to SetRequestedHeartbeat, so the heartbeat condition fails.

Describe the solution you'd like

It seems the heartbeat should be sent and checked at the interval specified by the user, but I may be wrong.

Describe alternatives you've considered

No response

Additional context

No response


Labels: enhancement
