Skip to content

Commit a175371

Browse files
Initial implementation of the OpenFeature SDK
1 parent 7a4d8d0 commit a175371

File tree

51 files changed

+3880
-3
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+3880
-3
lines changed

.github/CODEOWNERS

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,3 +139,10 @@ dd-trace-core/src/test/groovy/datadog/trace/llmobs/ @DataDog/ml-observability
139139
/internal-api/src/test/groovy/datadog/trace/api/rum/ @DataDog/rum
140140
/telemetry/src/main/java/datadog/telemetry/rum/ @DataDog/rum
141141
/telemetry/src/test/groovy/datadog/telemetry/rum/ @DataDog/rum
142+
143+
144+
# @DataDog/feature-flagging-and-experimentation-sdk
145+
/internal-api/src/main/java/datadog/trace/api/featureflag/ @DataDog/feature-flagging-and-experimentation-sdk
146+
/internal-api/src/test/groovy/datadog/trace/api/featureflag/ @DataDog/feature-flagging-and-experimentation-sdk
147+
/dd-java-agent/agent-feature-flagging/ @DataDog/feature-flagging-and-experimentation-sdk
148+
/products/openfeature/ @DataDog/feature-flagging-and-experimentation-sdk

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import datadog.trace.api.config.CrashTrackingConfig;
3333
import datadog.trace.api.config.CwsConfig;
3434
import datadog.trace.api.config.DebuggerConfig;
35+
import datadog.trace.api.config.FeatureFlaggingConfig;
3536
import datadog.trace.api.config.GeneralConfig;
3637
import datadog.trace.api.config.IastConfig;
3738
import datadog.trace.api.config.JmxFetchConfig;
@@ -125,7 +126,8 @@ private enum AgentFeature {
125126
DATA_JOBS(GeneralConfig.DATA_JOBS_ENABLED, false),
126127
AGENTLESS_LOG_SUBMISSION(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED, false),
127128
LLMOBS(LlmObsConfig.LLMOBS_ENABLED, false),
128-
LLMOBS_AGENTLESS(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED, false);
129+
LLMOBS_AGENTLESS(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED, false),
130+
FEATURE_FLAGGING(FeatureFlaggingConfig.FLAGGING_PROVIDER_ENABLED, false);
129131

130132
private final String configKey;
131133
private final String systemProp;
@@ -184,6 +186,7 @@ public boolean isEnabledByDefault() {
184186
private static boolean codeOriginEnabled = false;
185187
private static boolean distributedDebuggerEnabled = false;
186188
private static boolean agentlessLogSubmissionEnabled = false;
189+
private static boolean featureFlaggingEnabled = false;
187190

188191
private static void safelySetContextClassLoader(ClassLoader classLoader) {
189192
try {
@@ -268,6 +271,7 @@ public static void start(
268271
codeOriginEnabled = isFeatureEnabled(AgentFeature.CODE_ORIGIN);
269272
agentlessLogSubmissionEnabled = isFeatureEnabled(AgentFeature.AGENTLESS_LOG_SUBMISSION);
270273
llmObsEnabled = isFeatureEnabled(AgentFeature.LLMOBS);
274+
featureFlaggingEnabled = isFeatureEnabled(AgentFeature.FEATURE_FLAGGING);
271275

272276
// setup writers when llmobs is enabled to accomodate apm and llmobs
273277
if (llmObsEnabled) {
@@ -662,6 +666,7 @@ public void execute() {
662666
maybeStartDebugger(instrumentation, scoClass, sco);
663667
maybeStartRemoteConfig(scoClass, sco);
664668
maybeStartAiGuard();
669+
maybeStartFeatureFlagging(scoClass, sco);
665670

666671
if (telemetryEnabled) {
667672
startTelemetry(instrumentation, scoClass, sco);
@@ -1083,6 +1088,23 @@ private static void maybeStartLLMObs(Instrumentation inst, Class<?> scoClass, Ob
10831088
}
10841089
}
10851090

1091+
private static void maybeStartFeatureFlagging(final Class<?> scoClass, final Object sco) {
1092+
if (featureFlaggingEnabled) {
1093+
StaticEventLogger.begin("Feature Flagging");
1094+
1095+
try {
1096+
final Class<?> ffSysClass =
1097+
AGENT_CLASSLOADER.loadClass("com.datadog.featureflag.FeatureFlaggingSystem");
1098+
final Method ffSysMethod = ffSysClass.getMethod("start", scoClass);
1099+
ffSysMethod.invoke(null, sco);
1100+
} catch (final Throwable e) {
1101+
log.warn("Not starting Feature Flagging subsystem", e);
1102+
}
1103+
1104+
StaticEventLogger.end("Feature Flagging");
1105+
}
1106+
}
1107+
10861108
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
10871109
if (agentlessLogSubmissionEnabled) {
10881110
StaticEventLogger.begin("Logs Intake");
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
2+
3+
plugins {
4+
id 'com.gradleup.shadow'
5+
}
6+
7+
apply from: "$rootDir/gradle/java.gradle"
8+
apply from: "$rootDir/gradle/version.gradle"
9+
10+
java {
11+
sourceCompatibility = JavaVersion.VERSION_1_8
12+
targetCompatibility = JavaVersion.VERSION_1_8
13+
}
14+
15+
excludedClassesCoverage += [
16+
// POJOs
17+
'com.datadog.featureflag.ExposureCache.Key',
18+
'com.datadog.featureflag.ExposureCache.Value'
19+
]
20+
21+
dependencies {
22+
api libs.slf4j
23+
implementation libs.moshi
24+
implementation libs.jctools
25+
26+
api project(':dd-trace-api')
27+
compileOnly project(':dd-trace-core')
28+
implementation project(':internal-api')
29+
implementation project(':communication')
30+
31+
testImplementation project(':utils:test-utils')
32+
testImplementation project(':dd-java-agent:testing')
33+
}
34+
35+
tasks.named("shadowJar", ShadowJar) {
36+
dependencies deps.excludeShared
37+
}
38+
39+
tasks.named("jar", Jar) {
40+
archiveClassifier = 'unbundled'
41+
}
42+
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.datadog.featureflag;
2+
3+
import datadog.trace.api.featureflag.exposure.ExposureEvent;
4+
import java.util.Objects;
5+
6+
public interface ExposureCache {
7+
8+
boolean add(ExposureEvent event);
9+
10+
Value get(Key key);
11+
12+
int size();
13+
14+
final class Key {
15+
public final String flag;
16+
public final String subject;
17+
18+
public Key(final ExposureEvent event) {
19+
this.flag = event.flag == null ? null : event.flag.key;
20+
this.subject = event.subject == null ? null : event.subject.id;
21+
}
22+
23+
@Override
24+
public boolean equals(final Object o) {
25+
if (o == null || getClass() != o.getClass()) {
26+
return false;
27+
}
28+
final Key key = (Key) o;
29+
return Objects.equals(flag, key.flag) && Objects.equals(subject, key.subject);
30+
}
31+
32+
@Override
33+
public int hashCode() {
34+
return Objects.hash(flag, subject);
35+
}
36+
}
37+
38+
final class Value {
39+
public final String variant;
40+
public final String allocation;
41+
42+
public Value(final ExposureEvent event) {
43+
this.variant = event.variant == null ? null : event.variant.key;
44+
this.allocation = event.allocation == null ? null : event.allocation.key;
45+
}
46+
47+
@Override
48+
public boolean equals(final Object o) {
49+
if (o == null || getClass() != o.getClass()) {
50+
return false;
51+
}
52+
final Value value = (Value) o;
53+
return Objects.equals(variant, value.variant) && Objects.equals(allocation, value.allocation);
54+
}
55+
56+
@Override
57+
public int hashCode() {
58+
return Objects.hash(variant, allocation);
59+
}
60+
}
61+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.datadog.featureflag;
2+
3+
import datadog.trace.api.featureflag.FeatureFlaggingGateway;
4+
5+
/**
6+
* Defines an exposure writer responsible for sending exposure events to the EVP proxy.
7+
* Implementations should use a background thread to perform these operations asynchronously.
8+
*/
9+
public interface ExposureWriter extends AutoCloseable, FeatureFlaggingGateway.ExposureListener {
10+
11+
void init();
12+
13+
void close();
14+
}
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package com.datadog.featureflag;
2+
3+
import static datadog.trace.util.AgentThreadFactory.AgentThread.FEATURE_FLAG_EXPOSURE_PROCESSOR;
4+
import static datadog.trace.util.AgentThreadFactory.newAgentThread;
5+
import static java.util.concurrent.TimeUnit.SECONDS;
6+
7+
import com.squareup.moshi.JsonAdapter;
8+
import com.squareup.moshi.Moshi;
9+
import datadog.communication.BackendApi;
10+
import datadog.communication.BackendApiFactory;
11+
import datadog.communication.ddagent.SharedCommunicationObjects;
12+
import datadog.trace.api.Config;
13+
import datadog.trace.api.featureflag.FeatureFlaggingGateway;
14+
import datadog.trace.api.featureflag.exposure.ExposureEvent;
15+
import datadog.trace.api.featureflag.exposure.ExposuresRequest;
16+
import datadog.trace.api.intake.Intake;
17+
import java.util.ArrayList;
18+
import java.util.HashMap;
19+
import java.util.List;
20+
import java.util.Map;
21+
import java.util.concurrent.TimeUnit;
22+
import okhttp3.RequestBody;
23+
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
public class ExposureWriterImpl implements ExposureWriter {
28+
29+
private static final Logger LOGGER = LoggerFactory.getLogger(ExposureWriterImpl.class);
30+
private static final int DEFAULT_CAPACITY = 1 << 16; // 65536 elements
31+
private static final int DEFAULT_FLUSH_INTERVAL_IN_SECONDS = 1;
32+
private static final int FLUSH_THRESHOLD = 100;
33+
34+
private final MpscBlockingConsumerArrayQueue<ExposureEvent> queue;
35+
private final Thread serializerThread;
36+
37+
public ExposureWriterImpl(final SharedCommunicationObjects sco, final Config config) {
38+
this(DEFAULT_CAPACITY, DEFAULT_FLUSH_INTERVAL_IN_SECONDS, SECONDS, sco, config);
39+
}
40+
41+
ExposureWriterImpl(
42+
final int capacity,
43+
final long flushInterval,
44+
final TimeUnit timeUnit,
45+
final SharedCommunicationObjects sco,
46+
final Config config) {
47+
this.queue = new MpscBlockingConsumerArrayQueue<>(capacity);
48+
final Map<String, String> context = new HashMap<>(4);
49+
context.put("service", config.getServiceName() == null ? "unknown" : config.getServiceName());
50+
if (config.getEnv() != null) {
51+
context.put("env", config.getEnv());
52+
}
53+
if (config.getVersion() != null) {
54+
context.put("version", config.getVersion());
55+
}
56+
final ExposureSerializingHandler serializer =
57+
new ExposureSerializingHandler(
58+
new BackendApiFactory(config, sco),
59+
queue,
60+
flushInterval,
61+
timeUnit,
62+
context,
63+
this::close);
64+
this.serializerThread = newAgentThread(FEATURE_FLAG_EXPOSURE_PROCESSOR, serializer);
65+
}
66+
67+
@Override
68+
public void init() {
69+
FeatureFlaggingGateway.addExposureListener(this);
70+
this.serializerThread.start();
71+
}
72+
73+
@Override
74+
public void close() {
75+
FeatureFlaggingGateway.removeExposureListener(this);
76+
if (this.serializerThread.isAlive()) {
77+
this.serializerThread.interrupt();
78+
}
79+
}
80+
81+
@Override
82+
public void accept(final ExposureEvent event) {
83+
queue.offer(event);
84+
}
85+
86+
private static class ExposureSerializingHandler implements Runnable {
87+
private final MpscBlockingConsumerArrayQueue<ExposureEvent> queue;
88+
private final long ticksRequiredToFlush;
89+
private long lastTicks;
90+
91+
private final JsonAdapter<ExposuresRequest> jsonAdapter;
92+
private final BackendApiFactory backendApiFactory;
93+
private BackendApi evp;
94+
95+
private final Map<String, String> context;
96+
private final ExposureCache cache;
97+
98+
private final List<ExposureEvent> buffer = new ArrayList<>();
99+
private final Runnable errorCallback;
100+
101+
public ExposureSerializingHandler(
102+
final BackendApiFactory backendApiFactory,
103+
final MpscBlockingConsumerArrayQueue<ExposureEvent> queue,
104+
final long flushInterval,
105+
final TimeUnit timeUnit,
106+
final Map<String, String> context,
107+
final Runnable errorCallback) {
108+
this.queue = queue;
109+
this.cache = new LRUExposureCache(queue.capacity());
110+
this.jsonAdapter = new Moshi.Builder().build().adapter(ExposuresRequest.class);
111+
this.backendApiFactory = backendApiFactory;
112+
this.context = context;
113+
114+
this.lastTicks = System.nanoTime();
115+
this.ticksRequiredToFlush = timeUnit.toNanos(flushInterval);
116+
117+
this.errorCallback = errorCallback;
118+
119+
LOGGER.debug("starting exposure serializer");
120+
}
121+
122+
@Override
123+
public void run() {
124+
evp = backendApiFactory.createBackendApi(Intake.EVENT_PLATFORM);
125+
if (evp == null) {
126+
errorCallback.run();
127+
throw new IllegalArgumentException("EVP Proxy not available");
128+
}
129+
try {
130+
runDutyCycle();
131+
} catch (InterruptedException e) {
132+
Thread.currentThread().interrupt();
133+
}
134+
LOGGER.debug("exposure processor worker exited. submitting exposures stopped.");
135+
}
136+
137+
private void runDutyCycle() throws InterruptedException {
138+
final Thread thread = Thread.currentThread();
139+
while (!thread.isInterrupted()) {
140+
ExposureEvent event;
141+
while ((event = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
142+
if (addToBuffer(event)) {
143+
consumeBatch();
144+
break;
145+
}
146+
}
147+
flushIfNecessary();
148+
}
149+
}
150+
151+
private void consumeBatch() {
152+
queue.drain(this::addToBuffer, queue.size());
153+
}
154+
155+
/** Adds an element to the buffer taking care of duplicated exposures thanks to the LRU cache */
156+
private boolean addToBuffer(final ExposureEvent event) {
157+
if (cache.add(event)) {
158+
buffer.add(event);
159+
return true;
160+
}
161+
return false;
162+
}
163+
164+
protected void flushIfNecessary() {
165+
if (buffer.isEmpty()) {
166+
return;
167+
}
168+
if (shouldFlush()) {
169+
try {
170+
final ExposuresRequest exposures = new ExposuresRequest(this.context, this.buffer);
171+
final String reqBod = jsonAdapter.toJson(exposures);
172+
final RequestBody requestBody =
173+
RequestBody.create(okhttp3.MediaType.parse("application/json"), reqBod);
174+
evp.post("exposures", requestBody, stream -> null, null, false);
175+
this.buffer.clear();
176+
} catch (Exception e) {
177+
LOGGER.error("Could not submit exposures", e);
178+
}
179+
}
180+
}
181+
182+
private boolean shouldFlush() {
183+
long nanoTime = System.nanoTime();
184+
long ticks = nanoTime - lastTicks;
185+
if (ticks > ticksRequiredToFlush || queue.size() >= FLUSH_THRESHOLD) {
186+
lastTicks = nanoTime;
187+
return true;
188+
}
189+
return false;
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)