A quick introduction to how Apache Kafka works and how it differs from other messaging systems, using an example application. In this video I explain partitioning, consumer offsets, replication, and many other concepts found in Kafka.
Example
package main import ( "encoding/json" "fmt" k "github.com/segmentio/kafka-go" "github.com/sohamkamani/golang-kafka-example/kafka" ) func main() { k := kafka.New([]string{"localhost:9092"}, "test", kafka.WithPartitions(3), kafka.WithReplications(1), kafka.WithBalancer(&k.Hash{}), ) //producers k.Producer(produce("producer_1")) fmt.Println("--------------------------------------------") //consumers go k.Consumer("mobile", consume("mobile_consumer_1")) go k.Consumer("mobile", consume("mobile_consumer_2")) go k.Consumer("mobile", consume("mobile_consumer_3")) go k.Consumer("computer", consume("computer_consumer_1")) go k.Consumer("computer", consume("computer_consumer_2")) go k.Consumer("computer", consume("computer_consumer_3")) exit() } type Event struct { Event string Key string Team1 string Team2 string } func produce(id string) func(producer *kafka.Producer) { return func(producer *kafka.Producer) { events := []Event{{ Event: "Querter Start", Key: "America_Canada", Team1: "America", Team2: "Canada", }, { Event: "Foul", Key: "Malta_Portugal", Team1: "Malta", Team2: "Portugal", }, { Event: "Score 39-46", Key: "America_Canada", Team1: "America", Team2: "Canada", }, { Event: "Free Throw", Key: "Brazil_Australia", Team1: "Brazil", Team2: "Australia", }, { Event: "Score 41-46", Key: "America_Canada", Team1: "America", Team2: "Canada", }, { Event: "Querter End", Key: "Brazil_Australia", Team1: "Brazil", Team2: "Australia", }} for _, event := range events { value, _ := json.Marshal(event) err := producer.Produce(event.Key, value) if err != nil { fmt.Println("could not write message " + err.Error()) } fmt.Printf("%s -> %s %s : %s\n", id, event.Team1, event.Team2, event.Event) } } } func consume(id string) func(consumer *kafka.Consumer) { return func(consumer *kafka.Consumer) { for { msg, err := consumer.ReadMessage() if err != nil { fmt.Println("could not read message " + err.Error()) } var event Event json.Unmarshal(msg.Value, &event) fmt.Printf("%s -> %s %s : %s\n", id, 
event.Team1, event.Team2, event.Event) } } } func exit() { exit := make(chan int) <-exit }Output
producer_1 -> America Canada : Querter Start producer_1 -> Malta Portugal : Foul producer_1 -> America Canada : Score 39-46 producer_1 -> Brazil Australia : Free Throw producer_1 -> America Canada : Score 41-46 producer_1 -> Brazil Australia : Querter End -------------------------------------------- mobile_consumer_2 -> America Canada : Querter Start mobile_consumer_3 -> Malta Portugal : Foul mobile_consumer_1 -> America Canada : Score 39-46 mobile_consumer_2 -> Brazil Australia : Free Throw mobile_consumer_1 -> Brazil Australia : Querter End mobile_consumer_3 -> America Canada : Score 41-46 computer_consumer_2 -> America Canada : Querter Start computer_consumer_2 -> Brazil Australia : Free Throw computer_consumer_3 -> America Canada : Score 39-46 computer_consumer_1 -> Malta Portugal : Foul computer_consumer_3 -> Brazil Australia : Querter End computer_consumer_1 -> America Canada : Score 41-46 