Skip to content

Commit a5c2f0b

Browse files
authored
Merge pull request #1 from mcuadros/master
Implementation as packages and register pattern
2 parents dbcfd67 + 51b475e commit a5c2f0b

File tree

10 files changed

+351
-257
lines changed

10 files changed

+351
-257
lines changed

amqp.go renamed to amqp/amqp.go

Lines changed: 65 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,20 @@ import (
88
"sync/atomic"
99
"time"
1010

11-
"gopkg.in/src-d/go-errors.v1"
11+
"gopkg.in/src-d/go-queue.v0"
1212

1313
"github.com/jpillora/backoff"
1414
"github.com/streadway/amqp"
1515
log15 "gopkg.in/inconshreveable/log15.v2"
16+
"gopkg.in/src-d/go-errors.v1"
1617
)
1718

19+
func init() {
20+
queue.Register("amqp", func(uri string) (queue.Broker, error) {
21+
return New(uri)
22+
})
23+
}
24+
1825
var consumerSeq uint64
1926

2027
var (
@@ -37,8 +44,8 @@ const (
3744
backoffFactor = 2
3845
)
3946

40-
// AMQPBroker implements the Broker interface for AMQP.
41-
type AMQPBroker struct {
47+
// Broker implements the queue.Broker interface for AMQP, such as RabbitMQ.
48+
type Broker struct {
4249
mut sync.RWMutex
4350
conn *amqp.Connection
4451
ch *amqp.Channel
@@ -51,8 +58,8 @@ type connection interface {
5158
channel() *amqp.Channel
5259
}
5360

54-
// NewAMQPBroker creates a new AMQPBroker.
55-
func NewAMQPBroker(url string) (Broker, error) {
61+
// New creates a new AMQPBroker.
62+
func New(url string) (queue.Broker, error) {
5663
conn, err := amqp.Dial(url)
5764
if err != nil {
5865
return nil, ErrConnectionFailed.New(err)
@@ -63,7 +70,7 @@ func NewAMQPBroker(url string) (Broker, error) {
6370
return nil, ErrOpenChannel.New(err)
6471
}
6572

66-
b := &AMQPBroker{
73+
b := &Broker{
6774
conn: conn,
6875
ch: ch,
6976
stop: make(chan struct{}),
@@ -75,7 +82,6 @@ func NewAMQPBroker(url string) (Broker, error) {
7582
}
7683

7784
func connect(url string) (*amqp.Connection, *amqp.Channel) {
78-
7985
var (
8086
conn *amqp.Connection
8187
ch *amqp.Channel
@@ -115,7 +121,7 @@ func connect(url string) (*amqp.Connection, *amqp.Channel) {
115121
return conn, ch
116122
}
117123

118-
func (b *AMQPBroker) manageConnection(url string) {
124+
func (b *Broker) manageConnection(url string) {
119125
b.connErrors = make(chan *amqp.Error)
120126
b.conn.NotifyClose(b.connErrors)
121127

@@ -138,19 +144,19 @@ func (b *AMQPBroker) manageConnection(url string) {
138144
}
139145
}
140146

141-
func (b *AMQPBroker) connection() *amqp.Connection {
147+
func (b *Broker) connection() *amqp.Connection {
142148
b.mut.Lock()
143149
defer b.mut.Unlock()
144150
return b.conn
145151
}
146152

147-
func (b *AMQPBroker) channel() *amqp.Channel {
153+
func (b *Broker) channel() *amqp.Channel {
148154
b.mut.Lock()
149155
defer b.mut.Unlock()
150156
return b.ch
151157
}
152158

153-
func (b *AMQPBroker) newBuriedQueue(mainQueueName string) (q amqp.Queue, rex string, err error) {
159+
func (b *Broker) newBuriedQueue(mainQueueName string) (q amqp.Queue, rex string, err error) {
154160
ch, err := b.conn.Channel()
155161
if err != nil {
156162
return
@@ -184,7 +190,7 @@ func (b *AMQPBroker) newBuriedQueue(mainQueueName string) (q amqp.Queue, rex str
184190
}
185191

186192
// Queue returns the queue with the given name.
187-
func (b *AMQPBroker) Queue(name string) (Queue, error) {
193+
func (b *Broker) Queue(name string) (queue.Queue, error) {
188194
buriedQueue, rex, err := b.newBuriedQueue(name)
189195
if err != nil {
190196
return nil, err
@@ -199,23 +205,23 @@ func (b *AMQPBroker) Queue(name string) (Queue, error) {
199205
amqp.Table{
200206
"x-dead-letter-exchange": rex,
201207
"x-dead-letter-routing-key": name,
202-
"x-max-priority": uint8(PriorityUrgent),
208+
"x-max-priority": uint8(queue.PriorityUrgent),
203209
},
204210
)
205211

206212
if err != nil {
207213
return nil, err
208214
}
209215

210-
return &AMQPQueue{
216+
return &Queue{
211217
conn: b,
212218
queue: q,
213-
buriedQueue: &AMQPQueue{conn: b, queue: buriedQueue},
219+
buriedQueue: &Queue{conn: b, queue: buriedQueue},
214220
}, nil
215221
}
216222

217223
// Close closes all the connections managed by the broker.
218-
func (b *AMQPBroker) Close() error {
224+
func (b *Broker) Close() error {
219225
close(b.stop)
220226

221227
if err := b.channel().Close(); err != nil {
@@ -225,17 +231,17 @@ func (b *AMQPBroker) Close() error {
225231
return b.connection().Close()
226232
}
227233

228-
// AMQPQueue implements the Queue interface for the AMQP.
229-
type AMQPQueue struct {
234+
// Queue implements the Queue interface for the AMQP.
235+
type Queue struct {
230236
conn connection
231237
queue amqp.Queue
232-
buriedQueue *AMQPQueue
238+
buriedQueue *Queue
233239
}
234240

235241
// Publish publishes the given Job to the Queue.
236-
func (q *AMQPQueue) Publish(j *Job) error {
237-
if j == nil || len(j.raw) == 0 {
238-
return ErrEmptyJob.New()
242+
func (q *Queue) Publish(j *queue.Job) error {
243+
if j == nil || j.Size() == 0 {
244+
return queue.ErrEmptyJob.New()
239245
}
240246

241247
headers := amqp.Table{}
@@ -257,18 +263,18 @@ func (q *AMQPQueue) Publish(j *Job) error {
257263
MessageId: j.ID,
258264
Priority: uint8(j.Priority),
259265
Timestamp: j.Timestamp,
260-
ContentType: string(j.contentType),
261-
Body: j.raw,
266+
ContentType: j.ContentType,
267+
Body: j.Raw,
262268
Headers: headers,
263269
},
264270
)
265271
}
266272

267273
// PublishDelayed publishes the given Job with a given delay. Delayed messages
268274
// wont go into the buried queue if they fail.
269-
func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
270-
if j == nil || len(j.raw) == 0 {
271-
return ErrEmptyJob.New()
275+
func (q *Queue) PublishDelayed(j *queue.Job, delay time.Duration) error {
276+
if j == nil || j.Size() == 0 {
277+
return queue.ErrEmptyJob.New()
272278
}
273279

274280
ttl := delay / time.Millisecond
@@ -283,7 +289,7 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
283289
"x-dead-letter-routing-key": q.queue.Name,
284290
"x-message-ttl": int64(ttl),
285291
"x-expires": int64(ttl) * 2,
286-
"x-max-priority": uint8(PriorityUrgent),
292+
"x-max-priority": uint8(queue.PriorityUrgent),
287293
},
288294
)
289295
if err != nil {
@@ -300,20 +306,20 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
300306
MessageId: j.ID,
301307
Priority: uint8(j.Priority),
302308
Timestamp: j.Timestamp,
303-
ContentType: string(j.contentType),
304-
Body: j.raw,
309+
ContentType: j.ContentType,
310+
Body: j.Raw,
305311
},
306312
)
307313
}
308314

309315
type jobErr struct {
310-
job *Job
316+
job *queue.Job
311317
err error
312318
}
313319

314320
// RepublishBuried will republish in the main queue those jobs that timed out without Ack
315321
// or were Rejected with requeue = False and makes comply return true.
316-
func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error {
322+
func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) error {
317323
if q.buriedQueue == nil {
318324
return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?")
319325
}
@@ -327,10 +333,10 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error
327333
defer iter.Close()
328334

329335
retries := 0
330-
var notComplying []*Job
336+
var notComplying []*queue.Job
331337
var errorsPublishing []*jobErr
332338
for {
333-
j, err := iter.(*AMQPJobIter).nextNonBlocking()
339+
j, err := iter.(*JobIter).nextNonBlocking()
334340
if err != nil {
335341
return err
336342
}
@@ -355,7 +361,7 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error
355361
return err
356362
}
357363

358-
if republishConditions(conditions).comply(j) {
364+
if queue.RepublishConditions(conditions).Comply(j) {
359365
if err = q.Publish(j); err != nil {
360366
errorsPublishing = append(errorsPublishing, &jobErr{j, err})
361367
}
@@ -374,7 +380,7 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error
374380
return q.handleRepublishErrors(errorsPublishing)
375381
}
376382

377-
func (q *AMQPQueue) handleRepublishErrors(list []*jobErr) error {
383+
func (q *Queue) handleRepublishErrors(list []*jobErr) error {
378384
if len(list) > 0 {
379385
stringErrors := []string{}
380386
for _, je := range list {
@@ -391,7 +397,7 @@ func (q *AMQPQueue) handleRepublishErrors(list []*jobErr) error {
391397
}
392398

393399
// Transaction executes the given callback inside a transaction.
394-
func (q *AMQPQueue) Transaction(txcb TxCallback) error {
400+
func (q *Queue) Transaction(txcb queue.TxCallback) error {
395401
ch, err := q.conn.connection().Channel()
396402
if err != nil {
397403
return ErrOpenChannel.New(err)
@@ -403,8 +409,8 @@ func (q *AMQPQueue) Transaction(txcb TxCallback) error {
403409
return err
404410
}
405411

406-
txQueue := &AMQPQueue{
407-
conn: &AMQPBroker{
412+
txQueue := &Queue{
413+
conn: &Broker{
408414
conn: q.conn.connection(),
409415
ch: ch,
410416
},
@@ -425,7 +431,7 @@ func (q *AMQPQueue) Transaction(txcb TxCallback) error {
425431

426432
// Implements Queue. The advertisedWindow value will be the exact
427433
// number of undelivered jobs in transit, not just the minium.
428-
func (q *AMQPQueue) Consume(advertisedWindow int) (JobIter, error) {
434+
func (q *Queue) Consume(advertisedWindow int) (queue.JobIter, error) {
429435
ch, err := q.conn.connection().Channel()
430436
if err != nil {
431437
return nil, ErrOpenChannel.New(err)
@@ -451,39 +457,39 @@ func (q *AMQPQueue) Consume(advertisedWindow int) (JobIter, error) {
451457
return nil, err
452458
}
453459

454-
return &AMQPJobIter{id: id, ch: ch, c: c}, nil
460+
return &JobIter{id: id, ch: ch, c: c}, nil
455461
}
456462

457-
func (q *AMQPQueue) consumeID() string {
463+
func (q *Queue) consumeID() string {
458464
return fmt.Sprintf("%s-%s-%d",
459465
os.Args[0],
460466
q.queue.Name,
461467
atomic.AddUint64(&consumerSeq, 1),
462468
)
463469
}
464470

465-
// AMQPJobIter implements the JobIter interface for AMQP.
466-
type AMQPJobIter struct {
471+
// JobIter implements the JobIter interface for AMQP.
472+
type JobIter struct {
467473
id string
468474
ch *amqp.Channel
469475
c <-chan amqp.Delivery
470476
}
471477

472478
// Next returns the next job in the iter.
473-
func (i *AMQPJobIter) Next() (*Job, error) {
479+
func (i *JobIter) Next() (*queue.Job, error) {
474480
d, ok := <-i.c
475481
if !ok {
476-
return nil, ErrAlreadyClosed.New()
482+
return nil, queue.ErrAlreadyClosed.New()
477483
}
478484

479485
return fromDelivery(&d)
480486
}
481487

482-
func (i *AMQPJobIter) nextNonBlocking() (*Job, error) {
488+
func (i *JobIter) nextNonBlocking() (*queue.Job, error) {
483489
select {
484490
case d, ok := <-i.c:
485491
if !ok {
486-
return nil, ErrAlreadyClosed.New()
492+
return nil, queue.ErrAlreadyClosed.New()
487493
}
488494

489495
return fromDelivery(&d)
@@ -493,44 +499,43 @@ func (i *AMQPJobIter) nextNonBlocking() (*Job, error) {
493499
}
494500

495501
// Close closes the channel of the JobIter.
496-
func (i *AMQPJobIter) Close() error {
502+
func (i *JobIter) Close() error {
497503
if err := i.ch.Cancel(i.id, false); err != nil {
498504
return err
499505
}
500506

501507
return i.ch.Close()
502508
}
503509

504-
// AMQPAcknowledger implements the Acknowledger for AMQP.
505-
type AMQPAcknowledger struct {
510+
// Acknowledger implements the Acknowledger for AMQP.
511+
type Acknowledger struct {
506512
ack amqp.Acknowledger
507513
id uint64
508514
}
509515

510516
// Ack signals ackwoledgement.
511-
func (a *AMQPAcknowledger) Ack() error {
517+
func (a *Acknowledger) Ack() error {
512518
return a.ack.Ack(a.id, false)
513519
}
514520

515521
// Reject signals rejection. If requeue is false, the job will go to the buried
516522
// queue until Queue.RepublishBuried() is called.
517-
func (a *AMQPAcknowledger) Reject(requeue bool) error {
523+
func (a *Acknowledger) Reject(requeue bool) error {
518524
return a.ack.Reject(a.id, requeue)
519525
}
520526

521-
func fromDelivery(d *amqp.Delivery) (*Job, error) {
522-
j, err := NewJob()
527+
func fromDelivery(d *amqp.Delivery) (*queue.Job, error) {
528+
j, err := queue.NewJob()
523529
if err != nil {
524530
return nil, err
525531
}
526532

527533
j.ID = d.MessageId
528-
j.Priority = Priority(d.Priority)
534+
j.Priority = queue.Priority(d.Priority)
529535
j.Timestamp = d.Timestamp
530-
j.contentType = contentType(d.ContentType)
531-
j.acknowledger = &AMQPAcknowledger{d.Acknowledger, d.DeliveryTag}
532-
j.tag = d.DeliveryTag
533-
j.raw = d.Body
536+
j.ContentType = d.ContentType
537+
j.Acknowledger = &Acknowledger{d.Acknowledger, d.DeliveryTag}
538+
j.Raw = d.Body
534539

535540
if retries, ok := d.Headers[retriesHeader]; ok {
536541
retries, ok := retries.(int32)

0 commit comments

Comments
 (0)