Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 36 additions & 33 deletions amqp.go → amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ import (
"sync/atomic"
"time"

"gopkg.in/src-d/go-errors.v1"
"gopkg.in/src-d/go-queue.v0"

"github.com/jpillora/backoff"
"github.com/streadway/amqp"
log15 "gopkg.in/inconshreveable/log15.v2"
"gopkg.in/src-d/go-errors.v1"
)

func init() {
queue.Register("amqp", NewAMQPBroker)
}

var consumerSeq uint64

var (
Expand Down Expand Up @@ -52,7 +57,7 @@ type connection interface {
}

// NewAMQPBroker creates a new AMQPBroker.
func NewAMQPBroker(url string) (Broker, error) {
func NewAMQPBroker(url string) (queue.Broker, error) {
conn, err := amqp.Dial(url)
if err != nil {
return nil, ErrConnectionFailed.New(err)
Expand All @@ -75,7 +80,6 @@ func NewAMQPBroker(url string) (Broker, error) {
}

func connect(url string) (*amqp.Connection, *amqp.Channel) {

var (
conn *amqp.Connection
ch *amqp.Channel
Expand Down Expand Up @@ -184,7 +188,7 @@ func (b *AMQPBroker) newBuriedQueue(mainQueueName string) (q amqp.Queue, rex str
}

// Queue returns the queue with the given name.
func (b *AMQPBroker) Queue(name string) (Queue, error) {
func (b *AMQPBroker) Queue(name string) (queue.Queue, error) {
buriedQueue, rex, err := b.newBuriedQueue(name)
if err != nil {
return nil, err
Expand All @@ -199,7 +203,7 @@ func (b *AMQPBroker) Queue(name string) (Queue, error) {
amqp.Table{
"x-dead-letter-exchange": rex,
"x-dead-letter-routing-key": name,
"x-max-priority": uint8(PriorityUrgent),
"x-max-priority": uint8(queue.PriorityUrgent),
},
)

Expand Down Expand Up @@ -233,9 +237,9 @@ type AMQPQueue struct {
}

// Publish publishes the given Job to the Queue.
func (q *AMQPQueue) Publish(j *Job) error {
if j == nil || len(j.raw) == 0 {
return ErrEmptyJob.New()
func (q *AMQPQueue) Publish(j *queue.Job) error {
if j == nil || j.Size() == 0 {
return queue.ErrEmptyJob.New()
}

headers := amqp.Table{}
Expand All @@ -257,18 +261,18 @@ func (q *AMQPQueue) Publish(j *Job) error {
MessageId: j.ID,
Priority: uint8(j.Priority),
Timestamp: j.Timestamp,
ContentType: string(j.contentType),
Body: j.raw,
ContentType: j.ContentType,
Body: j.Raw,
Headers: headers,
},
)
}

// PublishDelayed publishes the given Job with a given delay. Delayed messages
// wont go into the buried queue if they fail.
func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
if j == nil || len(j.raw) == 0 {
return ErrEmptyJob.New()
func (q *AMQPQueue) PublishDelayed(j *queue.Job, delay time.Duration) error {
if j == nil || j.Size() == 0 {
return queue.ErrEmptyJob.New()
}

ttl := delay / time.Millisecond
Expand All @@ -283,7 +287,7 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
"x-dead-letter-routing-key": q.queue.Name,
"x-message-ttl": int64(ttl),
"x-expires": int64(ttl) * 2,
"x-max-priority": uint8(PriorityUrgent),
"x-max-priority": uint8(queue.PriorityUrgent),
},
)
if err != nil {
Expand All @@ -300,20 +304,20 @@ func (q *AMQPQueue) PublishDelayed(j *Job, delay time.Duration) error {
MessageId: j.ID,
Priority: uint8(j.Priority),
Timestamp: j.Timestamp,
ContentType: string(j.contentType),
Body: j.raw,
ContentType: j.ContentType,
Body: j.Raw,
},
)
}

type jobErr struct {
job *Job
job *queue.Job
err error
}

// RepublishBuried will republish in the main queue those jobs that timed out without Ack
// or were Rejected with requeue = False and makes comply return true.
func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error {
func (q *AMQPQueue) RepublishBuried(conditions ...queue.RepublishConditionFunc) error {
if q.buriedQueue == nil {
return fmt.Errorf("buriedQueue is nil, called RepublishBuried on the internal buried queue?")
}
Expand All @@ -327,7 +331,7 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error
defer iter.Close()

retries := 0
var notComplying []*Job
var notComplying []*queue.Job
var errorsPublishing []*jobErr
for {
j, err := iter.(*AMQPJobIter).nextNonBlocking()
Expand Down Expand Up @@ -355,7 +359,7 @@ func (q *AMQPQueue) RepublishBuried(conditions ...RepublishConditionFunc) error
return err
}

if republishConditions(conditions).comply(j) {
if queue.RepublishConditions(conditions).Comply(j) {
if err = q.Publish(j); err != nil {
errorsPublishing = append(errorsPublishing, &jobErr{j, err})
}
Expand Down Expand Up @@ -391,7 +395,7 @@ func (q *AMQPQueue) handleRepublishErrors(list []*jobErr) error {
}

// Transaction executes the given callback inside a transaction.
func (q *AMQPQueue) Transaction(txcb TxCallback) error {
func (q *AMQPQueue) Transaction(txcb queue.TxCallback) error {
ch, err := q.conn.connection().Channel()
if err != nil {
return ErrOpenChannel.New(err)
Expand Down Expand Up @@ -425,7 +429,7 @@ func (q *AMQPQueue) Transaction(txcb TxCallback) error {

// Implements Queue. The advertisedWindow value will be the exact
// number of undelivered jobs in transit, not just the minium.
func (q *AMQPQueue) Consume(advertisedWindow int) (JobIter, error) {
func (q *AMQPQueue) Consume(advertisedWindow int) (queue.JobIter, error) {
ch, err := q.conn.connection().Channel()
if err != nil {
return nil, ErrOpenChannel.New(err)
Expand Down Expand Up @@ -470,20 +474,20 @@ type AMQPJobIter struct {
}

// Next returns the next job in the iter.
func (i *AMQPJobIter) Next() (*Job, error) {
func (i *AMQPJobIter) Next() (*queue.Job, error) {
d, ok := <-i.c
if !ok {
return nil, ErrAlreadyClosed.New()
return nil, queue.ErrAlreadyClosed.New()
}

return fromDelivery(&d)
}

func (i *AMQPJobIter) nextNonBlocking() (*Job, error) {
func (i *AMQPJobIter) nextNonBlocking() (*queue.Job, error) {
select {
case d, ok := <-i.c:
if !ok {
return nil, ErrAlreadyClosed.New()
return nil, queue.ErrAlreadyClosed.New()
}

return fromDelivery(&d)
Expand Down Expand Up @@ -518,19 +522,18 @@ func (a *AMQPAcknowledger) Reject(requeue bool) error {
return a.ack.Reject(a.id, requeue)
}

func fromDelivery(d *amqp.Delivery) (*Job, error) {
j, err := NewJob()
func fromDelivery(d *amqp.Delivery) (*queue.Job, error) {
j, err := queue.NewJob()
if err != nil {
return nil, err
}

j.ID = d.MessageId
j.Priority = Priority(d.Priority)
j.Priority = queue.Priority(d.Priority)
j.Timestamp = d.Timestamp
j.contentType = contentType(d.ContentType)
j.acknowledger = &AMQPAcknowledger{d.Acknowledger, d.DeliveryTag}
j.tag = d.DeliveryTag
j.raw = d.Body
j.ContentType = d.ContentType
j.Acknowledger = &AMQPAcknowledger{d.Acknowledger, d.DeliveryTag}
j.Raw = d.Body

if retries, ok := d.Headers[retriesHeader]; ok {
retries, ok := retries.(int32)
Expand Down
59 changes: 33 additions & 26 deletions amqp_test.go → amqp/amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"testing"
"time"

"gopkg.in/src-d/go-queue.v0"
"gopkg.in/src-d/go-queue.v0/test"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -15,9 +18,11 @@ func TestAMQPSuite(t *testing.T) {
}

type AMQPSuite struct {
QueueSuite
test.QueueSuite
}

const testAMQPURI = "amqp://localhost:5672"

func (s *AMQPSuite) SetupSuite() {
s.BrokerURI = testAMQPURI
}
Expand All @@ -30,9 +35,9 @@ func TestNewAMQPBroker_bad_url(t *testing.T) {
assert.Nil(b)
}

func sendJobs(assert *assert.Assertions, n int, p Priority, q Queue) {
func sendJobs(assert *assert.Assertions, n int, p queue.Priority, q queue.Queue) {
for i := 0; i < n; i++ {
j, err := NewJob()
j, err := queue.NewJob()
assert.NoError(err)
j.SetPriority(p)
err = j.Encode(i)
Expand All @@ -47,18 +52,20 @@ func TestAMQPPriorities(t *testing.T) {

broker, err := NewAMQPBroker(testAMQPURI)
assert.NoError(err)
assert.NotNil(broker)
if !assert.NotNil(broker) {
return
}

name := newName()
name := test.NewName()
q, err := broker.Queue(name)
assert.NoError(err)
assert.NotNil(q)

// Send 50 low priority jobs
sendJobs(assert, 50, PriorityLow, q)
sendJobs(assert, 50, queue.PriorityLow, q)

// Send 50 high priority jobs
sendJobs(assert, 50, PriorityUrgent, q)
sendJobs(assert, 50, queue.PriorityUrgent, q)

// Receive and collect priorities
iter, err := q.Consume(1)
Expand All @@ -81,16 +88,16 @@ func TestAMQPPriorities(t *testing.T) {
}

assert.True(sumFirst > sumLast)
assert.Equal(uint(PriorityUrgent)*50, sumFirst)
assert.Equal(uint(PriorityLow)*50, sumLast)
assert.Equal(uint(queue.PriorityUrgent)*50, sumFirst)
assert.Equal(uint(queue.PriorityLow)*50, sumLast)
}

func TestAMQPHeaders(t *testing.T) {
broker, err := NewBroker(testAMQPURI)
broker, err := queue.NewBroker(testAMQPURI)
require.NoError(t, err)
defer func() { require.NoError(t, broker.Close()) }()

queue, err := broker.Queue(newName())
q, err := broker.Queue(test.NewName())
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -121,17 +128,17 @@ func TestAMQPHeaders(t *testing.T) {
}

for i, test := range tests {
job, err := NewJob()
job, err := queue.NewJob()
require.NoError(t, err)

job.Retries = test.retries
job.ErrorType = test.errorType

require.NoError(t, job.Encode(i))
require.NoError(t, queue.Publish(job))
require.NoError(t, q.Publish(job))
}

jobIter, err := queue.Consume(len(tests))
jobIter, err := q.Consume(len(tests))
require.NoError(t, err)

for _, test := range tests {
Expand All @@ -147,15 +154,15 @@ func TestAMQPHeaders(t *testing.T) {
}

func TestAMQPRepublishBuried(t *testing.T) {
broker, err := NewBroker(testAMQPURI)
broker, err := queue.NewBroker(testAMQPURI)
require.NoError(t, err)
defer func() { require.NoError(t, broker.Close()) }()

queueName := newName()
queue, err := broker.Queue(queueName)
queueName := test.NewName()
q, err := broker.Queue(queueName)
require.NoError(t, err)

amqpQueue, ok := queue.(*AMQPQueue)
amqpQueue, ok := q.(*AMQPQueue)
require.True(t, ok)

buried := amqpQueue.buriedQueue
Expand All @@ -170,29 +177,29 @@ func TestAMQPRepublishBuried(t *testing.T) {
{name: "message 3", payload: "payload 4"},
}

for _, test := range tests {
job, err := NewJob()
for _, utest := range tests {
job, err := queue.NewJob()
require.NoError(t, err)

job.raw = []byte(test.payload)
job.Raw = []byte(utest.payload)

err = buried.Publish(job)
require.NoError(t, err)
time.Sleep(1 * time.Second)
}

var condition RepublishConditionFunc = func(j *Job) bool {
return string(j.raw) == "republish"
var condition queue.RepublishConditionFunc = func(j *queue.Job) bool {
return string(j.Raw) == "republish"
}

err = queue.RepublishBuried(condition)
err = q.RepublishBuried(condition)
require.NoError(t, err)

jobIter, err := queue.Consume(1)
jobIter, err := q.Consume(1)
require.NoError(t, err)
defer func() { require.NoError(t, jobIter.Close()) }()

job, err := jobIter.Next()
require.NoError(t, err)
require.Equal(t, string(job.raw), "republish")
require.Equal(t, string(job.Raw), "republish")
}
Loading