Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.baggage.processor;

import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import java.util.function.Predicate;

/**
* This log record processor copies attributes stored in {@link Baggage} into each newly created log
* record.
*/
public class BaggageLogRecordProcessor implements LogRecordProcessor {

/**
* Creates a new {@link BaggageLogRecordProcessor} that copies all baggage entries into the newly
* created log record.
*/
public static BaggageLogRecordProcessor allowAllBaggageKeys() {
return new BaggageLogRecordProcessor(baggageKey -> true);
}

private final Predicate<String> baggageKeyPredicate;

/**
* Creates a new {@link BaggageLogRecordProcessor} that copies only baggage entries with keys that
* pass the provided filter into the newly created log record.
*/
public BaggageLogRecordProcessor(Predicate<String> baggageKeyPredicate) {
this.baggageKeyPredicate = baggageKeyPredicate;
}

@Override
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
Baggage.fromContext(context)
.forEach(
(s, baggageEntry) -> {
if (baggageKeyPredicate.test(s)) {
logRecord.setAttribute(AttributeKey.stringKey(s), baggageEntry.getValue());
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.baggage.processor;

import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizer;
import io.opentelemetry.sdk.autoconfigure.spi.AutoConfigurationCustomizerProvider;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
import io.opentelemetry.sdk.trace.SdkTracerProviderBuilder;
import java.util.List;

public class BaggageProcessorCustomizer implements AutoConfigurationCustomizerProvider {
@Override
public void customize(AutoConfigurationCustomizer autoConfigurationCustomizer) {
autoConfigurationCustomizer
.addTracerProviderCustomizer(
(sdkTracerProviderBuilder, config) -> {
addSpanProcessor(sdkTracerProviderBuilder, config);
return sdkTracerProviderBuilder;
})
.addLoggerProviderCustomizer(
(sdkLoggerProviderBuilder, config) -> {
addLogRecordProcessor(sdkLoggerProviderBuilder, config);
return sdkLoggerProviderBuilder;
});
}

private static void addSpanProcessor(
SdkTracerProviderBuilder sdkTracerProviderBuilder, ConfigProperties config) {
List<String> keys =
config.getList("otel.java.experimental.span-attributes.copy-from-baggage.include");

if (keys.isEmpty()) {
return;
}

sdkTracerProviderBuilder.addSpanProcessor(createBaggageSpanProcessor(keys));
}

static BaggageSpanProcessor createBaggageSpanProcessor(List<String> keys) {
if (keys.size() == 1 && keys.get(0).equals("*")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extract isWildcard method to be reused for logger

return BaggageSpanProcessor.allowAllBaggageKeys();
}
return new BaggageSpanProcessor(keys::contains);
}

private static void addLogRecordProcessor(
SdkLoggerProviderBuilder sdkLoggerProviderBuilder, ConfigProperties config) {
List<String> keys =
config.getList("otel.java.experimental.log-attributes.copy-from-baggage.include");

if (keys.isEmpty()) {
return;
}

sdkLoggerProviderBuilder.addLogRecordProcessor(createBaggageLogRecordProcessor(keys));
}

static BaggageLogRecordProcessor createBaggageLogRecordProcessor(List<String> keys) {
if (keys.size() == 1 && keys.get(0).equals("*")) {
return BaggageLogRecordProcessor.allowAllBaggageKeys();
}
return new BaggageLogRecordProcessor(keys::contains);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1 +1 @@
io.opentelemetry.contrib.baggage.processor.BaggageSpanProcessorCustomizer
io.opentelemetry.contrib.baggage.processor.BaggageProcessorCustomizer
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.contrib.baggage.processor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.verify;

Expand All @@ -20,14 +21,20 @@
import io.opentelemetry.sdk.autoconfigure.internal.ComponentLoader;
import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider;
import io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.testing.assertj.TracesAssert;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
Expand All @@ -38,57 +45,69 @@
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class BaggageSpanProcessorCustomizerTest {
class BaggageProcessorCustomizerTest {

private static final String MEMORY_EXPORTER = "memory";

@Test
void test_customizer() {
assertCustomizer(Collections.emptyMap(), span -> span.hasTotalAttributeCount(0));
assertCustomizer(
Collections.singletonMap(
"otel.java.experimental.span-attributes.copy-from-baggage.include", "key"),
span -> span.hasAttribute(AttributeKey.stringKey("key"), "value"));
Map<String, String> properties = new HashMap<>();
properties.put("otel.java.experimental.span-attributes.copy-from-baggage.include", "key");
properties.put("otel.java.experimental.log-attributes.copy-from-baggage.include", "key");
// TODO try use
// AttributeAssertion attributeAssertion =
// OpenTelemetryAssertions.equalTo(AttributeKey.stringKey("key"), "value");
assertCustomizer(properties, span -> span.hasAttribute(AttributeKey.stringKey("key"), "value"));
}

private static void assertCustomizer(
Map<String, String> properties, Consumer<SpanDataAssert> spanDataAssertConsumer) {

InMemorySpanExporter spanExporter = InMemorySpanExporter.create();
InMemoryLogRecordExporter logExporter = InMemoryLogRecordExporter.create();

OpenTelemetrySdk sdk = getOpenTelemetrySdk(properties, spanExporter);
OpenTelemetrySdk sdk = getOpenTelemetrySdk(properties, spanExporter, logExporter);
try (Scope ignore = Baggage.current().toBuilder().put("key", "value").build().makeCurrent()) {
sdk.getTracer("test").spanBuilder("test").startSpan().end();
sdk.getLogsBridge().get("test").logRecordBuilder().setBody("test").emit();
}
// TODO verify log record attributes
await()
.atMost(Duration.ofSeconds(1))
.untilAsserted(
() ->
TracesAssert.assertThat(spanExporter.getFinishedSpanItems())
.hasTracesSatisfyingExactly(
trace -> trace.hasSpansSatisfyingExactly(spanDataAssertConsumer)));
() -> {
TracesAssert.assertThat(spanExporter.getFinishedSpanItems())
.hasTracesSatisfyingExactly(
trace -> trace.hasSpansSatisfyingExactly(spanDataAssertConsumer));
List<LogRecordData> finishedLogRecordItems = logExporter.getFinishedLogRecordItems();
assertThat(finishedLogRecordItems).hasSize(1);
});
}

private static OpenTelemetrySdk getOpenTelemetrySdk(
Map<String, String> properties, InMemorySpanExporter spanExporter) {
SpiHelper spiHelper =
SpiHelper.create(BaggageSpanProcessorCustomizerTest.class.getClassLoader());
Map<String, String> properties,
InMemorySpanExporter spanExporter,
InMemoryLogRecordExporter logRecordExporter) {
SpiHelper spiHelper = SpiHelper.create(BaggageProcessorCustomizerTest.class.getClassLoader());

AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder =
AutoConfiguredOpenTelemetrySdk.builder()
.addPropertiesSupplier(
() ->
ImmutableMap.of(
// We set the export interval of the spans to 100 ms. The default value is 5
// We set the export interval of the spans to 10 ms. The default value is 5
// seconds.
"otel.bsp.schedule.delay",
"otel.bsp.schedule.delay", // span exporter
"10",
"otel.blrp.schedule.delay", // log exporter
"10",
"otel.traces.exporter",
MEMORY_EXPORTER,
"otel.metrics.exporter",
"none",
"otel.logs.exporter",
"none"))
MEMORY_EXPORTER))
.addPropertiesSupplier(() -> properties);
AutoConfigureUtil.setComponentLoader(
sdkBuilder,
Expand All @@ -105,6 +124,20 @@ public SpanExporter createExporter(ConfigProperties configProperties) {
return spanExporter;
}

@Override
public String getName() {
return MEMORY_EXPORTER;
}
});
} else if (spiClass == ConfigurableLogRecordExporterProvider.class) {
return Collections.singletonList(
(T)
new ConfigurableLogRecordExporterProvider() {
@Override
public LogRecordExporter createExporter(ConfigProperties configProperties) {
return logRecordExporter;
}

@Override
public String getName() {
return MEMORY_EXPORTER;
Expand All @@ -120,7 +153,7 @@ public String getName() {
@Test
public void test_baggageSpanProcessor_adds_attributes_to_spans(@Mock ReadWriteSpan span) {
try (BaggageSpanProcessor processor =
BaggageSpanProcessorCustomizer.createProcessor(Collections.singletonList("*"))) {
BaggageProcessorCustomizer.createBaggageSpanProcessor(Collections.singletonList("*"))) {
try (Scope ignore = Baggage.current().toBuilder().put("key", "value").build().makeCurrent()) {
processor.onStart(Context.current(), span);
verify(span).setAttribute("key", "value");
Expand All @@ -132,7 +165,7 @@ public void test_baggageSpanProcessor_adds_attributes_to_spans(@Mock ReadWriteSp
public void test_baggageSpanProcessor_adds_attributes_to_spans_when_key_filter_matches(
@Mock ReadWriteSpan span) {
try (BaggageSpanProcessor processor =
BaggageSpanProcessorCustomizer.createProcessor(Collections.singletonList("key"))) {
BaggageProcessorCustomizer.createBaggageSpanProcessor(Collections.singletonList("key"))) {
try (Scope ignore =
Baggage.current().toBuilder()
.put("key", "value")
Expand All @@ -145,4 +178,36 @@ public void test_baggageSpanProcessor_adds_attributes_to_spans_when_key_filter_m
}
}
}

@Test
public void test_baggageLogRecordProcessor_adds_attributes_to_logRecord(
@Mock ReadWriteLogRecord logRecord) {
try (BaggageLogRecordProcessor processor =
BaggageProcessorCustomizer.createBaggageLogRecordProcessor(
Collections.singletonList("*"))) {
try (Scope ignore = Baggage.current().toBuilder().put("key", "value").build().makeCurrent()) {
processor.onEmit(Context.current(), logRecord);
verify(logRecord).setAttribute(AttributeKey.stringKey("key"), "value");
}
}
}

@Test
public void test_baggageLogRecordProcessor_adds_attributes_to_spans_when_key_filter_matches(
@Mock ReadWriteLogRecord logRecord) {
try (BaggageLogRecordProcessor processor =
BaggageProcessorCustomizer.createBaggageLogRecordProcessor(
Collections.singletonList("key"))) {
try (Scope ignore =
Baggage.current().toBuilder()
.put("key", "value")
.put("other", "value")
.build()
.makeCurrent()) {
processor.onEmit(Context.current(), logRecord);
verify(logRecord).setAttribute(AttributeKey.stringKey("key"), "value");
verify(logRecord, Mockito.never()).setAttribute(AttributeKey.stringKey("other"), "value");
}
}
}
}
Loading