Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout";

public static final int DEFAULT_QUEUE_CAPACITY = 1000;
public static final int DEFAULT_WAIT_COUNT = 2;
public static final int DEFAULT_BATCH_SIZE = 10;
public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
Expand Down Expand Up @@ -122,26 +123,46 @@ public void flush() throws InterruptedException {
eventQueue.put(FLUSH_SIGNAL);
}

private interface QueueService {
Object get() throws InterruptedException;
}

public class EventConsumer implements Runnable {
private LinkedList<UserEvent> currentBatch = new LinkedList<>();
private long deadline = System.currentTimeMillis() + flushInterval;

@Override
public void run() {
try {
int waitCount = 0;

QueueService polling = () -> eventQueue.poll(flushInterval, TimeUnit.MILLISECONDS);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When polling the queue, should we block until the next deadline, instead of blocking for flushInterval?

QueueService take = () -> eventQueue.take();
QueueService using = polling;

while (true) {
if (System.currentTimeMillis() > deadline) {
logger.debug("Deadline exceeded flushing current batch.");
flush();
deadline = System.currentTimeMillis() + flushInterval;
}

Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS);
Object item = using.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the QueueService adds a bit too much cognitive load then what we need in this already complex loop. Since we're basing our decision to use take() or poll(...) based on the number of times we're exited the poll with null items we can use a ternary operator inline:

long timeout = deadline - System.currentTimeMillis(); Object item = emptyCount > MAX_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS);

Note I'm suggesting renaming from "wait" to "empty" since I think better captures the intention of the counter. I also used the dynamic timeout as opposed to the fix flushInterval as we discussed earlier.


if (item == null) {
logger.debug("Empty item, sleeping for 50ms.");
Thread.sleep(50);
logger.debug("Empty item after waiting flush interval. Flushing.");
flush();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flush() is not necessary since it will be captured on the next while iteration. in general I'd like to keep the number of places we're calling flush() and setting the deadline to a minimum. (Ideally one place)

waitCount++;
if (waitCount > DEFAULT_WAIT_COUNT) {
using = take;
}
deadline = System.currentTimeMillis() + flushInterval;
continue;
}

waitCount = 0;
using = polling;

if (item == SHUTDOWN_SIGNAL) {
logger.info("Received shutdown signal.");
break;
Expand Down