|
36 | 36 | import java.util.Collection; |
37 | 37 | import java.util.Collections; |
38 | 38 | import java.util.HashSet; |
39 | | -import java.util.LinkedHashMap; |
40 | 39 | import java.util.List; |
41 | 40 | import java.util.Map; |
42 | 41 | import java.util.Optional; |
@@ -442,9 +441,9 @@ private String updateTableQueryBuilder(final StreamConfig stream, |
442 | 441 | } |
443 | 442 |
|
444 | 443 | private String insertNewRecords(final StreamConfig stream, |
445 | | - final String finalSuffix, |
446 | | - final boolean forceSafeCasting, |
447 | | - final Optional<Instant> minRawTimestamp) { |
| 444 | + final String finalSuffix, |
| 445 | + final boolean forceSafeCasting, |
| 446 | + final Optional<Instant> minRawTimestamp) { |
448 | 447 | final String columnList = stream.columns().keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n")); |
449 | 448 | final String extractNewRawRecords = extractNewRawRecords(stream, forceSafeCasting, minRawTimestamp); |
450 | 449 |
|
@@ -490,12 +489,16 @@ private String upsertNewRecords(final StreamConfig stream, |
490 | 489 | cursorComparison = |
491 | 490 | // First, compare the cursors. |
492 | 491 | "(target_table." + cursor + " < new_record." + cursor |
493 | | - // Then, break ties with extracted_at. (also explicitly check for both new_record and final table having null cursor |
494 | | - // because NULL != NULL in SQL) |
495 | | - + " OR (target_table." + cursor + " = new_record." + cursor + " AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)" |
496 | | - + " OR (target_table." + cursor + " IS NULL AND new_record." + cursor + " IS NULL AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)" |
497 | | - // Or, if the final table has null cursor but new_record has non-null cursor, then take the new record. |
498 | | - + " OR (target_table." + cursor + " IS NULL AND new_record." + cursor + " IS NOT NULL))"; |
| 492 | + // Then, break ties with extracted_at. (also explicitly check for both new_record and final table |
| 493 | + // having null cursor |
| 494 | + // because NULL != NULL in SQL) |
| 495 | + + " OR (target_table." + cursor + " = new_record." + cursor |
| 496 | + + " AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)" |
| 497 | + + " OR (target_table." + cursor + " IS NULL AND new_record." + cursor |
| 498 | + + " IS NULL AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)" |
| 499 | + // Or, if the final table has null cursor but new_record has non-null cursor, then take the new |
| 500 | + // record. |
| 501 | + + " OR (target_table." + cursor + " IS NULL AND new_record." + cursor + " IS NOT NULL))"; |
499 | 502 | } else { |
500 | 503 | // If there's no cursor, then we just take the most-recently-emitted record |
501 | 504 | cursorComparison = "target_table._airbyte_extracted_at < new_record._airbyte_extracted_at"; |
@@ -531,29 +534,29 @@ private String upsertNewRecords(final StreamConfig stream, |
531 | 534 | "cdcSkipInsertClause", cdcSkipInsertClause, |
532 | 535 | "column_list", columnList, |
533 | 536 | "newRecordColumnList", newRecordColumnList)).replace( |
534 | | - """ |
535 | | - MERGE ${project_id}.${final_table_id} target_table |
536 | | - USING ( |
537 | | - ${extractNewRawRecords} |
538 | | - ) new_record |
539 | | - ON ${pkEquivalent} |
540 | | - ${cdcDeleteClause} |
541 | | - WHEN MATCHED AND ${cursorComparison} THEN UPDATE SET |
542 | | - ${columnAssignments} |
543 | | - _airbyte_meta = new_record._airbyte_meta, |
544 | | - _airbyte_raw_id = new_record._airbyte_raw_id, |
545 | | - _airbyte_extracted_at = new_record._airbyte_extracted_at |
546 | | - WHEN NOT MATCHED ${cdcSkipInsertClause} THEN INSERT ( |
547 | | - ${column_list} |
548 | | - _airbyte_meta, |
549 | | - _airbyte_raw_id, |
550 | | - _airbyte_extracted_at |
551 | | - ) VALUES ( |
552 | | - ${newRecordColumnList} |
553 | | - new_record._airbyte_meta, |
554 | | - new_record._airbyte_raw_id, |
555 | | - new_record._airbyte_extracted_at |
556 | | - );"""); |
| 537 | + """ |
| 538 | + MERGE ${project_id}.${final_table_id} target_table |
| 539 | + USING ( |
| 540 | + ${extractNewRawRecords} |
| 541 | + ) new_record |
| 542 | + ON ${pkEquivalent} |
| 543 | + ${cdcDeleteClause} |
| 544 | + WHEN MATCHED AND ${cursorComparison} THEN UPDATE SET |
| 545 | + ${columnAssignments} |
| 546 | + _airbyte_meta = new_record._airbyte_meta, |
| 547 | + _airbyte_raw_id = new_record._airbyte_raw_id, |
| 548 | + _airbyte_extracted_at = new_record._airbyte_extracted_at |
| 549 | + WHEN NOT MATCHED ${cdcSkipInsertClause} THEN INSERT ( |
| 550 | + ${column_list} |
| 551 | + _airbyte_meta, |
| 552 | + _airbyte_raw_id, |
| 553 | + _airbyte_extracted_at |
| 554 | + ) VALUES ( |
| 555 | + ${newRecordColumnList} |
| 556 | + new_record._airbyte_meta, |
| 557 | + new_record._airbyte_raw_id, |
| 558 | + new_record._airbyte_extracted_at |
| 559 | + );"""); |
557 | 560 | } |
558 | 561 |
|
559 | 562 | /** |
|
0 commit comments