-
Couldn't load subscription status.
- Fork 14
Add advertised window support for memory queue #16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
memory/memory.go Outdated
| "sync" | ||
| "time" | ||
| | ||
| "golang.org/x/sync/semaphore" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we do the same with sync.WaitGroup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And without it, you can just do with a channel.
n := 5 // number of workers // first fill the channel up to N workers var ch = make(chan struct{}, n) for i := 0; i < n; i++ { ch <- struct{}{} } // before starting a new worker, wait until there's a spot available <-ch // when the worker is finish, just generate a new token ch <- struct{}{}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we could go with a channel. Do you think it's worth it to avoid using golang.org/x/?
| I think it’s not worth adding a dependency for something that’s not much harder than with it. …On 16 Oct 2018, 18:28 +0200, Carlos Martín ***@***.***>, wrote: @carlosms commented on this pull request. In memory/memory.go: > "io" "sync" "time" + "golang.org/x/sync/semaphore" Yeah, we could go with a channel. Do you think it's worth it to avoid using golang.org/x/? — You are receiving this because you commented. Reply to this email directly, view it on GitHub, or mute the thread. |
Signed-off-by: Carlos Martín <carlos.martin.sanchez@gmail.com>
b26bb06 to 09ec853 Compare | Ok I've redone the semaphore to be a channel. Instead of following @erizocosmico's suggestion to the letter, I used send to acquire and receive to release, since that's how it's done in the effective go example |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to add Ack() to queue.Job instead of having Acknowledger with references to JobIter members?
memory/memory.go Outdated
| } | ||
| | ||
| if advertisedWindow > 0 { | ||
| jobIter.sem = make(chan struct{}, advertisedWindow) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename it, to:
| jobIter.sem = make(chan struct{}, advertisedWindow) | |
| jobIter.chn = make(chan struct{}, advertisedWindow) |
with sem it's super confusing now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least chn tells me something about type. In go not so often you use semaphores, moreover without having semaphore type I don't know what to expect.
This is just a proposal - up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really mind one way or the other. Changed in e25f9e5.
| I don’t think chn is less confusing than sem tbh …On 16 Oct 2018, 19:36 +0200, Kuba Podgórski ***@***.***>, wrote: @kuba-- commented on this pull request. Does it make sense to add Ack() to queue.Job instead of having Acknowledger with references to JobIter members? In memory/memory.go: > @@ -117,34 +117,50 @@ func (q *Queue) Transaction(txcb queue.TxCallback) error { return nil } -// Consume implements Queue. MemoryQueues have infinite advertised window. -func (q *Queue) Consume(_ int) (queue.JobIter, error) { - return &JobIter{q: q, RWMutex: &q.RWMutex, finite: q.finite}, nil +// Consume implements Queue. The advertisedWindow value is the maximum number of +// unacknowledged jobs. Use 0 for an infinite window. +func (q *Queue) Consume(advertisedWindow int) (queue.JobIter, error) { + jobIter := JobIter{ + q: q, + RWMutex: &q.RWMutex, + finite: q.finite, + } + + if advertisedWindow > 0 { + jobIter.sem = make(chan struct{}, advertisedWindow) Please rename it, to: ⬇️ Suggested change - jobIter.sem = make(chan struct{}, advertisedWindow) + jobIter.chn = make(chan struct{}, advertisedWindow) with sem it's super confusing now. — You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub, or mute the thread. |
Signed-off-by: Carlos Martín <carlos.martin.sanchez@gmail.com>
🤔 I don't really follow what you mean. The common |
| @carlosms - ok make sense. I was just confused, why |
This PR adds the advertized window support to the memory queue.
Context: this is needed in lookout, to handle memory or rabbitMQ queues with the same code. src-d/lookout#321.
For compatibility with other projects that may rely on the lack of window, I left the option to use 0 to disable it.