Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ The list of available variables is:
- `AMQP_BACKOFF_MIN` (default: 20ms): Minimum time to wait for retry the connection or queue channel assignment.
- `AMQP_BACKOFF_MAX` (default: 30s): Maximum time to wait for retry the connection or queue channel assignment.
- `AMQP_BACKOFF_FACTOR` (default: 2): Multiplying factor for each increment step on the retry.
- `AMQP_BURIED_QUEUE_SUFFIX` (default: `.buriedQueue`): Suffix for the buried queue name.
- `AMQP_BURIED_EXCHANGE_SUFFIX` (default: `.buriedExchange`): Suffix for the exchange name.
- `AMQP_BURIED_TIMEOUT` (default: 500): Time in milliseconds to wait for new jobs from the buried queue.
- `AMQP_RETRIES_HEADER` (default: `x-retries`): Message header to set the number of retries.
- `AMQP_ERROR_HEADER` (default: `x-error-type`): Message header to set the error type.

License
-------
Expand Down
39 changes: 10 additions & 29 deletions amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ var DefaultConfiguration Configuration
// Configuration AMQP configuration settings, this settings are set using the
// envinroment varabiles.
type Configuration struct {
BuriedQueueSuffix string `envconfig:"BURIED_QUEUE_SUFFIX" default:".buriedQueue"`
BuriedExchangeSuffix string `envconfig:"BURIED_EXCHANGE_SUFFIX" default:".buriedExchange"`
BuriedNonBlockingRetries int `envconfig:"BURIED_BLOCKING_RETRIES" default:"3"`
BuriedQueueSuffix string `envconfig:"BURIED_QUEUE_SUFFIX" default:".buriedQueue"`
BuriedExchangeSuffix string `envconfig:"BURIED_EXCHANGE_SUFFIX" default:".buriedExchange"`
BuriedTimeout int `envconfig:"BURIED_TIMEOUT" default:"500"`

RetriesHeader string `envconfig:"RETRIES_HEADER" default:"x-retries"`
ErrorHeader string `envconfig:"ERROR_HEADER" default:"x-error-type"`
Expand Down Expand Up @@ -345,41 +345,22 @@ func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) erro

defer iter.Close()

retries := 0
timeout := time.Duration(DefaultConfiguration.BuriedTimeout) * time.Millisecond

var notComplying []*queue.Job
var errorsPublishing []*jobErr
for {
j, err := iter.(*JobIter).nextNonBlocking()
j, err := iter.(*JobIter).nextWithTimeout(timeout)
if err != nil {
return err
}

if j == nil {
log.With(log.Fields{
"retries": retries,
}).Debugf("received empty job")

// check (in non blocking mode) up to DefaultConfiguration.BuriedNonBlockingRetries
// with a small delay between them just in case some job is
// arriving, return if there is nothing after all the retries
// (meaning: BuriedQueue is surely empty or any arriving jobs will
// have to wait to the next call).
if retries > DefaultConfiguration.BuriedNonBlockingRetries {
log.With(log.Fields{
"retries": retries,
"max-retries": DefaultConfiguration.BuriedNonBlockingRetries,
}).Debugf("maximum number of retries reached")
log.Debugf("no more jobs in the buried queue")

break
}

time.Sleep(50 * time.Millisecond)
retries++
continue
break
}

retries = 0

if err = j.Ack(); err != nil {
return err
}
Expand Down Expand Up @@ -541,15 +522,15 @@ func (i *JobIter) Next() (*queue.Job, error) {
return fromDelivery(&d)
}

func (i *JobIter) nextNonBlocking() (*queue.Job, error) {
func (i *JobIter) nextWithTimeout(timeout time.Duration) (*queue.Job, error) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can make this function more generic (if it makes sense), e.g.:

func (i *JobIter) nextWithTimeout(timeout time.Duration, fn func(*queue.Job) error) error { } 
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nextWithTimeout is a version of Next (https://github.com/src-d/go-queue/blob/master/amqp/amqp.go#L492-L500) that does not block forever. It's only used in republish buried function so I don't see the benefit of adding a function parameter.

I suppose the best thing to do is to change Next to be able to timeout instead of adding a private method. Maybe adding the timeout value when constructing JobIter. It's not done like this as I didn't want to do bigger changes to the Queue library.

select {
case d, ok := <-i.c:
if !ok {
return nil, queue.ErrAlreadyClosed.New()
}

return fromDelivery(&d)
default:
case <-time.After(timeout):
return nil, nil
}
}
Expand Down