Skip to content
5 changes: 5 additions & 0 deletions kafka-ui-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@
<artifactId>json</artifactId>
<version>${org.json.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ protected AbstractAuthSecurityConfig() {
"/resources/**",
"/actuator/health/**",
"/actuator/info",
"/actuator/prometheus",
"/auth",
"/login",
"/logout",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,28 @@

import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

public abstract class AbstractEmitter {

private final MessagesProcessing messagesProcessing;
private final PollingThrottler throttler;
protected final PollingSettings pollingSettings;

protected AbstractEmitter(MessagesProcessing messagesProcessing, PollingSettings pollingSettings) {
this.messagesProcessing = messagesProcessing;
this.pollingSettings = pollingSettings;
this.throttler = pollingSettings.getPollingThrottler();
}

protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer) {
protected PolledRecords poll(
FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer) {
return poll(sink, consumer, pollingSettings.getPollTimeout());
}

protected ConsumerRecords<Bytes, Bytes> poll(
FluxSink<TopicMessageEventDTO> sink, Consumer<Bytes, Bytes> consumer, Duration timeout) {
Instant start = Instant.now();
ConsumerRecords<Bytes, Bytes> records = consumer.poll(timeout);
Instant finish = Instant.now();
int polledBytes = sendConsuming(sink, records, Duration.between(start, finish).toMillis());
throttler.throttleAfterPoll(polledBytes);
protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsumer consumer, Duration timeout) {
var records = consumer.pollEnhanced(timeout);
sendConsuming(sink, records);
return records;
}

Expand All @@ -49,10 +40,8 @@ protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
messagesProcessing.sendPhase(sink, name);
}

protected int sendConsuming(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> records,
long elapsed) {
return messagesProcessing.sentConsumingInfo(sink, records, elapsed);
protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords records) {
messagesProcessing.sentConsumingInfo(sink, records);
}

protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
import java.util.TreeMap;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
Expand All @@ -22,12 +20,12 @@ public class BackwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {

private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition consumerPosition;
private final int messagesPerPage;

public BackwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition consumerPosition,
int messagesPerPage,
MessagesProcessing messagesProcessing,
Expand All @@ -41,7 +39,7 @@ public BackwardRecordEmitter(
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting backward polling for {}", consumerPosition);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Created consumer");

var seekOperations = SeekOperations.create(consumer, consumerPosition);
Expand Down Expand Up @@ -91,7 +89,7 @@ private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(
TopicPartition tp,
long fromOffset,
long toOffset,
Consumer<Bytes, Bytes> consumer,
EnhancedConsumer consumer,
FluxSink<TopicMessageEventDTO> sink
) {
consumer.assign(Collections.singleton(tp));
Expand All @@ -101,21 +99,21 @@ private List<ConsumerRecord<Bytes, Bytes>> partitionPollIteration(

var recordsToSend = new ArrayList<ConsumerRecord<Bytes, Bytes>>();

EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
EmptyPollsCounter emptyPolls = pollingSettings.createEmptyPollsCounter();
while (!sink.isCancelled()
&& !sendLimitReached()
&& recordsToSend.size() < desiredMsgsToPoll
&& !emptyPolls.noDataEmptyPollsReached()) {
var polledRecords = poll(sink, consumer, pollingSettings.getPartitionPollTimeout());
emptyPolls.count(polledRecords);
emptyPolls.count(polledRecords.count());

log.debug("{} records polled from {}", polledRecords.count(), tp);

var filteredRecords = polledRecords.records(tp).stream()
.filter(r -> r.offset() < toOffset)
.toList();

if (!polledRecords.isEmpty() && filteredRecords.isEmpty()) {
if (polledRecords.count() > 0 && filteredRecords.isEmpty()) {
// we already read all messages in target offsets interval
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import com.provectus.kafka.ui.model.TopicMessageConsumingDTO;
import com.provectus.kafka.ui.model.TopicMessageEventDTO;
import com.provectus.kafka.ui.util.ConsumerRecordsUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

class ConsumingStats {
Expand All @@ -13,23 +10,17 @@ class ConsumingStats {
private int records = 0;
private long elapsed = 0;

/**
* returns bytes polled.
*/
int sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed,
void sendConsumingEvt(FluxSink<TopicMessageEventDTO> sink,
PolledRecords polledRecords,
int filterApplyErrors) {
int polledBytes = ConsumerRecordsUtil.calculatePolledSize(polledRecords);
bytes += polledBytes;
bytes += polledRecords.bytes();
this.records += polledRecords.count();
this.elapsed += elapsed;
this.elapsed += polledRecords.elapsed().toMillis();
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.CONSUMING)
.consuming(createConsumingStats(sink, filterApplyErrors))
);
return polledBytes;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, int filterApplyErrors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public class EmptyPollsCounter {
this.maxEmptyPolls = maxEmptyPolls;
}

public void count(ConsumerRecords<?, ?> polled) {
emptyPolls = polled.isEmpty() ? emptyPolls + 1 : 0;
public void count(int polledCount) {
emptyPolls = polledCount == 0 ? emptyPolls + 1 : 0;
}

public boolean noDataEmptyPollsReached() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package com.provectus.kafka.ui.emitter;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.provectus.kafka.ui.util.ApplicationMetrics;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Delegate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.utils.Bytes;


public class EnhancedConsumer extends KafkaConsumer<Bytes, Bytes> {

private final PollingThrottler throttler;
private final ApplicationMetrics metrics;
private String pollingTopic;

public EnhancedConsumer(Properties properties,
PollingThrottler throttler,
ApplicationMetrics metrics) {
super(properties, new BytesDeserializer(), new BytesDeserializer());
this.throttler = throttler;
this.metrics = metrics;
metrics.activeConsumers().incrementAndGet();
}

public PolledRecords pollEnhanced(Duration dur) {
var stopwatch = Stopwatch.createStarted();
ConsumerRecords<Bytes, Bytes> polled = poll(dur);
PolledRecords polledEnhanced = PolledRecords.create(polled, stopwatch.elapsed());
var throttled = throttler.throttleAfterPoll(polledEnhanced.bytes());
metrics.meterPolledRecords(pollingTopic, polledEnhanced, throttled);
return polledEnhanced;
}

@Override
public void assign(Collection<TopicPartition> partitions) {
super.assign(partitions);
Set<String> assignedTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
Preconditions.checkState(assignedTopics.size() == 1);
this.pollingTopic = assignedTopics.iterator().next();
}

@Override
public void subscribe(Pattern pattern) {
throw new UnsupportedOperationException();
}

@Override
public void subscribe(Collection<String> topics) {
throw new UnsupportedOperationException();
}

@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}

@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
throw new UnsupportedOperationException();
}

@Override
public void close(Duration timeout) {
metrics.activeConsumers().decrementAndGet();
super.close(timeout);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
Expand All @@ -16,11 +14,11 @@ public class ForwardRecordEmitter
extends AbstractEmitter
implements java.util.function.Consumer<FluxSink<TopicMessageEventDTO>> {

private final Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier;
private final Supplier<EnhancedConsumer> consumerSupplier;
private final ConsumerPosition position;

public ForwardRecordEmitter(
Supplier<KafkaConsumer<Bytes, Bytes>> consumerSupplier,
Supplier<EnhancedConsumer> consumerSupplier,
ConsumerPosition position,
MessagesProcessing messagesProcessing,
PollingSettings pollingSettings) {
Expand All @@ -32,7 +30,7 @@ public ForwardRecordEmitter(
@Override
public void accept(FluxSink<TopicMessageEventDTO> sink) {
log.debug("Starting forward polling for {}", position);
try (KafkaConsumer<Bytes, Bytes> consumer = consumerSupplier.get()) {
try (EnhancedConsumer consumer = consumerSupplier.get()) {
sendPhase(sink, "Assigning partitions");
var seekOperations = SeekOperations.create(consumer, position);
seekOperations.assignAndSeekNonEmptyPartitions();
Expand All @@ -44,8 +42,8 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
&& !emptyPolls.noDataEmptyPollsReached()) {

sendPhase(sink, "Polling");
ConsumerRecords<Bytes, Bytes> records = poll(sink, consumer);
emptyPolls.count(records);
var records = poll(sink, consumer);
emptyPolls.count(records.count());

log.debug("{} records polled", records.count());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;

Expand Down Expand Up @@ -54,13 +53,10 @@ void sendMsg(FluxSink<TopicMessageEventDTO> sink, ConsumerRecord<Bytes, Bytes> r
}
}

int sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink,
ConsumerRecords<Bytes, Bytes> polledRecords,
long elapsed) {
void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polledRecords) {
if (!sink.isCancelled()) {
return consumingStats.sendConsumingEvt(sink, polledRecords, elapsed, filterApplyErrors);
consumingStats.sendConsumingEvt(sink, polledRecords, filterApplyErrors);
}
return 0;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.provectus.kafka.ui.emitter;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.Bytes;

public record PolledRecords(int count,
int bytes,
Duration elapsed,
ConsumerRecords<Bytes, Bytes> records) implements Iterable<ConsumerRecord<Bytes, Bytes>> {

static PolledRecords create(ConsumerRecords<Bytes, Bytes> polled, Duration pollDuration) {
return new PolledRecords(
polled.count(),
calculatePolledRecSize(polled),
pollDuration,
polled
);
}

public List<ConsumerRecord<Bytes, Bytes>> records(TopicPartition tp) {
return records.records(tp);
}

@Override
public Iterator<ConsumerRecord<Bytes, Bytes>> iterator() {
return records.iterator();
}

private static int calculatePolledRecSize(Iterable<ConsumerRecord<Bytes, Bytes>> recs) {
int polledBytes = 0;
for (ConsumerRecord<Bytes, Bytes> rec : recs) {
for (Header header : rec.headers()) {
polledBytes +=
(header.key() != null ? header.key().getBytes().length : 0)
+ (header.value() != null ? header.value().length : 0);
}
polledBytes += rec.key() == null ? 0 : rec.serializedKeySize();
polledBytes += rec.value() == null ? 0 : rec.serializedValueSize();
}
return polledBytes;
}
}
Loading