Skip to content

GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn.

License

Notifications You must be signed in to change notification settings

bxcodec/goqueue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

ย 

History

28 Commits
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 

Repository files navigation

๐Ÿš€ GoQueue - Universal Go Message Queue Library

Go Reference Go Report Card License: MIT GitHub stars

One library to rule them all - A powerful, extensible, and developer-friendly Go wrapper that simplifies message queue operations across multiple platforms. Build robust, scalable applications with consistent queue operations, regardless of your underlying message broker.

โœจ Why GoQueue?

Core Concept

๐ŸŽฏ Universal Interface - Write once, run anywhere. Switch between queue providers without changing your code
โšก Production Ready - Built-in retry mechanisms, dead letter queues, and error handling
๐Ÿ›ก๏ธ Type Safe - Strongly typed interfaces with comprehensive error handling
๐Ÿ”ง Extensible - Plugin architecture for custom middleware and queue providers
๐Ÿ“Š Observable - Built-in logging and middleware support for monitoring
๐Ÿš€ Developer Experience - Intuitive API design with sensible defaults


๐Ÿ“‹ Table of Contents


๐Ÿš€ Quick Start

Get up and running in less than 5 minutes:

go get -u github.com/bxcodec/goqueue
package main import ( "context" "log" "github.com/bxcodec/goqueue" "github.com/bxcodec/goqueue/consumer" "github.com/bxcodec/goqueue/publisher" "github.com/bxcodec/goqueue/interfaces" ) func main() { // Create queue service queueSvc := goqueue.NewQueueService( options.WithConsumer(myConsumer), options.WithPublisher(myPublisher), options.WithMessageHandler(handleMessage), ) // Publish a message queueSvc.Publish(context.Background(), interfaces.Message{ Data: map[string]interface{}{"hello": "world"}, Action: "user.created", Topic: "users", }) // Start consuming queueSvc.Start(context.Background()) } func handleMessage(ctx context.Context, m interfaces.InboundMessage) error { log.Printf("Received: %v", m.Data) return m.Ack(ctx) // Acknowledge successful processing }

๐Ÿ’ซ Features

๐ŸŽฏ Core Features

  • Multi-Provider Support: Currently supports RabbitMQ (more coming soon!)
  • Unified API: Consistent interface across all queue providers
  • Type Safety: Strongly typed message structures
  • Context Support: Full Go context integration for cancellation and timeouts

๐Ÿ›ก๏ธ Reliability & Resilience

  • Automatic Retries: Configurable retry mechanisms with exponential backoff
  • Dead Letter Queues: Handle failed messages gracefully
  • Circuit Breaker: Built-in protection against cascading failures
  • Graceful Shutdown: Clean resource cleanup on application termination

๐Ÿ”ง Advanced Capabilities

  • Middleware System: Extensible pipeline for message processing
  • Custom Serialization: Support for JSON, Protocol Buffers, and custom formats
  • Message Routing: Flexible topic and routing key patterns
  • Batching: Efficient batch message processing
  • Connection Pooling: Optimized connection management

๐Ÿ“Š Observability

  • Structured Logging: Built-in zerolog integration
  • Metrics Ready: Hooks for Prometheus, StatsD, and custom metrics
  • Tracing Support: OpenTelemetry compatible
  • Health Checks: Built-in health check endpoints

๐Ÿ› ๏ธ Installation

# Install the core library go get -u github.com/bxcodec/goqueue

Requirements

  • Go 1.21 or higher
  • Message broker (RabbitMQ supported, more coming soon)

๐Ÿ“– Basic Usage

๐Ÿš€ Publisher Example

package main import ( "context" "github.com/bxcodec/goqueue/publisher" publisherOpts "github.com/bxcodec/goqueue/options/publisher" amqp "github.com/rabbitmq/amqp091-go" ) func main() { // Connect to RabbitMQ conn, _ := amqp.Dial("amqp://localhost:5672/") // Create publisher pub := publisher.NewPublisher( publisherOpts.PublisherPlatformRabbitMQ, publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ Conn: conn, PublisherChannelPoolSize: 5,	}), publisherOpts.WithPublisherID("my-service"), ) // Publish message err := pub.Publish(context.Background(), interfaces.Message{ Data: map[string]interface{}{"user_id": 123, "action": "signup"}, Action: "user.created", Topic: "users", }) if err != nil { log.Fatal(err) } }

๐Ÿ“จ Consumer Example

package main import ( "context" "github.com/bxcodec/goqueue/consumer" consumerOpts "github.com/bxcodec/goqueue/options/consumer" ) func main() { // Create consumer cons := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, consumerOpts.WithQueueName("user-events"), consumerOpts.WithMaxRetryFailedMessage(3), consumerOpts.WithBatchMessageSize(10), ) // Start consuming cons.Consume(context.Background(), messageHandler, metadata) } func messageHandler(ctx context.Context, msg interfaces.InboundMessage) error { // Process your message userData := msg.Data.(map[string]interface{}) // Business logic here if err := processUser(userData); err != nil { // Retry with exponential backoff return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) } // Acknowledge successful processing return msg.Ack(ctx) }

๐Ÿ”ง Advanced Features

๐Ÿ”„ Retry Mechanisms

GoQueue provides sophisticated retry mechanisms with multiple strategies:

// Exponential backoff retry return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) // Custom retry logic return msg.RetryWithDelayFn(ctx, func(retryCount int64) int64 { return retryCount * 2 // Custom delay calculation }) // Move to dead letter queue after max retries return msg.MoveToDeadLetterQueue(ctx)

๐Ÿ”Œ Middleware System

Extend functionality with custom middleware:

// Custom logging middleware func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { return func(ctx context.Context, m interfaces.InboundMessage) error { start := time.Now() err := next(ctx, m) log.Printf("Message processed in %v", time.Since(start)) return err } } } // Apply middleware cons := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, consumerOpts.WithMiddlewares( LoggingMiddleware(), MetricsMiddleware(), AuthMiddleware(), ), )

๐ŸŽ›๏ธ Configuration Options

Fine-tune your queue behavior:

cons := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, consumerOpts.WithQueueName("high-priority-queue"), consumerOpts.WithMaxRetryFailedMessage(5), consumerOpts.WithBatchMessageSize(50), consumerOpts.WithConsumerID("worker-01"), consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{ ConsumerChannel: channel, ReQueueChannel: requeueChannel, QueueDeclareConfig: &consumerOpts.RabbitMQQueueDeclareConfig{ Durable: true, AutoDelete: false, Exclusive: false, }, }), )

๐ŸŽฎ Examples

๐Ÿ“ Complete Examples

Explore our comprehensive examples:

๐Ÿฐ RabbitMQ Quick Setup

Start RabbitMQ with Docker:

# Clone the repository git clone https://github.com/bxcodec/goqueue.git cd goqueue/examples/rabbitmq/basic # Start RabbitMQ docker-compose up -d # Run the example go run main.go

๐Ÿ”„ Retry Architecture

GoQueue Retry Architecture

Automatic retry mechanism with exponential backoff and dead letter queue


๐Ÿ—๏ธ Architecture

๐ŸŽฏ Design Principles

  • Interface Segregation: Clean, focused interfaces for different responsibilities
  • Dependency Injection: Easy testing and swappable implementations
  • Error Handling: Comprehensive error types and recovery mechanisms
  • Performance: Optimized for high-throughput scenarios
  • Extensibility: Plugin architecture for custom providers and middleware

๐Ÿงฉ Core Components

Core Concept

๐Ÿ“ฆ Provider Support

Provider Status Features
RabbitMQ ๐Ÿ”„ Beta Version Full feature support
Google Pub/Sub ๐Ÿ“‹ Planned Coming soon
AWS SQS + SNS ๐Ÿ“‹ Planned Coming soon

๐Ÿ”ง Configuration

๐Ÿ“ Logging Setup

GoQueue uses structured logging with zerolog:

import "github.com/bxcodec/goqueue" // Setup basic logging (automatic when importing consumer/publisher) // OR setup with custom configuration: goqueue.SetupLoggingWithDefaults() // Pretty console output for development

๐Ÿงช Testing

Run the test suite:

# Unit tests make test # Integration tests with RabbitMQ make integration-test 

๐Ÿ“š Documentation

๐Ÿ“– Component Documentation

Explore our comprehensive guides for each system component:

Component Description Documentation
๐Ÿ”Œ Middleware Extend functionality with custom logic ๐Ÿ“– Middleware Guide
๐Ÿ“จ Consumer Reliable message consumption and processing ๐Ÿ“– Consumer Guide
๐Ÿ“ค Publisher High-performance message publishing ๐Ÿ“– Publisher Guide
๐Ÿ”„ RabbitMQ Retry Advanced retry mechanisms for RabbitMQ ๐Ÿ“– Retry Architecture

๐ŸŽฏ Quick Links


๐Ÿค Contributing

We welcome contributions! Here's how to get started:

๐Ÿš€ Quick Contribution Guide

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

๐Ÿ“‹ Development Setup

# Clone your fork git clone https://github.com/yourusername/goqueue.git cd goqueue # Install dependencies go mod download # Run tests make test # Run linting make lint 

๐ŸŽฏ Contribution Areas

  • ๐Ÿ”Œ New Queue Providers (Google Pub/Sub, SQS+SNS)
  • ๐Ÿ› ๏ธ Middleware Components (Metrics, Tracing, Auth)
  • ๐Ÿ“š Documentation & Examples
  • ๐Ÿงช Testing & Benchmarks
  • ๐Ÿ› Bug Fixes & Improvements

๐Ÿ“ž Support & Community


๐Ÿ“„ License

This project is licensed under the MIT License - see the LICENSE file for details.


๐Ÿ™ Acknowledgments

  • Thanks to all contributors
  • Inspired by the Go community's best practices
  • Built with โค๏ธ for the Go ecosystem

About

GoQueue - One library to rule them all. A Golang wrapper that handles all the complexity of various Queue platforms. Extensible and easy to learn.

Topics

Resources

License

Stars

Watchers

Forks

Contributors 2

  •  
  •