Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

.ci/
Makefile.main
8 changes: 6 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@ matrix:
- go: tip

services:
- rabbitmq

- docker

sudo: required

before_install:
- docker pull rabbitmq:3-management
- docker run -d --name rabbitmq -p 127.0.0.1:5672:5672 rabbitmq:3-management
- docker ps -a

install:
- make dependencies

Expand Down
111 changes: 79 additions & 32 deletions amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Broker struct {
conn *amqp.Connection
ch *amqp.Channel
connErrors chan *amqp.Error
chErrors chan *amqp.Error
stop chan struct{}
backoff *backoff.Backoff
}
Expand All @@ -84,6 +85,7 @@ func New(url string) (queue.Broker, error) {
}

b := &Broker{
mut: sync.RWMutex{},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since mut is not a pointer the zero value is already sync.RWMutex{}, so you can remove this line

conn: conn,
ch: ch,
stop: make(chan struct{}),
Expand All @@ -95,38 +97,67 @@ func New(url string) (queue.Broker, error) {
},
}

b.connErrors = make(chan *amqp.Error, 1)
b.conn.NotifyClose(b.connErrors)

b.chErrors = make(chan *amqp.Error, 1)
b.ch.NotifyClose(b.chErrors)

go b.manageConnection(url)

return b, nil
}

func (b *Broker) manageConnection(url string) {
b.connErrors = make(chan *amqp.Error)
b.conn.NotifyClose(b.connErrors)

for {
select {
case err := <-b.connErrors:
log.Errorf(err, "amqp connection error")
if err == nil {
break
}
log.Errorf(err, "amqp connection error - reconnecting")

b.mut.Lock()
if err != nil {
b.conn, b.ch = b.reconnect(url)
b.connErrors = make(chan *amqp.Error)
b.conn.NotifyClose(b.connErrors)
b.reconnect(url)
b.mut.Unlock()
break

case err := <-b.chErrors:
if err == nil {
break
}
log.Errorf(err, "amqp channel error - reopening channel")

b.mut.Lock()
b.reopenChannel()
b.mut.Unlock()

case <-b.stop:
return
}
}
}

func (b *Broker) reconnect(url string) (*amqp.Connection, *amqp.Channel) {
func (b *Broker) reconnect(url string) {
b.backoff.Reset()

// open a new connection and channel
b.conn = b.tryConnection(url)
b.connErrors = make(chan *amqp.Error, 1)
b.conn.NotifyClose(b.connErrors)

b.ch = b.tryChannel(b.conn)
b.chErrors = make(chan *amqp.Error, 1)
b.ch.NotifyClose(b.chErrors)
}

func (b *Broker) reopenChannel() {
b.backoff.Reset()
conn := b.tryConnection(url)
ch := b.tryChannel(conn)
return conn, ch

// open a new channel
b.ch = b.tryChannel(b.conn)
b.chErrors = make(chan *amqp.Error, 1)
b.ch.NotifyClose(b.chErrors)
}

func (b *Broker) tryConnection(url string) *amqp.Connection {
Expand All @@ -136,8 +167,8 @@ func (b *Broker) tryConnection(url string) *amqp.Connection {
b.backoff.Reset()
return conn
}

d := b.backoff.Duration()

log.Errorf(err, "error connecting to amqp, reconnecting in %s", d)
time.Sleep(d)
}
Expand All @@ -150,8 +181,8 @@ func (b *Broker) tryChannel(conn *amqp.Connection) *amqp.Channel {
b.backoff.Reset()
return ch
}

d := b.backoff.Duration()

log.Errorf(err, "error creatting channel, new retry in %s", d)
time.Sleep(d)
}
Expand All @@ -165,8 +196,9 @@ func (b *Broker) connection() *amqp.Connection {

func (b *Broker) channel() *amqp.Channel {
b.mut.Lock()
defer b.mut.Unlock()
return b.ch
ch := b.ch
b.mut.Unlock()
return ch
}

func (b *Broker) newBuriedQueue(mainQueueName string) (q amqp.Queue, rex string, err error) {
Expand Down Expand Up @@ -209,7 +241,7 @@ func (b *Broker) Queue(name string) (queue.Queue, error) {
return nil, err
}

q, err := b.ch.QueueDeclare(
q, err := b.channel().QueueDeclare(
name, // name
true, // durable
false, // delete when unused
Expand Down Expand Up @@ -257,30 +289,45 @@ func (q *Queue) Publish(j *queue.Job) error {
return queue.ErrEmptyJob.New()
}

var (
err error
nretries = int32(3)
)

headers := amqp.Table{}
if j.Retries > 0 {
nretries = j.Retries
headers[DefaultConfiguration.RetriesHeader] = j.Retries
}

if j.ErrorType != "" {
headers[DefaultConfiguration.ErrorHeader] = j.ErrorType
}

return q.conn.channel().Publish(
"", // exchange
q.queue.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
MessageId: j.ID,
Priority: uint8(j.Priority),
Timestamp: j.Timestamp,
ContentType: j.ContentType,
Body: j.Raw,
Headers: headers,
},
)
for nretries > 0 {
err = q.conn.channel().Publish(
"", // exchange
q.queue.Name, // routing key
false, // mandatory
false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
MessageId: j.ID,
Priority: uint8(j.Priority),
Timestamp: j.Timestamp,
ContentType: j.ContentType,
Body: j.Raw,
Headers: headers,
},
)
if err == nil {
break
}
log.Errorf(err, "publishing to %s", q.queue.Name)
nretries--
}

return err
}

// PublishDelayed publishes the given Job with a given delay. Delayed messages
Expand Down
94 changes: 94 additions & 0 deletions amqp/amqp_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package queue

import (
"context"
"fmt"
"os/exec"
"testing"
"time"

Expand All @@ -13,6 +15,20 @@ import (
"github.com/stretchr/testify/suite"
)

// Pilosa tests require running docker. If `docker ps` command returned an error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/Pilosa/AMQP/

// we skip some of the tests
var (
dockerIsRunning bool
dockerCmdOutput string
)

func init() {
cmd := exec.Command("docker", "ps")
b, err := cmd.CombinedOutput()

dockerCmdOutput, dockerIsRunning = string(b), (err == nil)
}

func TestAMQPSuite(t *testing.T) {
suite.Run(t, new(AMQPSuite))
}
Expand Down Expand Up @@ -208,3 +224,81 @@ func TestAMQPRepublishBuried(t *testing.T) {
require.NoError(t, err)
require.Equal(t, string(job.Raw), "republish")
}

func TestReconnect(t *testing.T) {
if !dockerIsRunning {
t.Skip()
}

broker, err := queue.NewBroker(testAMQPURI)
require.NoError(t, err)
defer func() { broker.Close() }()

queueName := test.NewName()
q, err := broker.Queue(queueName)
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go rabbitHiccup(ctx, 5*time.Second)

tests := []struct {
name string
payload string
}{
{name: "message 1", payload: "payload 1"},
{name: "message 2", payload: "payload 2"},
{name: "message 3", payload: "payload 3"},
{name: "message 3", payload: "payload 4"},
}

for _, test := range tests {
job, err := queue.NewJob()
require.NoError(t, err)

job.Raw = []byte(test.payload)

err = q.Publish(job)
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
}

n := len(tests)
jobIter, err := q.Consume(1)
require.NoError(t, err)
defer func() { jobIter.Close() }()

for i := 0; i < n; i++ {
if job, err := jobIter.Next(); err != nil {
t.Log(err)

job, err = queue.NewJob()
require.NoError(t, err)
job.Raw = []byte("check connection - retry till we connect")
err = q.Publish(job)
require.NoError(t, err)
break
} else {
t.Log(string(job.Raw))
}
}
}

// rabbitHiccup restarts rabbitmq every interval
// it requires the RabbitMQ running in docker container:
// docker run --name rabbitmq -d -p 127.0.0.1:5672:5672 rabbitmq:3-management
func rabbitHiccup(ctx context.Context, interval time.Duration) error {
cmd := exec.Command("docker", "restart", "rabbitmq")
err := cmd.Start()
for err == nil {
select {
case <-ctx.Done():
err = ctx.Err()

case <-time.After(interval):
err = cmd.Start()
}
}

return err
}
2 changes: 1 addition & 1 deletion common.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Queue interface {
PublishDelayed(*Job, time.Duration) error
// Transaction executes the passed TxCallback inside a transaction.
Transaction(TxCallback) error
// Consume returns a JobIter for the queue. Ir receives the minimum
// Consume returns a JobIter for the queue. It receives the minimum
// number of undelivered jobs the iterator will allow at any given
// time (see the Acknowledger interface).
Consume(advertisedWindow int) (JobIter, error)
Expand Down