温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

go语言的任务队列machinery怎么用

发布时间:2022-04-14 16:03:35 来源:亿速云 阅读:484 作者:iii 栏目:编程语言

这篇“go语言的任务队列machinery怎么用”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“go语言的任务队列machinery怎么用”文章吧。

使用概述

步骤1: 创建server,配置参数、注册task。(此处server只是个配置作用, 并不是单独的server进程)

步骤2: 启动worker

步骤3: 发送task

与celery的用法是完全一致的

创建server

func startServer() (*machinery.Server, error) {	cnf := &config.Config{	Broker:          "amqp://guest:guest@localhost:5672/",	DefaultQueue:    "machinery_tasks",	ResultBackend:   "amqp://guest:guest@localhost:5672/",	ResultsExpireIn: 3600,  //任务有效期	AMQP: &config.AMQPConfig{	Exchange:      "machinery_exchange",	ExchangeType:  "direct",	BindingKey:    "machinery_task",	PrefetchCount: 3,   //限定消费能力	},	}	// Create server instance	broker := amqpbroker.New(cnf)	backend := amqpbackend.New(cnf)	lock := eagerlock.New()     //任务锁	server := machinery.NewServer(cnf, broker, backend, lock)	// Register tasks	tasks := map[string]interface{}{	"add":               exampletasks.Add,	"multiply":          exampletasks.Multiply,	"sum_ints":          exampletasks.SumInts,	"sum_floats":        exampletasks.SumFloats,	"concat":            exampletasks.Concat,	"split":             exampletasks.Split,	"panic_task":        exampletasks.PanicTask,	"long_running_task": exampletasks.LongRunningTask,	}	return server, server.RegisterTasks(tasks) }

创建worker

创建worker, 之后就可以启动了

func worker() error {     //消费者的标记	consumerTag := "machinery_worker"	server, err := startServer()	if err != nil {	return err	}     //第二个参数并发数, 0表示不限制	worker := server.NewWorker(consumerTag, 0)     //钩子函数	errorhandler := func(err error) {}	pretaskhandler := func(signature *tasks.Signature) {}	posttaskhandler := func(signature *tasks.Signature) {}	worker.SetPostTaskHandler(posttaskhandler)	worker.SetErrorHandler(errorhandler)	worker.SetPreTaskHandler(pretaskhandler)	return worker.Launch() }

启动结果

INFO: 2021/05/01 08:28:27 worker.go:58 Launching a worker with the following settings: INFO: 2021/05/01 08:28:27 worker.go:59 - Broker: amqp://192.168.120.101:5672 INFO: 2021/05/01 08:28:27 worker.go:61 - DefaultQueue: machinery_tasks INFO: 2021/05/01 08:28:27 worker.go:65 - ResultBackend: amqp://192.168.120.101:5672 INFO: 2021/05/01 08:28:27 worker.go:67 - AMQP: machinery_exchange INFO: 2021/05/01 08:28:27 worker.go:68   - Exchange: machinery_exchange INFO: 2021/05/01 08:28:27 worker.go:69   - ExchangeType: direct INFO: 2021/05/01 08:28:27 worker.go:70   - BindingKey: machinery_task INFO: 2021/05/01 08:28:27 worker.go:71   - PrefetchCount: 0 INFO: 2021/05/01 08:28:27 amqp.go:96 [*] Waiting for messages. To exit press CTRL+C

发送任务

server, _ := startServer() signature := &tasks.Signature{     Name: "add",     Args: []tasks.Arg{         {             Type:  "int64",             Value: 1,         },         {             Type:  "int64",             Value: 1,         },     }, } asyncResult, _ := server.SendTask(signature) fmt.Println(asyncResult.Get(time.Millisecond * 5))  //等待间隔,理论上是越小越好 //asyncResult.GetWithTimeout(time.Second*120, time.Millisecond * 5)   //第一个参数才是timeout

以上就是关于“go语言的任务队列machinery怎么用”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注亿速云行业资讯频道。

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI