Skip to content

Commit 3559220

Browse files
authored
[FLINK-38452] Fix unable to read incremental data from MongoDB collections with dots (.) (#4148)
Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
1 parent c7891ed commit 3559220

File tree

3 files changed

+70
-32
lines changed

3 files changed

+70
-32
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBScanFetchTask;
3232
import org.apache.flink.cdc.connectors.mongodb.source.reader.fetch.MongoDBStreamFetchTask;
3333
import org.apache.flink.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils.CollectionDiscoveryInfo;
34+
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
3435

3536
import com.mongodb.client.MongoClient;
3637
import io.debezium.relational.Column;
@@ -74,29 +75,11 @@ public String getName() {
7475
return "MongoDB";
7576
}
7677

77-
private static TableId parseTableId(String str) {
78-
return parseTableId(str, true);
79-
}
80-
81-
private static TableId parseTableId(String str, boolean useCatalogBeforeSchema) {
82-
String[] parts = str.split("[.]", 2);
83-
int numParts = parts.length;
84-
if (numParts == 1) {
85-
return new TableId(null, null, parts[0]);
86-
} else if (numParts == 2) {
87-
return useCatalogBeforeSchema
88-
? new TableId(parts[0], null, parts[1])
89-
: new TableId(null, parts[0], parts[1]);
90-
} else {
91-
return null;
92-
}
93-
}
94-
9578
@Override
9679
public List<TableId> discoverDataCollections(MongoDBSourceConfig sourceConfig) {
9780
CollectionDiscoveryInfo discoveryInfo = discoverAndCacheDataCollections(sourceConfig);
9881
return discoveryInfo.getDiscoveredCollections().stream()
99-
.map(MongoDBDialect::parseTableId)
82+
.map(MongoUtils::parseTableId)
10083
.collect(Collectors.toList());
10184
}
10285

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/main/java/org/apache/flink/cdc/connectors/mongodb/source/utils/MongoUtils.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,31 @@ public class MongoUtils {
101101

102102
private MongoUtils() {}
103103

104+
/**
105+
* MongoDB allows dots to be presented in collection names, but not in database names (see <a
106+
* href="https://www.mongodb.com/docs/manual/reference/limits/">docs</a> for more details). <br>
107+
* So, for a canonical TableId with multiple dots in it (like {@code foo.bar.baz}), it is
108+
* guaranteed that the first part ({@code foo}) is the database name and the latter part ({@code
109+
* bar.baz}) is the table name.
110+
*/
111+
public static TableId parseTableId(String str) {
112+
return parseTableId(str, true);
113+
}
114+
115+
public static TableId parseTableId(String str, boolean useCatalogBeforeSchema) {
116+
String[] parts = str.split("[.]", 2);
117+
int numParts = parts.length;
118+
if (numParts == 1) {
119+
return new TableId(null, null, parts[0]);
120+
} else if (numParts == 2) {
121+
return useCatalogBeforeSchema
122+
? new TableId(parts[0], null, parts[1])
123+
: new TableId(null, parts[0], parts[1]);
124+
} else {
125+
return null;
126+
}
127+
}
128+
104129
public static ChangeStreamDescriptor getChangeStreamDescriptor(
105130
MongoDBSourceConfig sourceConfig,
106131
List<String> discoveredDatabases,
@@ -115,7 +140,7 @@ public static ChangeStreamDescriptor getChangeStreamDescriptor(
115140
collectionList, discoveredCollections)) {
116141
changeStreamFilter =
117142
ChangeStreamDescriptor.collection(
118-
TableId.parse(discoveredCollections.get(0)));
143+
parseTableId(discoveredCollections.get(0)));
119144
} else {
120145
Pattern namespaceRegex =
121146
CollectionDiscoveryUtils.includeListAsFlatPattern(collectionList);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mongodb-cdc/src/test/java/org/apache/flink/cdc/connectors/mongodb/table/MongoDBRegexFilterITCase.java

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.jupiter.params.ParameterizedTest;
3131
import org.junit.jupiter.params.provider.ValueSource;
3232

33+
import java.util.Arrays;
3334
import java.util.List;
3435

3536
import static org.apache.flink.cdc.connectors.mongodb.utils.MongoDBContainer.FLINK_USER;
@@ -264,27 +265,56 @@ void testMatchDatabaseAndCollectionContainsDash(boolean parallelismSnapshot) thr
264265

265266
@ParameterizedTest(name = "parallelismSnapshot: {0}")
266267
@ValueSource(booleans = {true, false})
267-
void testMatchCollectionWithDots(boolean parallelismSnapshot) throws Exception {
268+
void testMatchCollectionWithQuotedDots(boolean parallelismSnapshot) throws Exception {
269+
testMatchCollectionWithDots(parallelismSnapshot, "[.]coll[.]name");
270+
}
271+
272+
@ParameterizedTest(name = "parallelismSnapshot: {0}")
273+
@ValueSource(booleans = {true, false})
274+
void testMatchCollectionWithUnquotedDots(boolean parallelismSnapshot) throws Exception {
275+
testMatchCollectionWithDots(parallelismSnapshot, ".coll.name");
276+
}
277+
278+
private void testMatchCollectionWithDots(boolean parallelismSnapshot, String matchExpr)
279+
throws Exception {
268280
setup(parallelismSnapshot);
269-
// 1. Given colllections:
281+
282+
// 1. Given collections:
270283
// db: [coll.name]
271284
String db = MONGO_CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted");
272285

273-
TableResult result = submitTestCase(db, db + "[.]coll[.]name", parallelismSnapshot);
286+
TableResult result = submitTestCase(db, db + matchExpr, parallelismSnapshot);
274287

275288
// 2. Wait change stream records come
276289
waitForSinkSize("mongodb_sink", 3);
277290

278291
// 3. Check results
279-
String[] expected =
280-
new String[] {
281-
String.format("+I[%s, coll.name, A101]", db),
282-
String.format("+I[%s, coll.name, A102]", db),
283-
String.format("+I[%s, coll.name, A103]", db)
284-
};
285-
286-
List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink");
287-
Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
292+
Assertions.assertThat(TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink"))
293+
.containsExactlyInAnyOrder(
294+
String.format("+I[%s, coll.name, A101]", db),
295+
String.format("+I[%s, coll.name, A102]", db),
296+
String.format("+I[%s, coll.name, A103]", db));
297+
298+
// 4. Prepare some incremental data
299+
mongodbClient
300+
.getDatabase(db)
301+
.getCollection("coll.name")
302+
.insertMany(
303+
Arrays.asList(
304+
Document.parse("{\"seq\": \"A104\"}"),
305+
Document.parse("{\"seq\": \"A105\"}"),
306+
Document.parse("{\"seq\": \"A106\"}")));
307+
308+
// 5. Check incremental records
309+
waitForSinkSize("mongodb_sink", 6);
310+
Assertions.assertThat(TestValuesTableFactory.getRawResultsAsStrings("mongodb_sink"))
311+
.containsExactlyInAnyOrder(
312+
String.format("+I[%s, coll.name, A101]", db),
313+
String.format("+I[%s, coll.name, A102]", db),
314+
String.format("+I[%s, coll.name, A103]", db),
315+
String.format("+I[%s, coll.name, A104]", db),
316+
String.format("+I[%s, coll.name, A105]", db),
317+
String.format("+I[%s, coll.name, A106]", db));
288318

289319
result.getJobClient().get().cancel().get();
290320
}

0 commit comments

Comments (0)