Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.4.2
dockerImageTag: 1.4.3
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.*;
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator;
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency;
import io.airbyte.cdk.integrations.source.relationaldb.streamstatus.StreamStatusTraceEmitterIterator;
Expand All @@ -18,6 +19,7 @@
import io.airbyte.integrations.source.mongodb.state.IdType;
import io.airbyte.integrations.source.mongodb.state.MongoDbStateManager;
import io.airbyte.integrations.source.mongodb.state.MongoDbStreamState;
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
Expand Down Expand Up @@ -52,6 +54,8 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(
final boolean decorateWithCompletedStatus) {
final boolean isEnforceSchema = config.getEnforceSchema();
final var checkpointInterval = config.getCheckpointInterval();
final String MULTIPLE_ID_TYPES_ANALYTICS_MESSAGE_KEY = "db-sources-mongo-multiple-id-types";

return streams
.stream()
.map(airbyteStream -> {
Expand All @@ -62,7 +66,10 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIterators(

final var idTypes = aggregateIdField(collection);
if (idTypes.size() > 1) {
throw new ConfigErrorException("The _id fields in a collection must be consistently typed (collection = " + collectionName + ").");
LOGGER.warn("The _id fields in this collection are not consistently typed, which may lead to data loss (collection = {}).",
collectionName);
AirbyteTraceMessageUtility
.emitAnalyticsTrace(new AirbyteAnalyticsTraceMessage().withType(MULTIPLE_ID_TYPES_ANALYTICS_MESSAGE_KEY).withValue("1"));
}

idTypes.stream().findFirst().ifPresent(idType -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -378,25 +377,6 @@ void testGetIteratorsNonEmptyInitialState() {
assertFalse(collection4.hasNext());
}

@Test
void testGetIteratorsThrowsExceptionWhenThereAreDifferentIdTypes() {
insertDocuments(COLLECTION1, List.of(
new Document(Map.of(
CURSOR_FIELD, OBJECT_ID1,
NAME_FIELD, NAME1)),
new Document(Map.of(
CURSOR_FIELD, "string-id",
NAME_FIELD, NAME2))));

final InitialSnapshotHandler initialSnapshotHandler = new InitialSnapshotHandler();
final MongoDbStateManager stateManager = mock(MongoDbStateManager.class);

final var thrown = assertThrows(ConfigErrorException.class,
() -> initialSnapshotHandler.getIterators(STREAMS, stateManager, mongoClient.getDatabase(DB_NAME),
/* MongoConstants.CHECKPOINT_INTERVAL, true */ CONFIG, false, false));
assertTrue(thrown.getMessage().contains("must be consistently typed"));
}

@Test
void testGetIteratorsThrowsExceptionWhenThereAreUnsupportedIdTypes() {
insertDocuments(COLLECTION1, List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,23 +273,6 @@ void testUnableToExtractOffsetFromStateException() {
CONFIG));
}

@Test
void testMultipleIdTypesThrowsException() {
final Document aggregate1 = Document.parse("{\"_id\": {\"_id\": \"objectId\"}, \"count\": 1}");
final Document aggregate2 = Document.parse("{\"_id\": {\"_id\": \"string\"}, \"count\": 1}");

when(aggregateCursor.hasNext()).thenReturn(true, true, false);
when(aggregateCursor.next()).thenReturn(aggregate1, aggregate2);
doCallRealMethod().when(aggregateIterable).forEach(any(Consumer.class));

final MongoDbStateManager stateManager =
MongoDbStateManager.createStateManager(createInitialDebeziumState(InitialSnapshotStatus.IN_PROGRESS), CONFIG);

final var thrown = assertThrows(ConfigErrorException.class, () -> cdcInitializer
.createCdcIterators(mongoClient, cdcConnectorMetadataInjector, CONFIGURED_CATALOG_STREAMS, stateManager, EMITTED_AT, CONFIG));
assertTrue(thrown.getMessage().contains("must be consistently typed"));
}

@Test
void testUnsupportedIdTypeThrowsException() {
final Document aggregate = Document.parse("{\"_id\": {\"_id\": \"exotic\"}, \"count\": 1}");
Expand Down
5 changes: 3 additions & 2 deletions docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
|:--------|:-----------| :------------------------------------------------------- |:----------------------------------------------------------------------------------------------------------|
| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------- |
| 1.4.3 | 2024-07-22 | [39145](https://github.com/airbytehq/airbyte/pull/39145) | Warn (vs fail) on different \_id types in collection. |
| 1.4.2 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz hearbeat. |
| 1.4.1 | 2024-06-11 | [39530](https://github.com/airbytehq/airbyte/pull/39530) | Adopt new CDK. |
| 1.4.0 | 2024-06-11 | [38238](https://github.com/airbytehq/airbyte/pull/38238) | Update mongodbv2 to use dbz 2.6.2 |
Expand Down Expand Up @@ -274,4 +275,4 @@ For more information regarding configuration parameters, please see [MongoDb Doc
| 0.1.1 | 2021-09-21 | [6364](https://github.com/airbytehq/airbyte/pull/6364) | Source MongoDb: added support via TLS/SSL |
| 0.1.0 | 2021-08-30 | [5530](https://github.com/airbytehq/airbyte/pull/5530) | New source: MongoDb ported to java |

</details>
</details>