- Notifications
You must be signed in to change notification settings - Fork 32
(chore): refactor batch event processor to use blocking queue poll and take so as not to spin too much. #343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
4ecfce9 1b919a3 48f7fd8 29e2067 c64b8d4 a3fcaa5 78f1cd6 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -48,6 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable { | |
| public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout"; | ||
| | ||
| public static final int DEFAULT_QUEUE_CAPACITY = 1000; | ||
| public static final int DEFAULT_WAIT_COUNT = 2; | ||
| public static final int DEFAULT_BATCH_SIZE = 10; | ||
| public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30); | ||
| public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5); | ||
| | @@ -122,26 +123,46 @@ public void flush() throws InterruptedException { | |
| eventQueue.put(FLUSH_SIGNAL); | ||
| } | ||
| | ||
| private interface QueueService { | ||
| Object get() throws InterruptedException; | ||
| } | ||
| | ||
| public class EventConsumer implements Runnable { | ||
| private LinkedList<UserEvent> currentBatch = new LinkedList<>(); | ||
| private long deadline = System.currentTimeMillis() + flushInterval; | ||
| | ||
| @Override | ||
| public void run() { | ||
| try { | ||
| int waitCount = 0; | ||
| | ||
| QueueService polling = () -> eventQueue.poll(flushInterval, TimeUnit.MILLISECONDS); | ||
| QueueService take = () -> eventQueue.take(); | ||
| QueueService using = polling; | ||
| | ||
| while (true) { | ||
| if (System.currentTimeMillis() > deadline) { | ||
| logger.debug("Deadline exceeded flushing current batch."); | ||
| flush(); | ||
| deadline = System.currentTimeMillis() + flushInterval; | ||
| } | ||
| | ||
| Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS); | ||
| Object item = using.get(); | ||
| ||
| | ||
| if (item == null) { | ||
| logger.debug("Empty item, sleeping for 50ms."); | ||
| Thread.sleep(50); | ||
| logger.debug("Empty item after waiting flush interval. Flushing."); | ||
| flush(); | ||
| ||
| waitCount++; | ||
| if (waitCount > DEFAULT_WAIT_COUNT) { | ||
| using = take; | ||
| } | ||
| deadline = System.currentTimeMillis() + flushInterval; | ||
| continue; | ||
| } | ||
| | ||
| waitCount = 0; | ||
| using = polling; | ||
| | ||
| if (item == SHUTDOWN_SIGNAL) { | ||
| logger.info("Received shutdown signal."); | ||
| break; | ||
| | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When polling the queue, should we block until the next deadline, instead of blocking for
flushInterval?