Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,30 @@
import co.elastic.apm.agent.impl.context.SpanContextImpl;
import co.elastic.apm.agent.impl.context.UrlImpl;
import co.elastic.apm.agent.impl.stacktrace.StacktraceConfigurationImpl;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.util.ResultUtil;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import co.elastic.apm.agent.tracer.pooling.Recyclable;
import co.elastic.apm.agent.tracer.util.ResultUtil;
import co.elastic.apm.agent.util.CharSequenceUtils;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class SpanImpl extends AbstractSpanImpl<SpanImpl> implements Recyclable, Span<SpanImpl> {

/**
* Protection against excessive memory usage and span ending run times:
* We limit the maximum allowed number of end listeners.
*/
static final int MAX_END_LISTENERS = 100;
private static final Logger logger = LoggerFactory.getLogger(SpanImpl.class);
public static final long MAX_LOG_INTERVAL_MICRO_SECS = TimeUnit.MINUTES.toMicros(5);
private static long lastSpanMaxWarningTimestamp;
Expand Down Expand Up @@ -75,6 +84,9 @@ public class SpanImpl extends AbstractSpanImpl<SpanImpl> implements Recyclable,
@Nullable
private List<StackFrame> stackFrames;

private final Set<SpanEndListener<? super SpanImpl>> endListeners =
Collections.newSetFromMap(new ConcurrentHashMap<SpanEndListener<? super SpanImpl>, Boolean>());

/**
* If a span is non-discardable, all the spans leading up to it are non-discardable as well
*/
Expand Down Expand Up @@ -174,6 +186,25 @@ public SpanImpl withAction(@Nullable String action) {
return this;
}

@Override
public void addEndListener(SpanEndListener<? super SpanImpl> listener) {
if (endListeners.size() < MAX_END_LISTENERS) {
endListeners.add(listener);
} else {
if (logger.isDebugEnabled()) {
logger.warn("Not adding span end listener because limit is reached: {}," +
" throwable stacktrace will be added for debugging", listener, new Throwable());
} else {
logger.warn("Not adding span end listener because limit is reached: {}", listener);
}
}
}

@Override
public void removeEndListener(SpanEndListener<? super SpanImpl> listener) {
endListeners.remove(listener);
}


/**
* Sets span.type, span.subtype and span.action. If no subtype and action are provided, assumes the legacy usage of hierarchical
Expand Down Expand Up @@ -221,6 +252,9 @@ public String getAction() {

@Override
public void beforeEnd(long epochMicros) {
for (SpanEndListener<? super SpanImpl> endListener : endListeners) {
endListener.onEnd(this);
}
// set outcome when not explicitly set by user nor instrumentation
if (outcomeNotSet()) {
Outcome outcome;
Expand Down Expand Up @@ -476,6 +510,7 @@ public void resetState() {
super.resetState();
context.resetState();
composite.resetState();
endListeners.clear();
stacktrace = null;
subtype = null;
action = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,25 @@
import co.elastic.apm.agent.impl.sampling.ConstantSampler;
import co.elastic.apm.agent.objectpool.TestObjectPoolFactory;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;

public class SpanTest {

Expand Down Expand Up @@ -87,6 +93,64 @@ void testOutcomeExplicitlyToUnknown() {
assertThat(span.getOutcome()).isEqualTo(Outcome.UNKNOWN);
}

@Test
void checkEndListenersConcurrencySafe() {
TransactionImpl transaction = new TransactionImpl(tracer);
transaction.startRoot(0, ConstantSampler.of(true), BaggageImpl.EMPTY);
try {
SpanImpl span = new SpanImpl(tracer);
span.start(TraceContextImpl.fromParent(), transaction, BaggageImpl.EMPTY, -1L);

AtomicInteger invocationCounter = new AtomicInteger();
SpanEndListener<Span<?>> callback = new SpanEndListener<Span<?>>() {
@Override
public void onEnd(Span<?> span) {
span.removeEndListener(this);
invocationCounter.incrementAndGet();
}
};
span.addEndListener(callback);
span.end();
assertThat(invocationCounter.get()).isEqualTo(1);
} finally {
transaction.end();
}

}

@Test
@SuppressWarnings("unchecked")
void checkEndListenersLimit() {
TransactionImpl transaction = new TransactionImpl(tracer);
transaction.startRoot(0, ConstantSampler.of(true), BaggageImpl.EMPTY);
try {
SpanImpl span = new SpanImpl(tracer);
span.start(TraceContextImpl.fromParent(), transaction, BaggageImpl.EMPTY, -1L);

for (int i = 0; i < SpanImpl.MAX_END_LISTENERS - 1; i++) {
span.addEndListener(new SpanEndListener<SpanImpl>() {
@Override
public void onEnd(SpanImpl span) {

}
});
}

SpanEndListener<SpanImpl> invokeMe = (SpanEndListener<SpanImpl>) Mockito.mock(SpanEndListener.class);
SpanEndListener<SpanImpl> dontInvokeMe = (SpanEndListener<SpanImpl>) Mockito.mock(SpanEndListener.class);
span.addEndListener(invokeMe);
span.addEndListener(dontInvokeMe);

span.end();

verify(invokeMe).onEnd(span);
verifyNoInteractions(dontInvokeMe);
} finally {
transaction.end();
}

}

@Test
void normalizeEmptyFields() {
SpanImpl span = new SpanImpl(tracer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ public static <T> CoderResult decodeUtf8BytesFromSource(ByteSourceReader<T> read
}
}

@Nullable
public static byte[] copyToByteArray(@Nullable ByteBuffer buf) {
if (buf == null) {
return null;
}
byte[] data = new byte[buf.position()];
buf.position(0);
buf.get(data);
return data;
}

public interface ByteSourceReader<S> {
int availableBytes(S source);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public boolean isNestedCallAndDecrement() {
return decrement() != 0;
}

private int get() {
public int get() {
Integer callDepthForCurrentThread = callDepthPerThread.get();
if (callDepthForCurrentThread == null) {
callDepthForCurrentThread = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ protected boolean isBodyCapturingSupported() {
return true;
}

@Override
public void testPostBodyCaptureForExistingSpan() throws Exception {
//TODO: async http client instrumentation does not support capturing bodies for existing spans yet
}

@Override
protected void performPost(String path, byte[] data, String contentTypeHeader) throws Exception {
final CompletableFuture<HttpResponse> responseFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package co.elastic.apm.agent.httpclient;

import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.SpanEndListener;
import co.elastic.apm.agent.tracer.metadata.BodyCapture;

class RequestBodyRecordingHelper implements SpanEndListener<Span<?>> {

/**
* We do not need to participate in span reference counting here.
* Instead, we only hold a reference to the span for the time it is not ended.
* This is ensured via the {@link #onEnd(Span)} callback.
*/
// Visible for testing
Span<?> clientSpan;

public RequestBodyRecordingHelper(Span<?> clientSpan) {
if (!clientSpan.isFinished()) {
this.clientSpan = clientSpan;
clientSpan.addEndListener(this);
}
}

void appendToBody(byte b) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

void appendToBody(byte[] b, int off, int len) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b, off, len);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

void releaseSpan() {
if (clientSpan != null) {
clientSpan.removeEndListener(this);
}
clientSpan = null;
}

@Override
public void onEnd(Span<?> span) {
releaseSpan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,29 @@
package co.elastic.apm.agent.httpclient;

import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.metadata.BodyCapture;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;

public class RequestBodyRecordingInputStream extends InputStream {

private final InputStream delegate;

@Nullable
private Span<?> clientSpan;
private final RequestBodyRecordingHelper recordingHelper;

public RequestBodyRecordingInputStream(InputStream delegate, Span<?> clientSpan) {
this.delegate = delegate;
clientSpan.incrementReferences();
this.clientSpan = clientSpan;
this.recordingHelper = new RequestBodyRecordingHelper(clientSpan);
}


protected void appendToBody(byte b) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

protected void appendToBody(byte[] b, int off, int len) {
if (clientSpan != null) {
BodyCapture requestBody = clientSpan.getContext().getHttp().getRequestBody();
requestBody.append(b, off, len);
if (requestBody.isFull()) {
releaseSpan();
}
}
}

public void releaseSpan() {
if (clientSpan != null) {
clientSpan.decrementReferences();
clientSpan = null;
}
}

@Override
public int read() throws IOException {
int character = delegate.read();
if (character == -1) {
releaseSpan();
recordingHelper.releaseSpan();
} else {
appendToBody((byte) character);
recordingHelper.appendToBody((byte) character);
}
return character;
}
Expand All @@ -81,9 +50,9 @@ public int read() throws IOException {
public int read(byte[] b, int off, int len) throws IOException {
int readBytes = delegate.read(b, off, len);
if (readBytes == -1) {
releaseSpan();
recordingHelper.releaseSpan();
} else {
appendToBody(b, off, readBytes);
recordingHelper.appendToBody(b, off, readBytes);
}
return readBytes;
}
Expand All @@ -96,7 +65,7 @@ public int available() throws IOException {
@Override
public void close() throws IOException {
try {
releaseSpan();
recordingHelper.releaseSpan();
} finally {
delegate.close();
}
Expand Down
Loading