Skip to content

Commit c746cf8

Browse files
committed
Add debug logging to RepublishBuried with duration
Signed-off-by: Javi Fontan <jfontan@gmail.com>
1 parent d6c29d2 commit c746cf8

File tree

1 file changed

+45
-3
lines changed

1 file changed

+45
-3
lines changed

amqp/amqp.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,12 +355,21 @@ func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) erro
355355
}
356356

357357
if j == nil {
358+
log.With(log.Fields{
359+
"retries": retries,
360+
}).Debugf("received empty job")
361+
358362
// check (in non blocking mode) up to DefaultConfiguration.BuriedNonBlockingRetries
359363
// with a small delay between them just in case some job is
360364
// arriving, return if there is nothing after all the retries
361365
// (meaning: BuriedQueue is surely empty or any arriving jobs will
362366
// have to wait to the next call).
363367
if retries > DefaultConfiguration.BuriedNonBlockingRetries {
368+
log.With(log.Fields{
369+
"retries": retries,
370+
"max-retries": DefaultConfiguration.BuriedNonBlockingRetries,
371+
}).Debugf("maximum number of retries reached")
372+
364373
break
365374
}
366375

@@ -376,19 +385,45 @@ func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) erro
376385
}
377386

378387
if queue.RepublishConditions(conditions).Comply(j) {
388+
start := time.Now()
379389
if err = q.Publish(j); err != nil {
390+
log.With(log.Fields{
391+
"duration": time.Since(start),
392+
"id": j.ID,
393+
}).Errorf(err, "error publishing job")
394+
380395
errorsPublishing = append(errorsPublishing, &jobErr{j, err})
396+
} else {
397+
log.With(log.Fields{
398+
"duration": time.Since(start),
399+
"id": j.ID,
400+
}).Debugf("job republished")
381401
}
382402
} else {
383-
notComplying = append(notComplying, j)
403+
log.With(log.Fields{
404+
"id": j.ID,
405+
"error-type": j.ErrorType,
406+
"content-type": j.ContentType,
407+
"retries": j.Retries,
408+
}).Debugf("job does not comply with conditions")
384409

410+
notComplying = append(notComplying, j)
385411
}
386412
}
387413

388-
for _, job := range notComplying {
414+
log.Debugf("rejecting %v non complying jobs", len(notComplying))
415+
416+
for i, job := range notComplying {
417+
start := time.Now()
418+
389419
if err = job.Reject(true); err != nil {
390420
return err
391421
}
422+
423+
log.With(log.Fields{
424+
"duration": time.Since(start),
425+
"id": job.ID,
426+
}).Debugf("job rejected (%v/%v)", i, len(notComplying))
392427
}
393428

394429
return q.handleRepublishErrors(errorsPublishing)
@@ -397,11 +432,18 @@ func (q *Queue) RepublishBuried(conditions ...queue.RepublishConditionFunc) erro
397432
func (q *Queue) handleRepublishErrors(list []*jobErr) error {
398433
if len(list) > 0 {
399434
stringErrors := []string{}
400-
for _, je := range list {
435+
for i, je := range list {
401436
stringErrors = append(stringErrors, je.err.Error())
437+
start := time.Now()
438+
402439
if err := q.buriedQueue.Publish(je.job); err != nil {
403440
return err
404441
}
442+
443+
log.With(log.Fields{
444+
"duration": time.Since(start),
445+
"id": je.job.ID,
446+
}).Debugf("job reburied (%v/%v)", i, len(list))
405447
}
406448

407449
return ErrRepublishingJobs.New(strings.Join(stringErrors, ": "))

0 commit comments

Comments
 (0)