Skip to content

Commit a951624

Browse files
authored
destination-async-framework: move the state emission logic into GlobalAsyncStateManager (#35240)
1 parent a886ace commit a951624

File tree

13 files changed

+312
-157
lines changed

13 files changed

+312
-157
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ MavenLocal debugging steps:
166166

167167
| Version | Date | Pull Request | Subject |
168168
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
169+
| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. |
169170
| 0.20.8 | 2024-02-15 | [\#35285](https://github.com/airbytehq/airbyte/pull/35285) | Improve blobstore module structure. |
170171
| 0.20.7 | 2024-02-13 | [\#35236](https://github.com/airbytehq/airbyte/pull/35236) | output logs to files in addition to stdout when running tests |
171172
| 0.20.6 | 2024-02-12 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. |

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/FlushWorkers.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,8 @@
88
import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta;
99
import io.airbyte.cdk.integrations.destination_async.state.FlushFailure;
1010
import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager;
11-
import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats;
12-
import io.airbyte.commons.json.Jsons;
1311
import io.airbyte.protocol.models.v0.AirbyteMessage;
1412
import io.airbyte.protocol.models.v0.StreamDescriptor;
15-
import java.util.List;
1613
import java.util.Map;
1714
import java.util.Optional;
1815
import java.util.UUID;
@@ -67,8 +64,6 @@ public class FlushWorkers implements AutoCloseable {
6764
private final AtomicBoolean isClosing;
6865
private final GlobalAsyncStateManager stateManager;
6966

70-
private final Object LOCK = new Object();
71-
7267
public FlushWorkers(final BufferDequeue bufferDequeue,
7368
final DestinationFlushFunction flushFunction,
7469
final Consumer<AirbyteMessage> outputRecordCollector,
@@ -172,7 +167,7 @@ private void flush(final StreamDescriptor desc, final UUID flushWorkerId) {
172167
AirbyteFileUtils.byteCountToDisplaySize(batch.getSizeInBytes()));
173168

174169
flusher.flush(desc, batch.getData().stream().map(MessageWithMeta::message));
175-
emitStateMessages(batch.flushStates(stateIdToCount));
170+
batch.flushStates(stateIdToCount, outputRecordCollector);
176171
}
177172

178173
log.info("Flush Worker ({}) -- Worker finished flushing. Current queue size: {}",
@@ -222,7 +217,7 @@ public void close() throws Exception {
222217
log.info("Closing flush workers -- all buffers flushed");
223218

224219
// before shutting down the supervisor, flush all state.
225-
emitStateMessages(stateManager.flushStates());
220+
stateManager.flushStates(outputRecordCollector);
226221
supervisorThread.shutdown();
227222
while (!supervisorThread.awaitTermination(5L, TimeUnit.MINUTES)) {
228223
log.info("Waiting for flush worker supervisor to shut down");
@@ -239,17 +234,6 @@ public void close() throws Exception {
239234
debugLoop.shutdownNow();
240235
}
241236

242-
private void emitStateMessages(final List<PartialStateWithDestinationStats> partials) {
243-
synchronized (LOCK) {
244-
for (final PartialStateWithDestinationStats partial : partials) {
245-
final AirbyteMessage message = Jsons.deserialize(partial.stateMessage().getSerialized(), AirbyteMessage.class);
246-
message.getState().setDestinationStats(partial.stats());
247-
log.info("State with arrival number {} emitted from thread {}", partial.stateArrivalNumber(), Thread.currentThread().getName());
248-
outputRecordCollector.accept(message);
249-
}
250-
}
251-
}
252-
253237
private static String humanReadableFlushWorkerId(final UUID flushWorkerId) {
254238
return flushWorkerId.toString().substring(0, 5);
255239
}

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/buffers/MemoryAwareMessageBatch.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager;
88
import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta;
99
import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager;
10-
import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats;
10+
import io.airbyte.protocol.models.v0.AirbyteMessage;
1111
import java.util.List;
1212
import java.util.Map;
13+
import java.util.function.Consumer;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
1516

@@ -57,16 +58,13 @@ public void close() throws Exception {
5758
}
5859

5960
/**
60-
* For the batch, marks all the states that have now been flushed. Also returns states that can be
61-
* flushed. This method is descriptrive, it assumes that whatever consumes the state messages emits
62-
* them, internally it purges the states it returns. message that it can.
61+
* For the batch, marks all the states that have now been flushed. Also writes the states that can
62+
* be flushed back to platform via stateManager.
6363
* <p>
64-
*
65-
* @return list of states that can be flushed
6664
*/
67-
public List<PartialStateWithDestinationStats> flushStates(final Map<Long, Long> stateIdToCount) {
65+
public void flushStates(final Map<Long, Long> stateIdToCount, final Consumer<AirbyteMessage> outputRecordCollector) {
6866
stateIdToCount.forEach(stateManager::decrement);
69-
return stateManager.flushStates();
67+
stateManager.flushStates(outputRecordCollector);
7068
}
7169

7270
}

airbyte-cdk/java/airbyte-cdk/core/src/main/java/io/airbyte/cdk/integrations/destination_async/state/GlobalAsyncStateManager.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
import com.google.common.base.Strings;
1111
import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager;
1212
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
13+
import io.airbyte.commons.json.Jsons;
1314
import io.airbyte.protocol.models.v0.AirbyteMessage;
1415
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
1516
import io.airbyte.protocol.models.v0.AirbyteStateStats;
1617
import io.airbyte.protocol.models.v0.StreamDescriptor;
17-
import java.util.ArrayList;
18+
import java.time.Instant;
1819
import java.util.Collection;
19-
import java.util.List;
2020
import java.util.Map;
2121
import java.util.Optional;
2222
import java.util.Set;
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.ConcurrentMap;
2626
import java.util.concurrent.LinkedBlockingDeque;
2727
import java.util.concurrent.atomic.AtomicLong;
28+
import java.util.function.Consumer;
2829
import lombok.extern.slf4j.Slf4j;
2930
import org.apache.commons.io.FileUtils;
3031
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -153,16 +154,12 @@ public void decrement(final long stateId, final long count) {
153154
}
154155

155156
/**
156-
* Returns state messages with no more inflight records i.e. counter = 0 across all streams.
157+
* Flushes state messages with no more inflight records i.e. counter = 0 across all streams.
157158
* Intended to be called by {@link io.airbyte.cdk.integrations.destination_async.FlushWorkers} after
158159
* a worker has finished flushing its record batch.
159160
* <p>
160-
* The return list of states should be emitted back to the platform.
161-
*
162-
* @return list of state messages with no more inflight records.
163161
*/
164-
public List<PartialStateWithDestinationStats> flushStates() {
165-
final List<PartialStateWithDestinationStats> output = new ArrayList<>();
162+
public void flushStates(final Consumer<AirbyteMessage> outputRecordCollector) {
166163
Long bytesFlushed = 0L;
167164
synchronized (LOCK) {
168165
for (final Map.Entry<StreamDescriptor, LinkedBlockingDeque<Long>> entry : descToStateIdQ.entrySet()) {
@@ -195,8 +192,13 @@ public List<PartialStateWithDestinationStats> flushStates() {
195192
if (allRecordsCommitted) {
196193
final StateMessageWithArrivalNumber stateMessage = oldestState.getLeft();
197194
final double flushedRecordsAssociatedWithState = stateIdToCounterForPopulatingDestinationStats.get(oldestStateId).doubleValue();
198-
output.add(new PartialStateWithDestinationStats(stateMessage.partialAirbyteStateMessage(),
199-
new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState), stateMessage.arrivalNumber()));
195+
196+
log.info("State with arrival number {} emitted from thread {} at {}", stateMessage.arrivalNumber(), Thread.currentThread().getName(),
197+
Instant.now().toString());
198+
final AirbyteMessage message = Jsons.deserialize(stateMessage.partialAirbyteStateMessage.getSerialized(), AirbyteMessage.class);
199+
message.getState().setDestinationStats(new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState));
200+
outputRecordCollector.accept(message);
201+
200202
bytesFlushed += oldestState.getRight();
201203

202204
// cleanup
@@ -212,7 +214,6 @@ public List<PartialStateWithDestinationStats> flushStates() {
212214
}
213215

214216
freeBytes(bytesFlushed);
215-
return output;
216217
}
217218

218219
private Long getStateIdAndIncrement(final StreamDescriptor streamDescriptor, final long increment) {
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.20.8
1+
version=0.20.9

airbyte-cdk/java/airbyte-cdk/core/src/test/java/io/airbyte/cdk/integrations/destination_async/buffers/BufferDequeueTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ public class BufferDequeueTest {
2222

2323
private static final int RECORD_SIZE_20_BYTES = 20;
2424
private static final String DEFAULT_NAMESPACE = "foo_namespace";
25-
public static final String RECORD_20_BYTES = "abc";
2625
private static final String STREAM_NAME = "stream1";
2726
private static final StreamDescriptor STREAM_DESC = new StreamDescriptor().withName(STREAM_NAME);
2827
private static final PartialAirbyteMessage RECORD_MSG_20_BYTES = new PartialAirbyteMessage()

0 commit comments

Comments
 (0)