Skip to content

Commit 3a3e058

Browse files
authored
[DB-sources] : Improve heartbeat logic (#40516)
1 parent b66d528 commit 3a3e058

File tree

20 files changed

+79
-74
lines changed

20 files changed

+79
-74
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:--------| :--------- | :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.40.7 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
177178
| 0.40.5 | 2024-06-26 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
178179
| 0.35.16 | 2024-06-25 | [\#40517](https://github.com/airbytehq/airbyte/pull/40517) | (backport) JdbcDatabase.executeWithinTransaction allows disabling SQL statement logging |
179180
| 0.40.4 | 2024-06-18 | [\#40254](https://github.com/airbytehq/airbyte/pull/40254) | Destinations: Do not throw on unrecognized airbyte message type (ignore message instead) |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ object DbAnalyticsUtils {
1414
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"
1515
const val DATA_TYPES_SERIALIZATION_ERROR_KEY = "db-sources-data-serialization-error"
1616
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"
17+
const val DEBEZIUM_CLOSE_REASON_KEY = "db-sources-debezium-close-reason"
1718

1819
@JvmStatic
1920
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
@@ -33,4 +34,9 @@ object DbAnalyticsUtils {
3334
.withType(CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY)
3435
.withValue("1")
3536
}
37+
38+
@JvmStatic
39+
fun debeziumCloseReasonMessage(reason: String): AirbyteAnalyticsTraceMessage {
40+
return AirbyteAnalyticsTraceMessage().withType(DEBEZIUM_CLOSE_REASON_KEY).withValue(reason)
41+
}
3642
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.40.5
1+
version=0.40.7

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,20 +80,21 @@ class AirbyteDebeziumHandler<T>(
8080
cdcSavedInfoFetcher.savedOffset,
8181
if (addDbNameToOffsetState)
8282
Optional.ofNullable<String>(config[JdbcUtils.DATABASE_KEY].asText())
83-
else Optional.empty<String>()
83+
else Optional.empty<String>(),
8484
)
8585
val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage> =
8686
if (trackSchemaHistory)
8787
Optional.of<AirbyteSchemaHistoryStorage>(
8888
AirbyteSchemaHistoryStorage.Companion.initializeDBHistory(
8989
cdcSavedInfoFetcher.savedSchemaHistory,
90-
cdcStateHandler.compressSchemaHistoryForState()
91-
)
90+
cdcStateHandler.compressSchemaHistoryForState(),
91+
),
9292
)
9393
else Optional.empty<AirbyteSchemaHistoryStorage>()
9494
val publisher = DebeziumRecordPublisher(debeziumPropertiesManager)
9595
val queue: CapacityReportingBlockingQueue<ChangeEvent<String?, String?>> =
9696
CapacityReportingBlockingQueue(queueSize)
97+
9798
publisher.start(queue, offsetManager, schemaHistoryManager)
9899
// handle state machine around pub/sub logic.
99100
val eventIterator: AutoCloseableIterator<ChangeEventWithMetadata> =
@@ -102,13 +103,14 @@ class AirbyteDebeziumHandler<T>(
102103
targetPosition,
103104
{ publisher.hasClosed() },
104105
DebeziumShutdownProcedure(queue, { publisher.close() }, { publisher.hasClosed() }),
105-
firstRecordWaitTime
106+
firstRecordWaitTime,
107+
config
106108
)
107109

108110
val syncCheckpointDuration =
109111
if (config.has(DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY))
110112
Duration.ofSeconds(
111-
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong()
113+
config[DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION_PROPERTY].asLong(),
112114
)
113115
else DebeziumIteratorConstants.SYNC_CHECKPOINT_DURATION
114116
val syncCheckpointRecords =
@@ -122,7 +124,7 @@ class AirbyteDebeziumHandler<T>(
122124
targetPosition,
123125
eventConverter,
124126
offsetManager,
125-
schemaHistoryManager
127+
schemaHistoryManager,
126128
)
127129

128130
// Usually sourceStateIterator requires airbyteStream as input. For DBZ iterator, stream is
@@ -133,7 +135,7 @@ class AirbyteDebeziumHandler<T>(
133135
eventIterator,
134136
null,
135137
messageProducer,
136-
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration)
138+
StateEmitFrequency(syncCheckpointRecords, syncCheckpointDuration),
137139
)
138140
return AutoCloseableIterators.fromIterator(iterator)
139141
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIterator.kt

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
*/
44
package io.airbyte.cdk.integrations.debezium.internals
55

6+
import com.fasterxml.jackson.databind.JsonNode
67
import com.google.common.annotations.VisibleForTesting
78
import com.google.common.collect.AbstractIterator
9+
import io.airbyte.cdk.db.DbAnalyticsUtils.debeziumCloseReasonMessage
10+
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
811
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition
912
import io.airbyte.commons.lang.MoreBooleans
1013
import io.airbyte.commons.util.AutoCloseableIterator
@@ -36,6 +39,7 @@ class DebeziumRecordIterator<T>(
3639
private val publisherStatusSupplier: Supplier<Boolean>,
3740
private val debeziumShutdownProcedure: DebeziumShutdownProcedure<ChangeEvent<String?, String?>>,
3841
private val firstRecordWaitTime: Duration,
42+
private val config: JsonNode
3943
) : AbstractIterator<ChangeEventWithMetadata>(), AutoCloseableIterator<ChangeEventWithMetadata> {
4044
private val heartbeatEventSourceField: MutableMap<Class<out ChangeEvent<*, *>?>, Field?> =
4145
HashMap(1)
@@ -82,7 +86,8 @@ class DebeziumRecordIterator<T>(
8286
String.format(
8387
"No records were returned by Debezium in the timeout seconds %s, closing the engine and iterator",
8488
waitTime.seconds
85-
)
89+
),
90+
DebeziumCloseReason.TIMEOUT
8691
)
8792
}
8893
LOGGER.info { "no record found. polling again." }
@@ -101,12 +106,16 @@ class DebeziumRecordIterator<T>(
101106
// too long
102107
if (targetPosition.reachedTargetPosition(heartbeatPos)) {
103108
requestClose(
104-
"Closing: Heartbeat indicates sync is done by reaching the target position"
109+
"Closing: Heartbeat indicates sync is done by reaching the target position",
110+
DebeziumCloseReason.HEARTBEAT_REACHED_TARGET_POSITION
105111
)
106112
} else if (
107113
heartbeatPos == this.lastHeartbeatPosition && heartbeatPosNotChanging()
108114
) {
109-
requestClose("Closing: Heartbeat indicates sync is not progressing")
115+
requestClose(
116+
"Closing: Heartbeat indicates sync is not progressing",
117+
DebeziumCloseReason.HEARTBEAT_NOT_PROGRESSING
118+
)
110119
}
111120

112121
if (heartbeatPos != lastHeartbeatPosition) {
@@ -122,7 +131,10 @@ class DebeziumRecordIterator<T>(
122131
// if the last record matches the target file position, it is time to tell the producer
123132
// to shutdown.
124133
if (targetPosition.reachedTargetPosition(changeEventWithMetadata)) {
125-
requestClose("Closing: Change event reached target position")
134+
requestClose(
135+
"Closing: Change event reached target position",
136+
DebeziumCloseReason.CHANGE_EVENT_REACHED_TARGET_POSITION
137+
)
126138
}
127139
this.tsLastHeartbeat = null
128140
this.receivedFirstRecord = true
@@ -175,7 +187,7 @@ class DebeziumRecordIterator<T>(
175187
*/
176188
@Throws(Exception::class)
177189
override fun close() {
178-
requestClose("Closing: Iterator closing")
190+
requestClose("Closing: Iterator closing", DebeziumCloseReason.ITERATOR_CLOSE)
179191
}
180192

181193
private fun isHeartbeatEvent(event: ChangeEvent<String?, String?>): Boolean {
@@ -185,23 +197,25 @@ class DebeziumRecordIterator<T>(
185197
}
186198

187199
private fun heartbeatPosNotChanging(): Boolean {
188-
if (this.tsLastHeartbeat == null) {
200+
// Closing debezium due to heartbeat position not changing only exists as an escape hatch
201+
// for
202+
// testing setups. In production, we rely on the platform heartbeats to kill the sync
203+
if (!isTest() || this.tsLastHeartbeat == null) {
189204
return false
190205
}
191206
val timeElapsedSinceLastHeartbeatTs =
192207
Duration.between(this.tsLastHeartbeat, LocalDateTime.now())
193-
LOGGER.info {
194-
"Time since last hb_pos change ${timeElapsedSinceLastHeartbeatTs.toSeconds()}s"
195-
}
196-
// wait time for no change in heartbeat position is half of initial waitTime
197208
return timeElapsedSinceLastHeartbeatTs.compareTo(firstRecordWaitTime.dividedBy(2)) > 0
198209
}
199210

200-
private fun requestClose(closeLogMessage: String) {
211+
private fun requestClose(closeLogMessage: String, closeReason: DebeziumCloseReason) {
201212
if (signalledDebeziumEngineShutdown) {
202213
return
203214
}
204215
LOGGER.info { closeLogMessage }
216+
AirbyteTraceMessageUtility.emitAnalyticsTrace(
217+
debeziumCloseReasonMessage(closeReason.toString())
218+
)
205219
debeziumShutdownProcedure.initiateShutdownProcedure()
206220
signalledDebeziumEngineShutdown = true
207221
}
@@ -212,6 +226,10 @@ class DebeziumRecordIterator<T>(
212226
}
213227
}
214228

229+
private fun isTest(): Boolean {
230+
return config.has("is_test") && config["is_test"].asBoolean()
231+
}
232+
215233
/**
216234
* [DebeziumRecordIterator.heartbeatEventSourceField] acts as a cache so that we avoid using
217235
* reflection to setAccessible for each event
@@ -246,5 +264,13 @@ class DebeziumRecordIterator<T>(
246264
}
247265
}
248266

267+
enum class DebeziumCloseReason() {
268+
TIMEOUT,
269+
ITERATOR_CLOSE,
270+
HEARTBEAT_REACHED_TARGET_POSITION,
271+
CHANGE_EVENT_REACHED_TARGET_POSITION,
272+
HEARTBEAT_NOT_PROGRESSING
273+
}
274+
249275
companion object {}
250276
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/test/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumRecordIteratorTest.kt

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
*/
44
package io.airbyte.cdk.integrations.debezium.internals
55

6+
import com.fasterxml.jackson.databind.JsonNode
7+
import com.fasterxml.jackson.databind.ObjectMapper
68
import io.airbyte.cdk.integrations.debezium.CdcTargetPosition
79
import io.debezium.engine.ChangeEvent
810
import java.time.Duration
@@ -34,6 +36,7 @@ class DebeziumRecordIteratorTest {
3436
{ false },
3537
mock(),
3638
Duration.ZERO,
39+
getTestConfig(), // Heartbeats should not be ignored for tests.
3740
)
3841
val lsn =
3942
debeziumRecordIterator.getHeartbeatPosition(
@@ -44,7 +47,7 @@ class DebeziumRecordIteratorTest {
4447
Collections.singletonMap("lsn", 358824993496L),
4548
null,
4649
null,
47-
null
50+
null,
4851
)
4952

5053
override fun key(): String? {
@@ -62,9 +65,15 @@ class DebeziumRecordIteratorTest {
6265
fun sourceRecord(): SourceRecord {
6366
return sourceRecord
6467
}
65-
}
68+
},
6669
)
6770

6871
Assertions.assertEquals(lsn, 358824993496L)
6972
}
73+
74+
fun getTestConfig(): JsonNode {
75+
val mapper: ObjectMapper = ObjectMapper()
76+
val testConfig = "{\"is_test\": true}"
77+
return mapper.readTree(testConfig)
78+
}
7079
}

airbyte-integrations/connectors/source-mongodb-v2/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.38.1'
6+
cdkVersionRequired = '0.40.7'
77
features = ['db-sources', 'datastore-mongo']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ data:
88
connectorSubtype: database
99
connectorType: source
1010
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
11-
dockerImageTag: 1.4.1
11+
dockerImageTag: 1.4.2
1212
dockerRepository: airbyte/source-mongodb-v2
1313
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
1414
githubIssueLabel: source-mongodb-v2

airbyte-integrations/connectors/source-mssql/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.40.1'
6+
cdkVersionRequired = '0.40.7'
77
features = ['db-sources']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/source-mssql/metadata.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.0.33
12+
dockerImageTag: 4.0.34
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

0 commit comments

Comments
 (0)