go-worker provides a simple way to manage and execute prioritized tasks concurrently, backed by a TaskManager that spawns a pool of workers. Each Task wraps a function that is scheduled according to its priority.
- Task prioritization: You can register tasks with a priority level influencing the execution order.
- Concurrent execution: Tasks are executed concurrently by a pool of workers.
- Middleware: You can apply middleware to the TaskManager to add functionality.
- Results: You can access the results of the tasks via the Results channel.
- Rate limiting: You can rate limit task scheduling by setting a maximum number of jobs per second.
- Cancellation: You can cancel Tasks before or while they are running.
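The first two features can be illustrated with a minimal, standard-library-only sketch. It is not go-worker's implementation: the `task` type, the `taskQueue` heap, and `runAll` are hypothetical names, and the rule that a lower priority value runs first is an assumption made for the example.

```go
package main

import (
	"container/heap"
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for a prioritized unit of work.
type task struct {
	priority int
	fn       func() string
}

// taskQueue implements heap.Interface.
// Assumption: a lower priority value is popped first.
type taskQueue []task

func (q taskQueue) Len() int           { return len(q) }
func (q taskQueue) Less(i, j int) bool { return q[i].priority < q[j].priority }
func (q taskQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *taskQueue) Push(x any)        { *q = append(*q, x.(task)) }
func (q *taskQueue) Pop() any {
	old := *q
	n := len(old)
	t := old[n-1]
	*q = old[:n-1]
	return t
}

// runAll pops tasks in priority order and hands them to a fixed pool of workers.
// Results are collected in completion order.
func runAll(tasks []task, workers int) []string {
	q := &taskQueue{}
	for _, t := range tasks {
		heap.Push(q, t)
	}

	jobs := make(chan task)
	results := make(chan string, len(tasks))

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range jobs {
				results <- t.fn()
			}
		}()
	}

	for q.Len() > 0 {
		jobs <- heap.Pop(q).(task)
	}
	close(jobs)
	wg.Wait()
	close(results)

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	tasks := []task{
		{priority: 10, fn: func() string { return "low" }},
		{priority: 1, fn: func() string { return "high" }},
		{priority: 5, fn: func() string { return "mid" }},
	}
	// With a single worker the completion order equals the dispatch order.
	fmt.Println(runAll(tasks, 1))
}
```

With more than one worker, dispatch still follows priority, but completion order depends on how long each task runs.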
```mermaid
flowchart LR
    Client[Client code] -->|register tasks| TaskManager
    TaskManager --> Queue[Priority Queue]
    Queue -->|dispatch| Worker1[Worker]
    Queue -->|dispatch| WorkerN[Worker]
    Worker1 --> Results[Results Channel]
    WorkerN --> Results
```

go-worker exposes its functionality over gRPC through the WorkerService. The service allows clients to register tasks, stream results, cancel running tasks, and query their status.
The server registers handlers keyed by name. Each handler consists of a Make function that constructs the expected payload type, and a Fn function that executes the task logic using the unpacked payload.
Clients send a Task message containing a name and a serialized payload using google.protobuf.Any. The server automatically unpacks the Any payload into the correct type based on the registered handler and passes it to the corresponding function.
Here is an example of registering a handler and sending a task with a payload:
```go
// Server-side handler registration
tm.RegisterHandler("create_user", worker.Handler{
	Make: func() any { return &workerpb.CreateUserPayload{} },
	Fn: func(ctx context.Context, payload any) (any, error) {
		p, ok := payload.(*workerpb.CreateUserPayload)
		if !ok {
			return nil, fmt.Errorf("unexpected payload type %T", payload)
		}
		_ = p // Handle user creation logic here

		return &workerpb.CreateUserResponse{UserId: "1234"}, nil
	},
})

// Client-side task creation with payload
payload, err := anypb.New(&workerpb.CreateUserPayload{
	Username: "newuser",
	Email:    "newuser@example.com",
})
if err != nil {
	log.Fatal(err)
}

task := &workerpb.Task{
	Name:    "create_user",
	Payload: payload,
}

_, err = client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
	Tasks: []*workerpb.Task{task},
})
```

Note on deadlines: When the client uses a stream context with a deadline, exceeding the deadline terminates the stream but does not cancel the tasks running on the server. To handle cancellation properly, use separate contexts for task execution, or use the close_on_completion flag (if implemented) to close the stream once tasks complete.
```go
import (
	"github.com/google/uuid"
)

tm := worker.NewTaskManagerWithDefaults(context.Background())
srv := worker.NewGRPCServer(tm)

gs := grpc.NewServer()
workerpb.RegisterWorkerServiceServer(gs, srv)
// listen and serve ...

client := workerpb.NewWorkerServiceClient(conn)

// register a task with payload
payload, err := anypb.New(&workerpb.CreateUserPayload{
	Username: "newuser",
	Email:    "newuser@example.com",
})
if err != nil {
	log.Fatal(err)
}

_, _ = client.RegisterTasks(ctx, &workerpb.RegisterTasksRequest{
	Tasks: []*workerpb.Task{
		{
			Name:           "create_user",
			Payload:        payload,
			CorrelationId:  uuid.NewString(),
			IdempotencyKey: "create_user:newuser@example.com",
			Metadata:       map[string]string{"source": "api_example", "role": "admin"},
		},
	},
})

// cancel by id
_, _ = client.CancelTask(ctx, &workerpb.CancelTaskRequest{Id: "<task-id>"})

// get task information
res, _ := client.GetTask(ctx, &workerpb.GetTaskRequest{Id: "<task-id>"})
fmt.Println(res.Status)
```

```go
tm := worker.NewTaskManager(2, 10, 5, time.Second, time.Second, 3)
defer tm.Close()

task := worker.Task{ID: uuid.New(), Priority: 1, Fn: func() (any, error) { return "hello", nil }}
tm.RegisterTask(context.Background(), task)

for res := range tm.GetResults() {
	fmt.Println(res)
}
```

Create a new TaskManager by calling the NewTaskManager() function with the following parameters:
- maxWorkers is the number of workers to start. If 0 is specified, it defaults to the number of available CPUs.
- maxTasks is the maximum number of tasks that can be executed at once; defaults to 10.
- tasksPerSecond is the rate limit of tasks that can be executed per second; defaults to 1.
- timeout is the default timeout for tasks; defaults to 5 minutes.
- retryDelay is the default delay between retries; defaults to 1 second.
- maxRetries is the default maximum number of retries; defaults to 3.
```go
tm := worker.NewTaskManager(4, 10, 5, time.Second*30, time.Second*30, 3)
```

Register new tasks by calling the RegisterTasks() method of the TaskManager struct and passing in a variadic number of tasks.
```go
id := uuid.New()
j := 1 // placeholder index used in the file name and the result message

task := worker.Task{
	ID:          id,
	Name:        "Some task",
	Description: "Here goes the description of the task",
	Priority:    10,
	Fn: func() (any, error) {
		emptyFile, err := os.Create(path.Join("examples", "test", "res", fmt.Sprintf("1st__EmptyFile___%v.txt", j)))
		if err != nil {
			// Return the error instead of exiting the process
			return nil, err
		}
		emptyFile.Close()
		time.Sleep(time.Second)

		return fmt.Sprintf("** task number %v with id %s executed", j, id), nil
	},
	Retries:    10,
	RetryDelay: 3,
}

task2 := worker.Task{
	ID:       uuid.New(),
	Priority: 10,
	Fn:       func() (val any, err error) { return "Hello, World!", err },
}

tm.RegisterTasks(context.Background(), task, task2)
```

You can stop the task manager and its goroutines by calling the Stop() method of the TaskManager struct.
```go
tm.Stop()
```

The results of the tasks can be accessed via the Results channel of the TaskManager by calling the GetResults() method.
```go
for result := range tm.GetResults() {
	// Do something with the result
}
```

You can cancel a Task by calling the CancelTask() method of the TaskManager struct and passing in the task ID as a parameter.
```go
tm.CancelTask(task.ID)
```

You can cancel all tasks by calling the CancelAllTasks() method of the TaskManager struct.
```go
tm.CancelAllTasks()
```

You can apply middleware to the TaskManager by calling the RegisterMiddleware() function and passing in the TaskManager and the middleware functions.
```go
tm = worker.RegisterMiddleware(tm,
	// middleware.YourMiddleware,
	func(next worker.Service) worker.Service {
		return middleware.NewLoggerMiddleware(next, logger)
	},
)
```

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/google/uuid"
	worker "github.com/hyp3rd/go-worker"
	"github.com/hyp3rd/go-worker/middleware"
)

func main() {
	tm := worker.NewTaskManager(4, 10, 5, time.Second*3, time.Second*30, 3)
	defer tm.Close()

	var srv worker.Service = tm
	// apply middleware in the same order as you want to execute them
	srv = worker.RegisterMiddleware(tm,
		// middleware.YourMiddleware,
		func(next worker.Service) worker.Service {
			return middleware.NewLoggerMiddleware(next, middleware.DefaultLogger())
		},
	)
	defer srv.Close()

	task := worker.Task{
		ID:       uuid.New(),
		Priority: 1,
		Fn: func() (val any, err error) {
			return func(a int, b int) (val any, err error) {
				return a + b, err
			}(2, 5)
		},
	}

	// Invalid task, it doesn't have a function
	task1 := worker.Task{
		ID:       uuid.New(),
		Priority: 10,
		// Fn: func() (val any, err error) { return "Hello, World from Task 1!", err },
	}

	task2 := worker.Task{
		ID:       uuid.New(),
		Priority: 5,
		Fn: func() (val any, err error) {
			time.Sleep(time.Second * 2)
			return "Hello, World from Task 2!", err
		},
	}

	task3 := worker.Task{
		ID:       uuid.New(),
		Priority: 90,
		Fn: func() (val any, err error) {
			// Simulate a long running task
			// time.Sleep(3 * time.Second)
			return "Hello, World from Task 3!", err
		},
	}

	task4 := worker.Task{
		ID:       uuid.New(),
		Priority: 150,
		Fn: func() (val any, err error) {
			// Simulate a long running task
			time.Sleep(1 * time.Second)
			return "Hello, World from Task 4!", err
		},
	}

	srv.RegisterTasks(context.Background(), task, task1, task2, task3)

	srv.CancelTask(task3.ID)

	srv.RegisterTask(context.Background(), task4)

	// Print results
	for result := range srv.GetResults() {
		fmt.Println(result)
	}

	tasks := srv.GetTasks()
	for _, task := range tasks {
		fmt.Println(task)
	}
}
```

This project follows Semantic Versioning. Release notes are available in CHANGELOG.md.
We welcome contributions! Fork the repository, create a feature branch, run the linters and tests, then open a pull request.
To propose new ideas, open an issue using the Feature request template.
Issues labeled good first issue or help wanted are ideal starting points for new contributors.
See CHANGELOG.md for the history of released versions.
The worker package provides an efficient way to manage and execute tasks concurrently and with prioritization. The package is highly configurable and can be used in various scenarios.
This project is licensed under the MIT License - see the LICENSE file for details.