Skip to content

Commit 9aaf726

Browse files
authored
Update the IcebergTableSynchronizer.kt to allow us to force commit between schema updates (#69213)
1 parent c2df299 commit 9aaf726

File tree

3 files changed

+64
-9
lines changed

3 files changed

+64
-9
lines changed

airbyte-cdk/bulk/changelog.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
## Version 0.1.69
2+
3+
**Load CDK**
4+
5+
* Changed: Update the IcebergTableSynchronizer to allow individual schema update operations to be committed separately, in preparation for BigLake
6+
17
## Version 0.1.68
28

39
**Load CDK**

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/IcebergTableSynchronizer.kt

Lines changed: 57 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,19 +64,26 @@ class IcebergTableSynchronizer(
6464
*
6565
* @param table The Iceberg table to update.
6666
* @param incomingSchema The schema describing incoming data.
67+
* @param columnTypeChangeBehavior How to handle column type changes.
68+
* @param requireSeparateCommitsForColumnReplace If true, when replacing a column (deleting and
69+
* re-adding with the same name but different type), the delete and add operations are committed
70+
* separately. This is required for some catalogs (like BigLake) that don't support deleting and
71+
* adding a column with the same name in a single commit, even with different field IDs. Default
72+
* is false for backward compatibility.
6773
* @return The updated [Schema], after changes have been applied and committed.
6874
*/
6975
fun maybeApplySchemaChanges(
7076
table: Table,
7177
incomingSchema: Schema,
7278
columnTypeChangeBehavior: ColumnTypeChangeBehavior,
79+
requireSeparateCommitsForColumnReplace: Boolean = false,
7380
): SchemaUpdateResult {
7481
val existingSchema = table.schema()
7582
val diff = comparator.compareSchemas(incomingSchema, existingSchema)
7683

7784
if (!diff.hasChanges()) {
7885
// If no differences, return the existing schema as-is.
79-
return SchemaUpdateResult(existingSchema, pendingUpdate = null)
86+
return SchemaUpdateResult(existingSchema, pendingUpdates = emptyList())
8087
}
8188

8289
val update: UpdateSchema = table.updateSchema().allowIncompatibleChanges()
@@ -85,6 +92,9 @@ class IcebergTableSynchronizer(
8592
diff.removedColumns.forEach { removedColumn -> update.deleteColumn(removedColumn) }
8693

8794
// 2) Update types => find a supertype for each changed column
95+
val columnsToReplaceInSecondCommit =
96+
mutableMapOf<String, org.apache.iceberg.types.Types.NestedField>()
97+
8898
diff.updatedDataTypes.forEach { columnName ->
8999
val existingField =
90100
existingSchema.findField(columnName)
@@ -110,10 +120,20 @@ class IcebergTableSynchronizer(
110120
}
111121
ColumnTypeChangeBehavior.OVERWRITE -> {
112122
// Even when allowIncompatibleChanges is enabled, Iceberg still doesn't allow
113-
// arbitrary type changes.
114-
// So we have to drop+add the column here.
115-
update.deleteColumn(columnName)
116-
update.addColumn(columnName, incomingField.type())
123+
// arbitrary type changes via updateColumn().
124+
// So we have to drop+add the column (replace it).
125+
126+
if (requireSeparateCommitsForColumnReplace) {
127+
// For catalogs like BigLake that don't support delete+add in single commit,
128+
// we only delete here and will add in a separate update later.
129+
update.deleteColumn(columnName)
130+
// Store the field to add back later
131+
columnsToReplaceInSecondCommit[columnName] = incomingField
132+
} else {
133+
// Standard Iceberg behavior: delete+add in single commit
134+
update.deleteColumn(columnName)
135+
update.addColumn(columnName, incomingField.type())
136+
}
117137
}
118138
}
119139
}
@@ -174,16 +194,45 @@ class IcebergTableSynchronizer(
174194
update.setIdentifierFields(updatedIdentifierFields)
175195
}
176196

197+
// If we're doing separate commits for column replacements, commit the delete operations now
198+
if (requireSeparateCommitsForColumnReplace && columnsToReplaceInSecondCommit.isNotEmpty()) {
199+
// Commit the first update (with deletes but not adds for replaced columns)
200+
if (columnTypeChangeBehavior.commitImmediately) {
201+
update.commit()
202+
}
203+
204+
// Refresh table to get updated schema after delete
205+
table.refresh()
206+
207+
// Create a new update for adding the replaced columns back with their new types
208+
val addUpdate = table.updateSchema().allowIncompatibleChanges()
209+
210+
// Add back the replaced columns with their new types
211+
columnsToReplaceInSecondCommit.forEach { (columnName, field) ->
212+
addUpdate.addColumn(null, columnName, field.type())
213+
}
214+
215+
// Apply and return this second update
216+
val finalSchema = addUpdate.apply()
217+
return if (columnTypeChangeBehavior.commitImmediately) {
218+
addUpdate.commit()
219+
SchemaUpdateResult(finalSchema, pendingUpdates = emptyList())
220+
} else {
221+
// Return both updates in order: first the delete (update), then the add (addUpdate)
222+
SchemaUpdateResult(finalSchema, pendingUpdates = listOf(update, addUpdate))
223+
}
224+
}
225+
177226
// `apply` just validates that the schema change is valid, it doesn't actually commit().
178227
// It returns the schema that the table _would_ have after committing.
179228
val newSchema: Schema = update.apply()
180229
if (columnTypeChangeBehavior.commitImmediately) {
181230
update.commit()
182-
return SchemaUpdateResult(newSchema, pendingUpdate = null)
231+
return SchemaUpdateResult(newSchema, pendingUpdates = emptyList())
183232
} else {
184-
return SchemaUpdateResult(newSchema, update)
233+
return SchemaUpdateResult(newSchema, pendingUpdates = listOf(update))
185234
}
186235
}
187236
}
188237

189-
data class SchemaUpdateResult(val schema: Schema, val pendingUpdate: UpdateSchema?)
238+
/**
 * Outcome of a schema synchronization pass.
 *
 * @property schema The schema the table has (or would have) once the computed changes are applied.
 * When no changes were detected this is the table's existing schema.
 * @property pendingUpdates Schema updates that were prepared but not yet committed, listed in the
 * order they are meant to be committed. Empty when there was nothing to change or when the changes
 * were already committed.
 */
data class SchemaUpdateResult(
    val schema: Schema,
    val pendingUpdates: List<UpdateSchema>,
)
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.1.68
1+
version=0.1.69

0 commit comments

Comments
 (0)