Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

import static dev.openfeature.contrib.providers.flagd.resolver.common.Convert.convertProtobufMapToStructure;

import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FeatureFlag;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.model.ParsingResult;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import dev.openfeature.sdk.ImmutableStructure;
import dev.openfeature.sdk.Structure;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -114,7 +114,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
Map<String, FeatureFlag> flagMap = parsingResult.getFlags();
Map<String, Object> flagSetMetadataMap = parsingResult.getFlagSetMetadata();

Structure metadata = parseSyncMetadata(payload.getMetadataResponse());
Structure syncContext = parseSyncContext(payload.getSyncContext());
writeLock.lock();
try {
changedFlagsKeys = getChangedFlagsKeys(flagMap);
Expand All @@ -126,7 +126,7 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
writeLock.unlock();
}
if (!stateBlockingQueue.offer(
new StorageStateChange(StorageState.OK, changedFlagsKeys, metadata))) {
new StorageStateChange(StorageState.OK, changedFlagsKeys, syncContext))) {
log.warn("Failed to convey OK status, queue is full");
}
} catch (Throwable e) {
Expand All @@ -150,11 +150,13 @@ private void streamerListener(final QueueSource connector) throws InterruptedExc
log.info("Shutting down store stream listener");
}

private Structure parseSyncMetadata(GetMetadataResponse metadataResponse) {
try {
return convertProtobufMapToStructure(metadataResponse.getMetadata().getFieldsMap());
} catch (Exception exception) {
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
private Structure parseSyncContext(Struct syncContext) {
if (syncContext != null) {
try {
return convertProtobufMapToStructure(syncContext.getFieldsMap());
} catch (Exception exception) {
log.error("Failed to parse metadataResponse, provider metadata may not be up-to-date");
}
}
return new ImmutableStructure();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector;

import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import com.google.protobuf.Struct;
import lombok.AllArgsConstructor;
import lombok.Getter;

Expand All @@ -10,9 +10,9 @@
public class QueuePayload {
private final QueuePayloadType type;
private final String flagData;
private final GetMetadataResponse metadataResponse;
private final Struct syncContext;

public QueuePayload(QueuePayloadType type, String flagData) {
this(type, flagData, GetMetadataResponse.getDefaultInstance());
this(type, flagData, null);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;

import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
import dev.openfeature.contrib.providers.flagd.resolver.common.FlagdProviderEvent;
Expand Down Expand Up @@ -123,7 +124,7 @@ private void observeSyncStream() throws InterruptedException {

log.debug("Initializing sync stream request");
final GetMetadataRequest.Builder metadataRequest = GetMetadataRequest.newBuilder();
GetMetadataResponse metadataResponse = GetMetadataResponse.getDefaultInstance();
GetMetadataResponse metadataResponse = null;

// create a context which exists to track and cancel the stream
try (CancellableContext context = Context.current().withCancellation()) {
Expand Down Expand Up @@ -162,8 +163,7 @@ private void observeSyncStream() throws InterruptedException {
log.debug("Exception in stream RPC, streamException {}, will restart", streamException);
if (!outgoingQueue.offer(new QueuePayload(
QueuePayloadType.ERROR,
String.format("Error from stream: %s", streamException.getMessage()),
metadataResponse))) {
String.format("Error from stream: %s", streamException.getMessage())))) {
log.error("Failed to convey ERROR status, queue is full");
}
break;
Expand All @@ -173,7 +173,14 @@ private void observeSyncStream() throws InterruptedException {
final String data = flagsResponse.getFlagConfiguration();
log.debug("Got stream response: {}", data);

if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, metadataResponse))) {
Struct syncContext = null;
if (flagsResponse.hasSyncContext()) {
syncContext = flagsResponse.getSyncContext();
} else if (metadataResponse != null) {
syncContext = metadataResponse.getMetadata();
}

if (!outgoingQueue.offer(new QueuePayload(QueuePayloadType.DATA, data, syncContext))) {
log.error("Stream writing failed");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ public void setupProvider(String providerType) throws InterruptedException {
state.builder.port(container.getPort(State.resolverType));
}
break;
default:
case "syncpayload":
flagdConfig = "sync-payload";
state.builder.port(container.getPort(State.resolverType));
break;
case "stable":
this.state.providerType = ProviderType.DEFAULT;
if (State.resolverType == Config.Resolver.FILE) {

Expand All @@ -134,6 +138,8 @@ public void setupProvider(String providerType) throws InterruptedException {
state.builder.port(container.getPort(State.resolverType));
}
break;
default:
throw new IllegalStateException();
}
when().post("http://" + container.getLaunchpadUrl() + "/start?config={config}", flagdConfig)
.then()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.model.FlagParser;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -35,10 +34,7 @@ void connectorHandling() throws Exception {

// OK for simple flag
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(VALID_SIMPLE),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -47,10 +43,7 @@ void connectorHandling() throws Exception {

// STALE for invalid flag
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(INVALID_FLAG),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(INVALID_FLAG)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -59,8 +52,7 @@ void connectorHandling() throws Exception {

// OK again for next payload
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -69,7 +61,7 @@ void connectorHandling() throws Exception {

// ERROR is propagated correctly
assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null, GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.ERROR, null));
});

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
Expand All @@ -93,10 +85,7 @@ public void changedFlags() throws Exception {
final BlockingQueue<StorageStateChange> storageStateDTOS = store.getStateQueue();

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA,
getFlagsFromResource(VALID_SIMPLE),
GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_SIMPLE)));
});
// flags changed for first time
assertEquals(
Expand All @@ -105,8 +94,7 @@ public void changedFlags() throws Exception {
storageStateDTOS.take().getChangedFlagsKeys());

assertTimeoutPreemptively(Duration.ofMillis(maxDelay), () -> {
payload.offer(new QueuePayload(
QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG), GetMetadataResponse.getDefaultInstance()));
payload.offer(new QueuePayload(QueuePayloadType.DATA, getFlagsFromResource(VALID_LONG)));
});
Map<String, FeatureFlag> expectedChangedFlags =
FlagParser.parseString(getFlagsFromResource(VALID_LONG), true).getFlags();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayload;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueuePayloadType;
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.QueueSource;
import dev.openfeature.flagd.grpc.sync.Sync.GetMetadataResponse;
import java.util.concurrent.BlockingQueue;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -26,8 +25,7 @@ public BlockingQueue<QueuePayload> getStreamQueue() {

public void shutdown() {
// Emit error mocking closed connection scenario
if (!mockQueue.offer(new QueuePayload(
QueuePayloadType.ERROR, "shutdown invoked", GetMetadataResponse.getDefaultInstance()))) {
if (!mockQueue.offer(new QueuePayload(QueuePayloadType.ERROR, "shutdown invoked"))) {
log.warn("Failed to offer shutdown status");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.sync;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -12,6 +13,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.google.protobuf.Struct;
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelConnector;
import dev.openfeature.contrib.providers.flagd.resolver.common.QueueingStreamObserver;
Expand Down Expand Up @@ -73,6 +75,7 @@ void onNextEnqueuesDataPayload() throws Exception {
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(payload);
assertNotNull(payload.getSyncContext());
assertEquals(QueuePayloadType.DATA, payload.getType());
// should NOT have restarted the stream (1 call)
verify(stub, times(1)).syncFlags(any(), any());
Expand All @@ -94,13 +97,38 @@ void onNextEnqueuesDataPayloadMetadataDisabled() throws Exception {
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(payload);
assertNull(payload.getSyncContext());
assertEquals(QueuePayloadType.DATA, payload.getType());
// should NOT have restarted the stream (1 call)
verify(stub, times(1)).syncFlags(any(), any());
// should NOT have called getMetadata
verify(blockingStub, times(0)).getMetadata(any());
}

@Test
void onNextEnqueuesDataPayloadWithSyncContext() throws Exception {
// disable GetMetadata call
SyncStreamQueueSource connector =
new SyncStreamQueueSource(FlagdOptions.builder().build(), mockConnector, stub, blockingStub);
latch = new CountDownLatch(1);
connector.init();
latch.await();

// fire onNext (data) event
Struct syncContext = Struct.newBuilder().build();
observer.onNext(
SyncFlagsResponse.newBuilder().setSyncContext(syncContext).build());

// should enqueue data payload
BlockingQueue<QueuePayload> streamQueue = connector.getStreamQueue();
QueuePayload payload = streamQueue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(payload);
assertEquals(syncContext, payload.getSyncContext());
assertEquals(QueuePayloadType.DATA, payload.getType());
// should NOT have restarted the stream (1 call)
verify(stub, times(1)).syncFlags(any(), any());
}

@Test
void onErrorEnqueuesDataPayload() throws Exception {
SyncStreamQueueSource connector =
Expand Down
2 changes: 1 addition & 1 deletion providers/flagd/test-harness