You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I would like to know this since once I start writing messages to disk once they have 'timed out' on the producer channel, which usually happens when the kafka server is down or unreachable, to check when I can re-queue these messages into the producer.
Basically, a way to periodically check to see if I can connect to the kafka server with my current ConfigMap.
I tried using the AdminClient to create a topic as a check of health, but this seems to exit the program after one failure or just hang up the calling go routine. A quick, ping test would be useful, using your current ConfigMap values or perhaps there is a better way that already exists?
Thanks for any help.
Scott
UPDATE This is what I did... I got an AdminClient, fetched the Broker metadata and listed the brokerIDs. If no metadata or brokerIDs returned, then the server was still down.
func getAdminClient() (*kafka.AdminClient, error) { var funcName = "getAdminClient()" if adminClient != nil { logger.Debugf("%v::AdminClient already present", funcName) return adminClient, nil } if adminConfigMap == nil { adminConfigMap = getAdminConfigMap() // Get the kafka.ConfigMap } logger.Debugf("%v::Getting AdminClient", funcName) ac, err := kafka.NewAdminClient(adminConfigMap) if err == nil { logger.Debugf("%v::Got AdminClient", funcName) adminClient = ac } else { logger.Debugf("%v::unable to get AdminClient from the server. It must still be down. Error: %v", funcName, err) } return adminClient, err } func getBrokerList(topic *string) (brokers []int32, err error) { var funcName = "getBrokerList()" ac, err := getAdminClient() if err == nil { // unc (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error) { logger.Tracef("%v::Getting metadata", funcName) md, err := ac.GetMetadata(topic, true, 5000) // 5 seconds if err != nil { //ac.Close() // only close if you aren't going to reuse the AdminClient for the duration of your program return nil, err } else { logger.Tracef("%v::Got metadata: %v", funcName, md) } brokers = make([]int32, len(md.Brokers)) for i, mdBroker := range md.Brokers { brokers[i] = mdBroker.ID } } else { //ac.Close() // only close if you aren't going to reuse the AdminClient for the duration of your program return nil, err } //ac.Close(). // only close if you aren't going to reuse the AdminClient for the duration of your program return brokers, nil } func main() { topicName := "myTopic" brokerIDs, err := getBrokerList(&topicName) isServerHealthy := false if err != nil { logger.Debugf("%v::unable to get metadata or brokerID list. The kafka server must still be down. Error: %v", funcName, err) } else { isServerHealthy = true logger.Debugf("%v::Broker IDs: %v", funcName, brokerIDs) } }
my ConfigMap properties were all of the connection properties in my ConfigMap that I use to publish events, with a couple additional properties that are:
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
I would like to know this since once I start writing messages to disk once they have 'timed out' on the producer channel, which usually happens when the kafka server is down or unreachable, to check when I can re-queue these messages into the producer.
Basically, a way to periodically check to see if I can connect to the kafka server with my current ConfigMap.
I tried using the AdminClient to create a topic as a check of health, but this seems to exit the program after one failure or just hang up the calling go routine.
A quick, ping test would be useful, using your current ConfigMap values or perhaps there is a better way that already exists?
Thanks for any help.
UPDATE
This is what I did... I got an AdminClient, fetched the Broker metadata and listed the brokerIDs. If no metadata or brokerIDs returned, then the server was still down.
my ConfigMap properties were all of the connection properties in my ConfigMap that I use to publish events, with a couple additional properties that are:
Note:
adminConfigMapis my kafka.ConfigMap object I use for doing AdminClient stuff.Beta Was this translation helpful? Give feedback.
All reactions