golang redis beego appname = redis_queue #项目名称 httpport = 1200 #监听端口 autorender = false #不渲染模板 include runmode.conf #导入环境变量 include config.conf #数据库等配置 include queue_list.conf #队列及消费者配置 [dev] #dev环境下redis配置 redis_host = 127.0.0.1:6379 #redis host redis_password = #redis password [prod] #prod环境下redis配置 redis_host = 127.0.0.1:6379 #redis host redis_password = #redis password ######### 队列配置 TEST_QUEUE = "TEST" #topic名称 TEST_CALLBACK = "http://xxxx.com/callback" #消息转发地址 TEST_REDIS = "test_queue_redis" #存在redis中key名 [consumers] #消费者配置 ######### 消费者配置 TEST = on #topic为TEST的开启消费者 runmode = dev #运行环境 //注册 func (t *ConsumerTask) Register () { TaskList := make(map[string]string) TaskList["consumer"] = "0/2 * * * * *" //每2秒消费一次 for key,value := range TaskList{ toolbox.AddTask(key,t.dispatch(key, value)) } } bee run curl http://127.0.0.1:1200/message/new -X -POST -d "topic=TEST&data1=1&data2=2&data3=3" 输出:{"code": 200, "data": null, "message": "success"} 然后消费者会向 TEST_CALLBACK(http://xxxx.com/callback)发起一条为POST的请求参数为 data1=1&data2=2&data3=3 curl http://127.0.0.1:1200/message/new?topic=TEST&data1=1&data2=2&data3=3 输出:{"code": 200, "data": null, "message": "success"} 然后消费者会向 TEST_CALLBACK(http://xxxx.com/callback)发起一条为GET的请求参数为 data1=1&data2=2&data3=3 func (c *IndexCtl) PushQueue() { params := c.Input() //获取输入参数 service.Log("push_queue", params) //记录日志 var topic string //声明topic if val, ok := params["topic"]; ok { //判断入参是否存在topic topic = strings.ToLower(val[0]) //存在赋值给topic } else { c.ReturnData(499, "请指定topic", nil) //否则抛出错误 } run_topics, _ := beego.AppConfig.GetSection("consumers") //获取所有topic if val, ok := run_topics[topic]; !ok || val != "on" { //判断当前topic是否存在并且为on状态 c.ReturnData(499, "topic不存在", nil) //不存在或者不为on 抛出错误 } delete(params, "topic") //删除topic参数 method := c.Ctx.Request.Method //获取当前请求类型 redis_queue := &structs.RedisQueue{ //结构化 Method: method, Params: params, } is := new(service.IndexService) is.LPushRedis(topic, redis_queue) //lpush到redis的topic列表中 c.ReturnData(200, "success", nil) }func (t *ConsumerTask) consumer() { topics, _ := beego.AppConfig.GetSection("consumers") //拿到所有消费者 for k := range topics { queue_name := beego.AppConfig.String(strings.ToUpper(k) + "_QUEUE") //获取消费者配置信息 queue_redis := beego.AppConfig.String(strings.ToUpper(k) + "_REDIS") //获取消费者配置信息 queue_callback := beego.AppConfig.String(strings.ToUpper(k) + "_CALLBACK") //获取消费者配置信息 t.doConsumer(queue_name, queue_redis, queue_callback) //开始消费 } } func (t *ConsumerTask) doConsumer(queue_name, queue_redis, queue_callback string) { redis_tool := &service.RedisTool{} //拿到redis链接 res, err := redis_tool.LPop(queue_redis) //取出列表中数据 if res == nil || err != nil { return //没有数据或者出错中断当前消费 } fmt.Println(queue_name + ": 消费中") fmt.Println("消息转发至:" + queue_callback) service.Log("do_consumer_log", "queue_name:" + queue_name + " queue_redis:" + queue_redis + " queue_callback:" + queue_callback) var r structs.RedisQueue err = json.Unmarshal(res.([]byte), &r) if err != nil || r.Method == "" { return } t.request(queue_callback, r) //转发请求 } func (t *ConsumerTask) request(uri string, param structs.RedisQueue) { var req *httplib.BeegoHTTPRequest switch strings.ToUpper(param.Method) { //请求方式 case "POST": req = httplib.Post(uri) case "GET": req = httplib.Get(uri) case "PUT": req = httplib.Put(uri) case "DELETE": req = httplib.Delete(uri) case "HEAD": req = httplib.Head(uri) default: return } for k, v := range param.Params { //数据 if len(v) < 1 { continue } req.Param(k, v[0]) } _, err := req.Bytes() //发送请求 if err != nil { service.Log("queue_request_err", "url:" + uri + " error:" + err.Error() + " params:" + fmt.Sprint(param.Params)) fmt.Println(err.Error()) } fmt.Println("消费完成") }