Skip to content

Commit 884fb09

Browse files
authored
Merge pull request #2 from mcuadros/master
amqp: configuration variables
2 parents da9ce46 + 1e63e1b commit 884fb09

File tree

3 files changed

+62
-32
lines changed

3 files changed

+62
-32
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,18 @@ fmt.Println(payload)
6262
// Output: hello world!
6363
```
6464

65+
66+
Configuration
67+
-------------
68+
69+
### AMQP
70+
71+
The list of available variables is:
72+
73+
- `AMQP_BACKOFF_MIN` (default: 20ms): Minimum time to wait for retry the connection or queue channel assignment.
74+
- `AMQP_BACKOFF_MAX` (default: 30s): Maximum time to wait for retry the connection or queue channel assignment.
75+
- `AMQP_BACKOFF_FACTOR` (default: 2): Multiplying factor for each increment step on the retry.
76+
6577
License
6678
-------
6779
Apache License Version 2.0, see [LICENSE](LICENSE)

amqp/amqp.go

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,42 @@ import (
1111
"gopkg.in/src-d/go-queue.v0"
1212

1313
"github.com/jpillora/backoff"
14+
"github.com/kelseyhightower/envconfig"
1415
"github.com/streadway/amqp"
1516
log15 "gopkg.in/inconshreveable/log15.v2"
1617
"gopkg.in/src-d/go-errors.v1"
1718
)
1819

1920
func init() {
21+
err := envconfig.Process("amqp", &DefaultConfiguration)
22+
if err != nil {
23+
panic(err)
24+
}
25+
2026
queue.Register("amqp", func(uri string) (queue.Broker, error) {
2127
return New(uri)
2228
})
2329
}
2430

31+
// DefaultConfiguration contains the default configuration initalized from
32+
// environment variables.
33+
var DefaultConfiguration Configuration
34+
35+
// Configuration AMQP configuration settings, this settings are set using the
36+
// envinroment varabiles.
37+
type Configuration struct {
38+
BuriedQueueSuffix string `envconfig:"BURIED_QUEUE_SUFFIX" default:".buriedQueue"`
39+
BuriedExchangeSuffix string `envconfig:"BURIED_EXCHANGE_SUFFIX" default:".buriedExchange"`
40+
BuriedNonBlockingRetries int `envconfig:"BURIED_BLOCKING_RETRIES" default:"3"`
41+
42+
RetriesHeader string `envconfig:"RETRIES_HEADER" default:"x-retries"`
43+
ErrorHeader string `envconfig:"ERROR_HEADER" default:"x-error-type"`
44+
45+
BackoffMin time.Duration `envconfig:"BACKOFF_MIN" default:"200ms"`
46+
BackoffMax time.Duration `envconfig:"BACKOFF_MAX" default:"30s"`
47+
BackoffFactor float64 `envconfig:"BACKOFF_FACTOR" default:"2"`
48+
}
49+
2550
var consumerSeq uint64
2651

2752
var (
@@ -31,19 +56,6 @@ var (
3156
ErrRepublishingJobs = errors.NewKind("couldn't republish some jobs : %s")
3257
)
3358

34-
const (
35-
buriedQueueSuffix = ".buriedQueue"
36-
buriedQueueExchangeSuffix = ".buriedExchange"
37-
buriedNonBlockingRetries = 3
38-
39-
retriesHeader string = "x-retries"
40-
errorHeader string = "x-error-type"
41-
42-
backoffMin = 200 * time.Millisecond
43-
backoffMax = 30 * time.Second
44-
backoffFactor = 2
45-
)
46-
4759
// Broker implements the queue.Broker interface for AMQP, such as RabbitMQ.
4860
type Broker struct {
4961
mut sync.RWMutex
@@ -87,9 +99,9 @@ func connect(url string) (*amqp.Connection, *amqp.Channel) {
8799
ch *amqp.Channel
88100
err error
89101
b = &backoff.Backoff{
90-
Min: backoffMin,
91-
Max: backoffMax,
92-
Factor: backoffFactor,
102+
Min: DefaultConfiguration.BackoffMin,
103+
Max: DefaultConfiguration.BackoffMax,
104+
Factor: DefaultConfiguration.BackoffFactor,
93105
Jitter: false,
94106
}
95107
)
@@ -162,8 +174,8 @@ func (b *Broker) newBuriedQueue(mainQueueName string) (q amqp.Queue, rex string,
162174
return
163175
}
164176

165-
buriedName := mainQueueName + buriedQueueSuffix
166-
rex = mainQueueName + buriedQueueExchangeSuffix
177+
buriedName := mainQueueName + DefaultConfiguration.BuriedQueueSuffix
178+
rex = mainQueueName + DefaultConfiguration.BuriedExchangeSuffix
167179

168180
if err = ch.ExchangeDeclare(rex, "fanout", true, false, false, false, nil); err != nil {
169181
return
@@ -246,11 +258,11 @@ func (q *Queue) Publish(j *queue.Job) error {
246258

247259
headers := amqp.Table{}
248260
if j.Retries > 0 {
249-
headers[retriesHeader] = j.Retries
261+
headers[DefaultConfiguration.RetriesHeader] = j.Retries
250262
}
251263

252264
if j.ErrorType != "" {
253-
headers[errorHeader] = j.ErrorType
265+
headers[DefaultConfiguration.ErrorHeader] = j.ErrorType
254266
}
255267

256268
return q.conn.channel().Publish(
@@ -342,11 +354,12 @@ func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) erro
342354
}
343355

344356
if j == nil {
345-
// check (in non blocking mode) up to "buriedNonBlockingRetries" with
346-
// a small delay between them just in case some job is arriving, return
347-
// if there is nothing after all the retries (meaning: BuriedQueue is surely
348-
// empty or any arriving jobs will have to wait to the next call).
349-
if retries > buriedNonBlockingRetries {
357+
// check (in non blocking mode) up to DefaultConfiguration.BuriedNonBlockingRetries
358+
// with a small delay between them just in case some job is
359+
// arriving, return if there is nothing after all the retries
360+
// (meaning: BuriedQueue is surely empty or any arriving jobs will
361+
// have to wait to the next call).
362+
if retries > DefaultConfiguration.BuriedNonBlockingRetries {
350363
break
351364
}
352365

@@ -537,19 +550,19 @@ func fromDelivery(d *amqp.Delivery) (*queue.Job, error) {
537550
j.Acknowledger = &Acknowledger{d.Acknowledger, d.DeliveryTag}
538551
j.Raw = d.Body
539552

540-
if retries, ok := d.Headers[retriesHeader]; ok {
553+
if retries, ok := d.Headers[DefaultConfiguration.RetriesHeader]; ok {
541554
retries, ok := retries.(int32)
542555
if !ok {
543-
return nil, ErrRetrievingHeader.New(retriesHeader, d.MessageId)
556+
return nil, ErrRetrievingHeader.New(DefaultConfiguration.RetriesHeader, d.MessageId)
544557
}
545558

546559
j.Retries = retries
547560
}
548561

549-
if errorType, ok := d.Headers[errorHeader]; ok {
562+
if errorType, ok := d.Headers[DefaultConfiguration.ErrorHeader]; ok {
550563
errorType, ok := errorType.(string)
551564
if !ok {
552-
return nil, ErrRetrievingHeader.New(retriesHeader, d.MessageId)
565+
return nil, ErrRetrievingHeader.New(DefaultConfiguration.ErrorHeader, d.MessageId)
553566
}
554567

555568
j.ErrorType = errorType

amqp/amqp_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ func (s *AMQPSuite) SetupSuite() {
2727
s.BrokerURI = testAMQPURI
2828
}
2929

30+
func TestDefaultConfig(t *testing.T) {
31+
assert.Equal(t, DefaultConfiguration.BuriedExchangeSuffix, ".buriedExchange")
32+
}
33+
3034
func TestNewAMQPBroker_bad_url(t *testing.T) {
3135
assert := assert.New(t)
3236

@@ -106,17 +110,18 @@ func TestAMQPHeaders(t *testing.T) {
106110
errorType string
107111
}{
108112
{
109-
name: fmt.Sprintf("with %s and %s headers", retriesHeader, errorHeader),
113+
name: fmt.Sprintf("with %s and %s headers",
114+
DefaultConfiguration.RetriesHeader, DefaultConfiguration.ErrorHeader),
110115
retries: int32(10),
111116
errorType: "error-test",
112117
},
113118
{
114-
name: fmt.Sprintf("with %s header", retriesHeader),
119+
name: fmt.Sprintf("with %s header", DefaultConfiguration.RetriesHeader),
115120
retries: int32(10),
116121
errorType: "",
117122
},
118123
{
119-
name: fmt.Sprintf("with %s headers", errorHeader),
124+
name: fmt.Sprintf("with %s headers", DefaultConfiguration.ErrorHeader),
120125
retries: int32(0),
121126
errorType: "error-test",
122127
},

0 commit comments

Comments
 (0)