温馨提示×

Linux下Golang如何使用消息队列

小樊
36
2025-11-06 15:28:29
栏目: 编程语言

在Linux下,Golang可以使用多种消息队列技术。这里我们将介绍两种常用的消息队列:RabbitMQ和Kafka。

  1. RabbitMQ

RabbitMQ是一个基于AMQP协议的开源消息代理。要在Golang中使用RabbitMQ,你需要安装一个名为streadway/amqp的库。你可以使用以下命令安装:

go get github.com/streadway/amqp 

以下是一个简单的示例,展示了如何在Golang中使用RabbitMQ发送和接收消息:

package main import ( "fmt" "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil {	log.Fatalf("%s: %s", msg, err)	} } func main() {	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")	failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close()	ch, err := conn.Channel()	failOnError(err, "Failed to open a channel") defer ch.Close()	q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments	)	failOnError(err, "Failed to declare a queue")	body := "Hello World!"	err = ch.Publish( "", // exchange	q.Name, // routing key false, // mandatory false, // immediate	amqp.Publishing{	ContentType: "text/plain",	Body: []byte(body),	})	failOnError(err, "Failed to publish a message")	fmt.Println(" [x] Sent", body)	msgs, err := ch.Consume(	q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args	)	failOnError(err, "Failed to register a consumer")	forever := make(chan bool) go func() { for d := range msgs {	fmt.Printf("Received a message: %s\n", d.Body)	}	}()	fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")	<-forever } 
  1. Kafka

Kafka是一个分布式流处理平台,通常用于构建实时数据管道和流应用程序。要在Golang中使用Kafka,你需要安装一个名为confluentinc/confluent-kafka-go的库。你可以使用以下命令安装:

go get github.com/confluentinc/confluent-kafka-go/kafka 

以下是一个简单的示例,展示了如何在Golang中使用Kafka发送和接收消息:

package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) func main() {	broker := "localhost:9092"	topic := "test" // Producer configuration	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})	failOnError(err, "Failed to create producer") // Consumer configuration	c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": "test-group", "auto.offset.reset": "earliest",	})	failOnError(err, "Failed to create consumer") // Subscribe to the topic	err = c.SubscribeTopics([]string{topic}, nil)	failOnError(err, "Failed to subscribe to topic") // Produce a message	err = p.Produce(&kafka.Message{	TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},	Value: []byte("Hello, Kafka!"),	}, nil)	failOnError(err, "Failed to produce message") // Wait for messages	n := 0 for n < 1 {	msg, err := c.ReadMessage(-1)	failOnError(err, "Failed to read message")	fmt.Printf("Received message: %s\n", string(msg.Value))	n++	} // Close producer and consumer	p.Close()	c.Close() } func failOnError(err error, msg string) { if err != nil { panic(fmt.Sprintf("%s: %s", msg, err))	} } 

这些示例仅用于演示如何在Golang中使用RabbitMQ和Kafka。在实际应用中,你可能需要根据你的需求对这些示例进行修改。

0