温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

利用golang怎么对kafka进行连接

发布时间:2020-12-19 14:15:27 来源:亿速云 阅读:815 作者:Leah 栏目:开发技术

本篇文章为大家展示了利用golang怎么对kafka进行连接,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

1.首先初始化conf配置把kafka和ES的地址配置好还有一个日志方便查看

配置信息如下 用到的库是

github.com/astaxie/beego/config [logs] log_level = debug log_path = "./logs/log_transfer.log" [kafka] server_addr = 192.168.0.134:9092 topic = nginx_log [ES] addr = http://192.168.0.134:9200/

2.读取conf配置存取进结构体

type LogConfig struct {   kafkaAddr string   ESAddr string   LogPath string   LogLevel string   Topic string } var (    logConfig *LogConfig )

3.读取conf配置代码如下

func initConfig(conftype string,filename string)(err error) {    conf, err := config. NewConfig(conftype,filename)    if err != nil {     fmt. Println( "new config faild,err:",err)      return   }    logConfig = &LogConfig{}    logConfig.LogLevel = conf. String( "logs::log_level")    if len(logConfig.LogLevel) == 0 {      logConfig.LogLevel = "debug"   }    logConfig.LogPath = conf. String( "logs::log_path")    if len(logConfig.LogPath) == 0 {      logConfig.LogPath = "./logs"   }    logConfig.kafkaAddr = conf. String( "kafka::server_addr")    if len(logConfig.kafkaAddr) == 0 {      err = fmt. Errorf( "invalid kafka addr err")      return   }    logConfig.ESAddr = conf. String( "ES::addr")    if len(logConfig.ESAddr) == 0 {      err = fmt. Errorf( "invalid ES addr err")      return   }    logConfig.Topic = conf. String( "kafka::topic")    if len(logConfig.Topic) == 0 {      err = fmt. Errorf( "invalid topic addr err")      return   }    return }

4.完成了initConfig的初始化

5.初始化initLogger

func convertLogLevel(level string) int {    switch(level) {      case "debug":        return logs.LevelDebug      case "warn":        return logs.LevelWarn      case "info":        return logs.LevelInfo      case "trace":        return logs.LevelTrace   }    return logs.LevelDebug } func initLogger(logpath string, logLevel string) (err error) {    config := make( map[ string] interface{})   config[ "filename"] = logpath   config[ "level"] = convertLogLevel(logLevel)    configStr, err := json. Marshal(config)    if err!= nil {     fmt. Println( "marshal failed,err:",err)      return   }   logs. SetLogger(logs.AdapterFile, string(configStr))    return }

6.初始化kafka

type KafkaClient struct {   client sarama.Consumer   addr string   topic string   wg sync.WaitGroup } var (    kafkaClient *KafkaClient ) func initKafKa(addr string,topic string)(err error) {    kafkaClient = &KafkaClient{}    consumer, err := sarama. NewConsumer(strings. Split(addr, ","), nil)    if err != nil {     logs. Error( "Failed to strat consumer :",err)      return    }    kafkaClient.client = consumer    kafkaClient.addr = addr    kafkaClient.topic = topic    return }

7.初始化ES

gopkg.in/olivere/elastic.v2 // 这个是操作ES的库

type LogMessage struct {   App string   Topic string   Message string } var (    esClient *elastic.Client ) func initES(addr string)(err error) {       client, err := elastic. NewClient(elastic. SetSniff( false),elastic. SetURL(addr))    if err != nil {     fmt. Println( "connect es error",err)      return   }    esClient = client    return }

8.干活把kafka的数据写入ES

github.com/Shopify/sarama 这个是操作kafka的驱动库

func run()(err error) {   fmt. Println( "run")    partitionList, err := kafkaClient.client. Partitions(kafkaClient.topic)    if err != nil {     logs. Error( "ini failed ,err:%v",err)     fmt. Printf( "ini failed ,err:%v",err)      return   }    for partition := range partitionList {     fmt. Println( "for进入")      pc, errRet := kafkaClient.client. ConsumePartition(kafkaClient.topic, int32(partition),sarama.OffsetNewest)      if errRet != nil {        err = errRet       logs. Error( "Failed to start consumer for partition %d: %s \n ",partition,err)       fmt. Printf( "Failed to start consumer for partition %d: %s \n ",partition,err)        return     }      defer pc. AsyncClose()     fmt. Println( "马上进入协程")     kafkaClient.wg. Add( 1)      go func(pc sarama.PartitionConsumer){       fmt. Println( "进来了")               for msg := range pc. Messages() {         fmt. Println( "func执行")         logs. Debug( "Partition:%d,Offset:%d,key:%s,value:%s",msg.Partition,msg.Offset, string(msg.Key), string(msg.Value))          //fmt.Println()          err = sendToES(kafkaClient.topic,msg.Value)          if err != nil {           logs. Warn( "send to es failed,err:%v",err)         }       }       kafkaClient.wg. Done()     }(pc)       }   kafkaClient.wg. Wait()   fmt. Println( "协程执行完毕")    return }

上面代码是读kafka消费数据通过sendToES这个函数发送至ES里面

sendToES代码如下

func sendToES(topic string,data [] byte) (err error) {    msg := &LogMessage{}    msg.Topic = topic    msg.Message = string(data)    _, err = esClient. Index().        Index(topic).        Type(topic).        BodyJson(msg).        Do()      if err != nil {               return     }    return }

Index就是索引名称

index().type().bodyjson().do()这样的写法是链式执行操作

9.写完了基本操作后 再写一个模拟写入数据进kafka的数据 代码如下

func main() { config := sarama. NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Partitioner = sarama.NewRandomPartitioner config.Producer.Return.Successes = true client, err := sarama. NewSyncProducer([] string{ "127.0.0.1:9092"}, config) if err != nil { fmt. Println( "producer close,err:", err) return } defer client. Close() var n int= 0 for { n++ msg := &sarama.ProducerMessage{} msg.Topic = "nginx_log" msg.Value = sarama. StringEncoder( "this is a good test,hello maomaoChong!!," + strconv. Itoa(n)) pid, offset, err := client. SendMessage(msg) if err != nil { fmt. Println( "send message failed,", err) return } fmt. Printf( "pid:%v offset:%v \n ", pid, offset) time. Sleep(time.Second * 2) } }

这个就是生产者往kafka里面写入数据进去消费

10.我们启动我们的kafka 注意kafka依赖于zookeeper 先启动ZK然后启动kafka

我这里用的是zookeeper-3.4.12网上有下载

启动ZK

利用golang怎么对kafka进行连接

ZK已经成功启动

11.启动kafka 我这里是kafka_2.11-1.1.0

.\bin\windows\kafka-server-start.bat .\config\server.properties

利用golang怎么对kafka进行连接

kafka已经跑起来了

12.把kafka消费测试端也启动

.\bin\windows\kafka-console-consumer.bat --topic nginx_log --zookeeper 127.0.0.1 2181

利用golang怎么对kafka进行连接

消费端启动成功 一直等待数据进来消费

13.然后我们把ES 和Kib 都启动了

利用golang怎么对kafka进行连接

这是我们的ES版本是5.5.1的 已经跑起来了 接着启动我们的kib

kib里面有个配置config下面的叫kibana.yml里面设置好ES的地址和端口就处于监听ES状态

启动kib有点慢 稍微等一下就好

利用golang怎么对kafka进行连接

此时启动好了kib

14.测试kib是否启动

默认地址是http://localhost:5601

利用golang怎么对kafka进行连接

进入成功 确定没问题

15.编译我们的代码 写数据进kafka

利用golang怎么对kafka进行连接

从上面看我们知道一个再写 一个再消费

16.编译运行我们把kafka写入进ES里面的代码

利用golang怎么对kafka进行连接

运行了 这里就把kafka消费的数据 写入进ES里面

17.我们看一下数据是否有 进入kib

利用golang怎么对kafka进行连接

上述内容就是利用golang怎么对kafka进行连接,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI