Hey there, Gophers! π
Ever found yourself needing a lightweight, in-memory queue in Go that's both powerful and easy to use? Well, that's exactly what we're going to explore today with GoFrame's gqueue
component!
What's gqueue and Why Should You Care? π€
First off, let me paint a picture. You're building a service that needs to:
- Process tasks asynchronously
- Handle traffic spikes gracefully
- Manage background jobs efficiently
This is where gqueue
shines! It's an in-memory queue implementation that comes bundled with GoFrame, offering a perfect balance of simplicity and power.
Quick Start: Your First gqueue π±
Let's dive right in with a simple example:
package main import ( "github.com/gogf/gf/v2/container/gqueue" "fmt" ) func main() { // Create a queue that can hold 10 items q := gqueue.New(10) // Push some data q.Push("Hello, gqueue!") // Get the data back if value := q.Pop(); value != nil { fmt.Printf("Got: %v\n", value) } }
Simple, right? But wait, there's so much more we can do! π¨
The Cool Features You'll Love β¨
Here's what makes gqueue
stand out from regular channels or standard library queues:
- Thread-Safe by Default - No more mutex headaches!
- Flexible Capacity - Starts small, grows as needed
- Batch Operations - Process multiple items efficiently
- Timeout Support - Never get stuck waiting
Real-World Example: Building a Task Processor π οΈ
Let's build something more practical - a task processing system that you might actually use in production:
package main import ( "github.com/gogf/gf/v2/container/gqueue" "context" "fmt" "time" ) type Task struct { ID string Payload interface{} } func NewTaskProcessor() { // Create a queue with decent capacity q := gqueue.New(1000) // Start the worker go func() { for { // Try to get a task if data := q.Pop(); data != nil { if task, ok := data.(Task); ok { // Process the task fmt.Printf("Processing task: %s\n", task.ID) // Your processing logic here time.Sleep(100 * time.Millisecond) } } } }() // Add some tasks for i := 0; i < 5; i++ { q.Push(Task{ ID: fmt.Sprintf("task-%d", i), Payload: fmt.Sprintf("data-%d", i), }) } }
More Real-World Examples π
Building a Rate Limiter
Here's how you can build a simple rate limiter using gqueue
:
type RateLimiter struct { q *gqueue.Queue rate int window time.Duration } func NewRateLimiter(rate int, window time.Duration) *RateLimiter { rl := &RateLimiter{ q: gqueue.New(rate * 2), // Buffer for bursts rate: rate, window: window, } // Clean up old timestamps go rl.cleanup() return rl } func (rl *RateLimiter) Allow() bool { now := time.Now() // Remove expired timestamps for { if ts := rl.q.Pop(); ts != nil { if now.Sub(ts.(time.Time)) <= rl.window { // Put it back if still within window rl.q.Push(ts) break } } else { break } } // Check if we can add new request if rl.q.Len() < int64(rl.rate) { rl.q.Push(now) return true } return false } func (rl *RateLimiter) cleanup() { ticker := time.NewTicker(rl.window / 2) for range ticker.C { now := time.Now() for { if ts := rl.q.Pop(); ts != nil { if now.Sub(ts.(time.Time)) <= rl.window { rl.q.Push(ts) break } } else { break } } } }
Event Processing Pipeline
Here's an example of building an event processing pipeline with retry logic:
type Event struct { ID string Data interface{} Attempts int LastError error } type Pipeline struct { mainQueue *gqueue.Queue retryQueue *gqueue.Queue maxAttempts int } func NewPipeline(maxAttempts int) *Pipeline { p := &Pipeline{ mainQueue: gqueue.New(1000), retryQueue: gqueue.New(1000), maxAttempts: maxAttempts, } // Start retry handler go p.handleRetries() // Start main processor go p.processEvents() return p } func (p *Pipeline) handleRetries() { ticker := time.NewTicker(5 * time.Second) for range ticker.C { if event := p.retryQueue.Pop(); event != nil { e := event.(Event) // Exponential backoff if time.Since(e.LastAttempt) > time.Second*time.Duration(1<<e.Attempts) { p.mainQueue.Push(e) } else { p.retryQueue.Push(e) } } } } func (p *Pipeline) processEvents() { for { if data := p.mainQueue.Pop(); data != nil { event := data.(Event) if err := p.processEvent(event); err != nil { event.Attempts++ event.LastError = err event.LastAttempt = time.Now() if event.Attempts < p.maxAttempts { p.retryQueue.Push(event) } else { // Handle fatal error p.handleFatalError(event) } } } } }
Batch Processing with Timeouts
Here's a more sophisticated batch processing implementation:
type BatchProcessor struct { q *gqueue.Queue batchSize int timeout time.Duration processor func([]interface{}) error } func NewBatchProcessor(batchSize int, timeout time.Duration, processor func([]interface{}) error) *BatchProcessor { return &BatchProcessor{ q: gqueue.New(batchSize * 10), batchSize: batchSize, timeout: timeout, processor: processor, } } func (bp *BatchProcessor) Start(ctx context.Context) { batch := make([]interface{}, 0, bp.batchSize) timer := time.NewTimer(bp.timeout) for { select { case <-ctx.Done(): if len(batch) > 0 { bp.processor(batch) } return case <-timer.C: if len(batch) > 0 { bp.processor(batch) batch = make([]interface{}, 0, bp.batchSize) } timer.Reset(bp.timeout) default: if item := bp.q.Pop(); item != nil { batch = append(batch, item) if len(batch) >= bp.batchSize { bp.processor(batch) batch = make([]interface{}, 0, bp.batchSize) timer.Reset(bp.timeout) } } } } }
Pro Tips from the Trenches π‘
After using gqueue
in several production systems, here are some tips I've learned:
1. Size Your Queue Right
// For small services (< 1000 req/s) q := gqueue.New(1000) // For medium services (1000-5000 req/s) q := gqueue.New(5000) // For high-load services (5000+ req/s) q := gqueue.New(10000)
2. Implement Graceful Shutdown
func main() { q := gqueue.New(1000) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Start worker go func() { for { select { case <-ctx.Done(): // Clean up and exit return default: if data := q.Pop(); data != nil { // Process data } } } }() }
3. Handle Backpressure
func pushWithBackpressure(q *gqueue.Queue, data interface{}) error { if q.Len() > int64(q.Cap()*0.8) { // Queue is getting full, take action return fmt.Errorf("queue is at high capacity") } q.Push(data) return nil }
Troubleshooting Guide and Common Pitfalls β οΈ
1. Memory Leaks
Symptom: Increasing memory usage over time
Common Causes:
- Forgotten goroutines still processing queue items
- Large items not being released from memory
Solution:
func preventMemoryLeaks() { q := gqueue.New(1000) // Proper cleanup with context ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { for { select { case <-ctx.Done(): // Clean up remaining items for q.Len() > 0 { _ = q.Pop() } return default: if item := q.Pop(); item != nil { // Process item item = nil // Help GC } } } }() }
2. Queue Capacity Issues
Symptom: Application becomes slow or unresponsive
Common Causes:
- Queue filling up faster than it's being processed
- No backpressure mechanism
Solution:
func handleBackpressure() { q := gqueue.New(1000) // Monitor queue capacity go func() { ticker := time.NewTicker(time.Second) for range ticker.C { if q.Len() > int64(float64(q.Cap())*0.8) { // Alert on high usage log.Printf("Queue at %d%% capacity", int(float64(q.Len())/float64(q.Cap())*100)) // Take action (e.g., slow down producers) throttleProducers() } } }() } func throttleProducers() { // Implement throttling logic }
3. Deadlocks and Hanging
Symptom: Workers stop processing
Common Causes:
- Infinite loops in processing logic
- Missing error handling
Solution:
func preventDeadlocks() { q := gqueue.New(1000) // Use timeouts go func() { for { done := make(chan bool) go func() { if item := q.Pop(); item != nil { processWithTimeout(item) } done <- true }() // Timeout if processing takes too long select { case <-done: // Processing completed normally case <-time.After(5 * time.Second): // Handle timeout log.Println("Processing timeout") } } }() } func processWithTimeout(item interface{}) { // Your processing logic here }
4. Data Loss During Shutdown
Symptom: Items disappear when application stops
Common Causes:
- Improper shutdown handling
- Not waiting for queue to empty
Solution:
func gracefulShutdown() { q := gqueue.New(1000) ctx, cancel := context.WithCancel(context.Background()) // Graceful shutdown handler go func() { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT) <-sigChan // Cancel context to stop new items cancel() // Wait for queue to empty for q.Len() > 0 { time.Sleep(100 * time.Millisecond) } // Now safe to exit os.Exit(0) }() }
5. Performance Degradation
Symptom: Processing becomes slower over time
Common Causes:
- Too many goroutines
- Inefficient batch processing
- Memory pressure
Solution:
func optimizePerformance() { q := gqueue.New(1000) // Use worker pool const workerCount = 5 sem := make(chan struct{}, workerCount) go func() { for { sem <- struct{}{} // Limit concurrent workers go func() { defer func() { <-sem }() if item := q.Pop(); item != nil { // Process with proper error handling if err := processItem(item); err != nil { log.Printf("Error processing item: %v", err) } } }() } }() }
Performance Tips π
Here's a quick batch processing pattern that can significantly boost performance:
func batchProcess(q *gqueue.Queue, batchSize int) { batch := make([]interface{}, 0, batchSize) // Collect items for i := 0; i < batchSize; i++ { if item := q.Pop(); item != nil { batch = append(batch, item) } } // Process batch if len(batch) > 0 { processBatch(batch) } }
When to Use gqueue (And When Not To) π€
Perfect for:
- β Background task processing
- β Message buffering
- β Event handling
- β Rate limiting
Maybe not for:
- β Persistent storage needs
- β Distributed systems (use Kafka/RabbitMQ instead)
- β Critical transaction processing
Let's Wrap It Up! π
gqueue
is a fantastic tool when you need a lightweight, in-memory queue in Go. It's perfect for those scenarios where a full-blown message queue system would be overkill, but plain channels aren't quite enough.
Your Turn! π―
Now I'd love to hear from you:
- Have you used
gqueue
in your projects? - What other queue implementations have you tried in Go?
- Any cool patterns or tips to share?
Drop your thoughts in the comments below! And if you found this useful, don't forget to give it a β€οΈ
P.S. If you want to dive deeper into GoFrame, check out my other articles in the series! π
Top comments (0)