Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,34 +117,50 @@ func (q *Queue) Transaction(txcb queue.TxCallback) error {
return nil
}

// Consume implements Queue. MemoryQueues have infinite advertised window.
func (q *Queue) Consume(_ int) (queue.JobIter, error) {
return &JobIter{q: q, RWMutex: &q.RWMutex, finite: q.finite}, nil
// Consume implements Queue. The advertisedWindow value is the maximum number of
// unacknowledged jobs. Use 0 for an infinite window.
func (q *Queue) Consume(advertisedWindow int) (queue.JobIter, error) {
jobIter := JobIter{
q: q,
RWMutex: &q.RWMutex,
finite: q.finite,
}

if advertisedWindow > 0 {
jobIter.chn = make(chan struct{}, advertisedWindow)
}

return &jobIter, nil
}

// JobIter implements a queue.JobIter interface.
type JobIter struct {
q *Queue
closed bool
finite bool
chn chan struct{}
*sync.RWMutex
}

// Acknowledger implements a queue.Acknowledger interface.
type Acknowledger struct {
q *Queue
j *queue.Job
q *Queue
j *queue.Job
chn chan struct{}
}

// Ack is called when the Job has finished.
func (*Acknowledger) Ack() error {
func (a *Acknowledger) Ack() error {
a.release()
return nil
}

// Reject is called when the Job has errored. The argument indicates whether the Job
// should be put back in queue or not. If requeue is false, the job will go to the buried
// queue until Queue.RepublishBuried() is called.
func (a *Acknowledger) Reject(requeue bool) error {
defer a.release()

if !requeue {
// Send to the buried queue for later republishing
a.q.buriedJobs = append(a.q.buriedJobs, a.j)
Expand All @@ -154,6 +170,12 @@ func (a *Acknowledger) Reject(requeue bool) error {
return a.q.Publish(a.j)
}

func (a *Acknowledger) release() {
if a.chn != nil {
<-a.chn
}
}

func (i *JobIter) isClosed() bool {
i.RLock()
defer i.RUnlock()
Expand All @@ -162,8 +184,10 @@ func (i *JobIter) isClosed() bool {

// Next returns the next job in the iter.
func (i *JobIter) Next() (*queue.Job, error) {
i.acquire()
for {
if i.isClosed() {
i.release()
return nil, queue.ErrAlreadyClosed.New()
}

Expand All @@ -173,6 +197,7 @@ func (i *JobIter) Next() (*queue.Job, error) {
}

if err == io.EOF && i.finite {
i.release()
return nil, err
}

Expand All @@ -188,7 +213,7 @@ func (i *JobIter) next() (*queue.Job, error) {
}

j := i.q.jobs[i.q.idx]
j.Acknowledger = &Acknowledger{j: j, q: i.q}
j.Acknowledger = &Acknowledger{j: j, q: i.q, chn: i.chn}
i.q.idx++

return j, nil
Expand All @@ -201,3 +226,15 @@ func (i *JobIter) Close() error {
i.closed = true
return nil
}

func (i *JobIter) acquire() {
if i.chn != nil {
i.chn <- struct{}{}
}
}

func (i *JobIter) release() {
if i.chn != nil {
<-i.chn
}
}
77 changes: 77 additions & 0 deletions test/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"

"gopkg.in/src-d/go-queue.v1"
Expand Down Expand Up @@ -542,6 +545,80 @@ func (s *QueueSuite) TestRetryQueue() {
<-done
}

func (s *QueueSuite) TestConcurrent() {
testCases := []int{1, 2, 13, 150}

for _, advertisedWindow := range testCases {
s.T().Run(strconv.Itoa(advertisedWindow), func(t *testing.T) {
assert := assert.New(t)

qName := NewName()
q, err := s.Broker.Queue(qName)
assert.NoError(err)
assert.NotNil(q)

var continueWG sync.WaitGroup
continueWG.Add(1)

var calledWG sync.WaitGroup

var calls int32
atomic.StoreInt32(&calls, 0)

iter, err := q.Consume(advertisedWindow)
assert.NoError(err)

go func() {
for {
j, err := iter.Next()
if queue.ErrAlreadyClosed.Is(err) {
return
}
assert.NoError(err)
if j == nil {
time.Sleep(300 * time.Millisecond)
continue
}

go func() {
// Removes 1 from calledWG, and gets locked
// until continueWG is released
atomic.AddInt32(&calls, 1)

calledWG.Done()
continueWG.Wait()

assert.NoError(j.Ack())
}()
}
}()

assert.EqualValues(0, atomic.LoadInt32(&calls))
calledWG.Add(advertisedWindow)

// Enqueue some jobs, 3 * advertisedWindow
for i := 0; i < advertisedWindow*3; i++ {
j, err := queue.NewJob()
assert.NoError(err)
err = j.Encode(i)
assert.NoError(err)
err = q.Publish(j)
assert.NoError(err)
}

// The first batch of calls should be exactly advertisedWindow
calledWG.Wait()
assert.EqualValues(advertisedWindow, atomic.LoadInt32(&calls))

// Let the iterator go though all the jobs, should be 3*advertisedWindow
calledWG.Add(2 * advertisedWindow)
continueWG.Done()
calledWG.Wait()
assert.EqualValues(3*advertisedWindow, atomic.LoadInt32(&calls))
})
}
}

func (s *QueueSuite) checkNextClosed(iter queue.JobIter) chan struct{} {
assert := assert.New(s.T())

Expand Down