Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,28 @@ public CompletableResultCode export(@Nonnull Collection<SpanData> spans) {

CompletableResultCode result = new CompletableResultCode();
CompletableFuture.runAsync(
() ->
producer.send(
producerRecord,
(metadata, exception) -> {
if (exception == null) {
result.succeed();
} else {
logger.error(
String.format("Error while sending spans to Kafka topic %s", topicName),
exception);
result.fail();
}
}),
executorService);
() ->
producer.send(
producerRecord,
(metadata, exception) -> {
if (exception == null) {
result.succeed();
} else {
logger.error(
String.format("Error while sending spans to Kafka topic %s", topicName),
exception);
result.fail();
}
}),
executorService)
.whenComplete(
(ignore, exception) -> {
if (exception != null) {
logger.error(
"Executor task failed while sending to Kafka topic {}", topicName, exception);
result.fail();
}
});
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand All @@ -29,7 +30,9 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
Expand All @@ -46,7 +49,7 @@
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class KafkaSpanExporterIntegrationTest {
private static final DockerImageName KAFKA_TEST_IMAGE =
DockerImageName.parse("apache/kafka:3.8.1");
DockerImageName.parse("apache/kafka:3.9.1");
private static final String TOPIC = "span_topic";
private KafkaContainer kafka;
private KafkaConsumer<String, ExportTraceServiceRequest> consumer;
Expand Down Expand Up @@ -155,6 +158,28 @@ void exportWhenProducerInError() {
testSubject.shutdown();
}

@Test
void exportWhenProducerFailsToSend() {
var mockProducer = new MockProducer<String, Collection<SpanData>>();
mockProducer.sendException = new KafkaException("Simulated kafka exception");
var testSubjectWithMockProducer =
KafkaSpanExporter.newBuilder().setTopicName(TOPIC).setProducer(mockProducer).build();

ImmutableList<SpanData> spans =
ImmutableList.of(makeBasicSpan("span-1"), makeBasicSpan("span-2"));

CompletableResultCode actual = testSubjectWithMockProducer.export(spans);

await()
.untilAsserted(
() -> {
assertThat(actual.isSuccess()).isFalse();
assertThat(actual.isDone()).isTrue();
});

testSubjectWithMockProducer.shutdown();
}

@Test
void flush() {
CompletableResultCode actual = testSubject.flush();
Expand Down
Loading