One library to rule them all - A powerful, extensible, and developer-friendly Go wrapper that simplifies message queue operations across multiple platforms. Build robust, scalable applications with consistent queue operations, regardless of your underlying message broker.
๐ฏ Universal Interface - Write once, run anywhere. Switch between queue providers without changing your code
โก Production Ready - Built-in retry mechanisms, dead letter queues, and error handling
๐ก๏ธ Type Safe - Strongly typed interfaces with comprehensive error handling
๐ง Extensible - Plugin architecture for custom middleware and queue providers
๐ Observable - Built-in logging and middleware support for monitoring
๐ Developer Experience - Intuitive API design with sensible defaults
- ๐ Quick Start
- ๐ซ Features
- ๐ ๏ธ Installation
- ๐ Basic Usage
- ๐ง Advanced Features
- ๐ฎ Examples
- ๐๏ธ Architecture
- ๐ Documentation
- ๐ค Contributing
- ๐ License
Get up and running in less than 5 minutes:
go get -u github.com/bxcodec/goqueuepackage main import ( "context" "log" "github.com/bxcodec/goqueue" "github.com/bxcodec/goqueue/consumer" "github.com/bxcodec/goqueue/publisher" "github.com/bxcodec/goqueue/interfaces" ) func main() { // Create queue service queueSvc := goqueue.NewQueueService( options.WithConsumer(myConsumer), options.WithPublisher(myPublisher), options.WithMessageHandler(handleMessage), ) // Publish a message queueSvc.Publish(context.Background(), interfaces.Message{ Data: map[string]interface{}{"hello": "world"}, Action: "user.created", Topic: "users", }) // Start consuming queueSvc.Start(context.Background()) } func handleMessage(ctx context.Context, m interfaces.InboundMessage) error { log.Printf("Received: %v", m.Data) return m.Ack(ctx) // Acknowledge successful processing }- Multi-Provider Support: Currently supports RabbitMQ (more coming soon!)
- Unified API: Consistent interface across all queue providers
- Type Safety: Strongly typed message structures
- Context Support: Full Go context integration for cancellation and timeouts
- Automatic Retries: Configurable retry mechanisms with exponential backoff
- Dead Letter Queues: Handle failed messages gracefully
- Circuit Breaker: Built-in protection against cascading failures
- Graceful Shutdown: Clean resource cleanup on application termination
- Middleware System: Extensible pipeline for message processing
- Custom Serialization: Support for JSON, Protocol Buffers, and custom formats
- Message Routing: Flexible topic and routing key patterns
- Batching: Efficient batch message processing
- Connection Pooling: Optimized connection management
- Structured Logging: Built-in zerolog integration
- Metrics Ready: Hooks for Prometheus, StatsD, and custom metrics
- Tracing Support: OpenTelemetry compatible
- Health Checks: Built-in health check endpoints
# Install the core library go get -u github.com/bxcodec/goqueue- Go 1.21 or higher
- Message broker (RabbitMQ supported, more coming soon)
package main import ( "context" "github.com/bxcodec/goqueue/publisher" publisherOpts "github.com/bxcodec/goqueue/options/publisher" amqp "github.com/rabbitmq/amqp091-go" ) func main() { // Connect to RabbitMQ conn, _ := amqp.Dial("amqp://localhost:5672/") // Create publisher pub := publisher.NewPublisher( publisherOpts.PublisherPlatformRabbitMQ, publisherOpts.WithRabbitMQPublisherConfig(&publisherOpts.RabbitMQPublisherConfig{ Conn: conn, PublisherChannelPoolSize: 5, }), publisherOpts.WithPublisherID("my-service"), ) // Publish message err := pub.Publish(context.Background(), interfaces.Message{ Data: map[string]interface{}{"user_id": 123, "action": "signup"}, Action: "user.created", Topic: "users", }) if err != nil { log.Fatal(err) } }package main import ( "context" "github.com/bxcodec/goqueue/consumer" consumerOpts "github.com/bxcodec/goqueue/options/consumer" ) func main() { // Create consumer cons := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, consumerOpts.WithQueueName("user-events"), consumerOpts.WithMaxRetryFailedMessage(3), consumerOpts.WithBatchMessageSize(10), ) // Start consuming cons.Consume(context.Background(), messageHandler, metadata) } func messageHandler(ctx context.Context, msg interfaces.InboundMessage) error { // Process your message userData := msg.Data.(map[string]interface{}) // Business logic here if err := processUser(userData); err != nil { // Retry with exponential backoff return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) } // Acknowledge successful processing return msg.Ack(ctx) }GoQueue provides sophisticated retry mechanisms with multiple strategies:
// Exponential backoff retry return msg.RetryWithDelayFn(ctx, interfaces.ExponentialBackoffDelayFn) // Custom retry logic return msg.RetryWithDelayFn(ctx, func(retryCount int64) int64 { return retryCount * 2 // Custom delay calculation }) // Move to dead letter queue after max retries return msg.MoveToDeadLetterQueue(ctx)Extend functionality with custom middleware:
// Custom logging middleware func LoggingMiddleware() interfaces.InboundMessageHandlerMiddlewareFunc { return func(next interfaces.InboundMessageHandlerFunc) interfaces.InboundMessageHandlerFunc { return func(ctx context.Context, m interfaces.InboundMessage) error { start := time.Now() err := next(ctx, m) log.Printf("Message processed in %v", time.Since(start)) return err } } } // Apply middleware cons := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, consumerOpts.WithMiddlewares( LoggingMiddleware(), MetricsMiddleware(), AuthMiddleware(), ), )Fine-tune your queue behavior:
cons := consumer.NewConsumer( consumerOpts.ConsumerPlatformRabbitMQ, consumerOpts.WithQueueName("high-priority-queue"), consumerOpts.WithMaxRetryFailedMessage(5), consumerOpts.WithBatchMessageSize(50), consumerOpts.WithConsumerID("worker-01"), consumerOpts.WithRabbitMQConsumerConfig(&consumerOpts.RabbitMQConsumerConfig{ ConsumerChannel: channel, ReQueueChannel: requeueChannel, QueueDeclareConfig: &consumerOpts.RabbitMQQueueDeclareConfig{ Durable: true, AutoDelete: false, Exclusive: false, }, }), )Explore our comprehensive examples:
- Basic Usage - Simple publish/consume example
- With Retries - Advanced retry mechanisms
Start RabbitMQ with Docker:
# Clone the repository git clone https://github.com/bxcodec/goqueue.git cd goqueue/examples/rabbitmq/basic # Start RabbitMQ docker-compose up -d # Run the example go run main.goAutomatic retry mechanism with exponential backoff and dead letter queue
- Interface Segregation: Clean, focused interfaces for different responsibilities
- Dependency Injection: Easy testing and swappable implementations
- Error Handling: Comprehensive error types and recovery mechanisms
- Performance: Optimized for high-throughput scenarios
- Extensibility: Plugin architecture for custom providers and middleware
| Provider | Status | Features |
|---|---|---|
| RabbitMQ | ๐ Beta Version | Full feature support |
| Google Pub/Sub | ๐ Planned | Coming soon |
| AWS SQS + SNS | ๐ Planned | Coming soon |
GoQueue uses structured logging with zerolog:
import "github.com/bxcodec/goqueue" // Setup basic logging (automatic when importing consumer/publisher) // OR setup with custom configuration: goqueue.SetupLoggingWithDefaults() // Pretty console output for developmentRun the test suite:
# Unit tests make test # Integration tests with RabbitMQ make integration-test Explore our comprehensive guides for each system component:
| Component | Description | Documentation |
|---|---|---|
| ๐ Middleware | Extend functionality with custom logic | ๐ Middleware Guide |
| ๐จ Consumer | Reliable message consumption and processing | ๐ Consumer Guide |
| ๐ค Publisher | High-performance message publishing | ๐ Publisher Guide |
| ๐ RabbitMQ Retry | Advanced retry mechanisms for RabbitMQ | ๐ Retry Architecture |
- ๐ Full Documentation Index - Complete documentation overview
- ๐ง API Reference - Go package documentation
- ๐ฎ Examples - Working code examples
- ๐ Troubleshooting - Common issues and solutions
We welcome contributions! Here's how to get started:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
# Clone your fork git clone https://github.com/yourusername/goqueue.git cd goqueue # Install dependencies go mod download # Run tests make test # Run linting make lint - ๐ New Queue Providers (Google Pub/Sub, SQS+SNS)
- ๐ ๏ธ Middleware Components (Metrics, Tracing, Auth)
- ๐ Documentation & Examples
- ๐งช Testing & Benchmarks
- ๐ Bug Fixes & Improvements
- ๐ Documentation: pkg.go.dev/github.com/bxcodec/goqueue
- ๐ Issues: GitHub Issues
- ๐ง Email: iman@tumorang.com
This project is licensed under the MIT License - see the LICENSE file for details.
- Thanks to all contributors
- Inspired by the Go community's best practices
- Built with โค๏ธ for the Go ecosystem

