Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class BatchEventProcessor implements EventProcessor, AutoCloseable {
public static final String CONFIG_CLOSE_TIMEOUT = "event.processor.close.timeout";

public static final int DEFAULT_QUEUE_CAPACITY = 1000;
public static final int DEFAULT_EMPTY_COUNT = 2;
public static final int DEFAULT_BATCH_SIZE = 10;
public static final long DEFAULT_BATCH_INTERVAL = TimeUnit.SECONDS.toMillis(30);
public static final long DEFAULT_TIMEOUT_INTERVAL = TimeUnit.SECONDS.toMillis(5);
Expand Down Expand Up @@ -129,19 +130,26 @@ public class EventConsumer implements Runnable {
@Override
public void run() {
try {
int emptyCount = 0;

while (true) {
if (System.currentTimeMillis() > deadline) {
logger.debug("Deadline exceeded flushing current batch.");
flush();
deadline = System.currentTimeMillis() + flushInterval;
}

Object item = eventQueue.poll(50, TimeUnit.MILLISECONDS);
long timeout = deadline - System.currentTimeMillis();
Object item = emptyCount > DEFAULT_EMPTY_COUNT ? eventQueue.take() : eventQueue.poll(timeout, TimeUnit.MILLISECONDS);

if (item == null) {
logger.debug("Empty item, sleeping for 50ms.");
Thread.sleep(50);
logger.debug("Empty item after waiting flush interval. Flushing.");
emptyCount++;
continue;
}

emptyCount = 0;

if (item == SHUTDOWN_SIGNAL) {
logger.info("Received shutdown signal.");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,43 +84,45 @@ public void testDrainOnClose() throws Exception {
}

@Test
public void testFlushOnMaxTimeout() throws Exception {
public void testFlushMaxBatchSize() throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(1);
setEventProcessor(logEvent -> {
assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size());
eventHandlerRule.dispatchEvent(logEvent);
countDownLatch.countDown();
});

UserEvent userEvent = buildConversionEvent(EVENT_NAME);
eventProcessor.process(userEvent);
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);
for (int i = 0; i < MAX_BATCH_SIZE; i++) {
String eventName = EVENT_NAME + i;
UserEvent userEvent = buildConversionEvent(eventName);
eventProcessor.process(userEvent);
eventHandlerRule.expectConversion(eventName, USER_ID);
}

if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for events to flush.");
}

eventProcessor.close();
assertEquals(0, eventQueue.size());
eventHandlerRule.expectCalls(1);
}

@Test
public void testFlushMaxBatchSize() throws Exception {
public void testFlushOnMaxTimeout() throws Exception {
UserEvent userEvent = buildConversionEvent(EVENT_NAME);

CountDownLatch countDownLatch = new CountDownLatch(1);
setEventProcessor(logEvent -> {
assertEquals(MAX_BATCH_SIZE, logEvent.getEventBatch().getVisitors().size());
eventHandlerRule.dispatchEvent(logEvent);
countDownLatch.countDown();
});

for (int i = 0; i < MAX_BATCH_SIZE; i++) {
String eventName = EVENT_NAME + i;
UserEvent userEvent = buildConversionEvent(eventName);
eventProcessor.process(userEvent);
eventHandlerRule.expectConversion(eventName, USER_ID);
}
eventProcessor.process(userEvent);
eventHandlerRule.expectConversion(EVENT_NAME, USER_ID);

if (!countDownLatch.await(MAX_DURATION_MS * 3, TimeUnit.MILLISECONDS)) {
eventProcessor.close();

if (!countDownLatch.await( TIMEOUT_MS * 3, TimeUnit.MILLISECONDS)) {
fail("Exceeded timeout waiting for events to flush.");
}

Expand Down