Skip to content

Commit dbb2210

Browse files
johanl-db authored and allisonport-db committed
Support struct evolution inside maps
## Description

This PR resolves issue #1641 to allow automatic schema evolution in structs that are inside maps.

Assuming the target and source tables have the following schemas:

target: `id string, map map<int, struct<a: int, b: int>>`
source: `id string, map map<int, struct<a: int, b: int, c: int>>`

the following merge:

```
SET spark.databricks.delta.schema.autoMerge.enabled = true;
MERGE INTO target t USING source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET *
```

returns an analysis error today:

```
AnalysisException: cannot resolve 's.map' due to data type mismatch: cannot cast map<string,struct<a:int,b:int>> to map<string,struct<a:int,b:int,c:string>>;
```

With this change, the merge command succeeds and the target table schema evolves to include field `c` inside the map value. The same also works for map keys.

- Tests are added to `MergeIntoSuiteBase` and `MergeIntoSQLSuite` to cover struct evolution inside of map values and keys.

## Does this PR introduce _any_ user-facing changes?

Yes, struct evolution inside of maps now succeeds instead of failing with an analysis error; see the previous example.

Closes #1868

GitOrigin-RevId: 07ce2531e03c4e2fa69e8a34f33ba8d2dc3a0228
1 parent 71e0a83 commit dbb2210

File tree

2 files changed

+344
-0
lines changed

2 files changed

+344
-0
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/UpdateExpressionsSupport.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,31 @@ trait UpdateExpressionsSupport extends CastSupport with SQLConfHelper with Analy
114114
ArrayType(toEt, containsNull = true)
115115
)
116116
}
117+
case (from: MapType, to: MapType) if !Cast.canCast(from, to) =>
118+
// Manually convert map keys and values if the types are not compatible to allow schema
119+
// evolution. This is slower than direct cast so we only do it when required.
120+
// Builds the two-argument (key, value) lambda that is passed to TransformKeys / TransformValues.
// `convert` receives the lambda's key and value variables and returns the expression used as the
// lambda body; both variables are always declared as lambda arguments, even if `convert` only
// uses one of them.
def createMapConverter(convert: (Expression, Expression) => Expression): Expression = {
121+
// Key variable: typed with the source map's key type and declared non-nullable.
val keyVar = NamedLambdaVariable("keyVar", from.keyType, nullable = false)
122+
// Value variable: typed with the source map's value type; its nullability argument is the
// source map's `valueContainsNull`.
val valueVar =
123+
NamedLambdaVariable("valueVar", from.valueType, from.valueContainsNull)
124+
LambdaFunction(convert(keyVar, valueVar), Seq(keyVar, valueVar))
125+
}
126+
127+
var transformedKeysAndValues = fromExpression
128+
if (from.keyType != to.keyType) {
129+
transformedKeysAndValues =
130+
TransformKeys(transformedKeysAndValues, createMapConverter {
131+
(key, _) => castIfNeeded(key, to.keyType, allowStructEvolution)
132+
})
133+
}
134+
135+
if (from.valueType != to.valueType) {
136+
transformedKeysAndValues =
137+
TransformValues(transformedKeysAndValues, createMapConverter {
138+
(_, value) => castIfNeeded(value, to.valueType, allowStructEvolution)
139+
})
140+
}
141+
cast(transformedKeysAndValues, to)
117142
case (from: StructType, to: StructType)
118143
if !DataType.equalsIgnoreCaseAndNullability(from, to) && resolveStructsByName =>
119144
// All from fields must be present in the final schema, or we'll silently lose data.

spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala

Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3454,6 +3454,325 @@ abstract class MergeIntoSuiteBase
34543454
"""{ "key": "A", "value": [ { "a": { "y": 20, "x": [ { "c": 10, "d": 30, "e": null } ] }, "b": 2 } ] }
34553455
{ "key": "B", "value": [ { "a": { "y": 60, "x": [ { "c": 20, "d": 50, "e": null } ] }, "b": 3 } ] }""",
34563456
expectErrorWithoutEvolutionContains = "Cannot cast")
3457+
3458+
// Struct evolution inside of map values.
3459+
testNestedStructsEvolution("new source column in map struct value")(
3460+
target =
3461+
"""{ "key": "A", "map": { "key": { "a": 1 } } }
3462+
{ "key": "C", "map": { "key": { "a": 3 } } }""",
3463+
source =
3464+
"""{ "key": "A", "map": { "key": { "a": 2, "b": 2 } } }
3465+
{ "key": "B", "map": { "key": { "a": 1, "b": 2 } } }""",
3466+
targetSchema = new StructType()
3467+
.add("key", StringType)
3468+
.add("map", MapType(
3469+
StringType,
3470+
new StructType().add("a", IntegerType))),
3471+
sourceSchema = new StructType()
3472+
.add("key", StringType)
3473+
.add("map", MapType(
3474+
StringType,
3475+
new StructType().add("a", IntegerType).add("b", IntegerType))),
3476+
resultSchema = new StructType()
3477+
.add("key", StringType)
3478+
.add("map", MapType(
3479+
StringType,
3480+
new StructType().add("a", IntegerType).add("b", IntegerType))),
3481+
clauses = update("*") :: insert("*") :: Nil,
3482+
result =
3483+
"""{ "key": "A", "map": { "key": { "a": 2, "b": 2 } } }
3484+
{ "key": "B", "map": { "key": { "a": 1, "b": 2 } } }
3485+
{ "key": "C", "map": { "key": { "a": 3, "b": null } } }""",
3486+
expectErrorWithoutEvolutionContains = "Cannot cast")
3487+
3488+
testNestedStructsEvolution("new source column in nested map struct value")(
3489+
target =
3490+
"""{"key": "A", "map": { "key": { "innerKey": { "a": 1 } } } }
3491+
{"key": "C", "map": { "key": { "innerKey": { "a": 3 } } } }""",
3492+
source =
3493+
"""{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
3494+
{"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }""",
3495+
targetSchema = new StructType()
3496+
.add("key", StringType)
3497+
.add("map", MapType(
3498+
StringType,
3499+
MapType(StringType, new StructType().add("a", IntegerType)))),
3500+
sourceSchema = new StructType()
3501+
.add("key", StringType)
3502+
.add("map", MapType(
3503+
StringType,
3504+
MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))),
3505+
resultSchema = new StructType()
3506+
.add("key", StringType)
3507+
.add("map", MapType(
3508+
StringType,
3509+
MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))),
3510+
clauses = update("*") :: insert("*") :: Nil,
3511+
result =
3512+
"""{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
3513+
{"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
3514+
{"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": null } } } }""",
3515+
expectErrorWithoutEvolutionContains = "Cannot cast")
3516+
3517+
testNestedStructsEvolution("source map struct value contains less columns than target")(
3518+
target =
3519+
"""{ "key": "A", "map": { "key": { "a": 1, "b": 1 } } }
3520+
{ "key": "C", "map": { "key": { "a": 3, "b": 1 } } }""",
3521+
source =
3522+
"""{ "key": "A", "map": { "key": { "a": 2 } } }
3523+
{ "key": "B", "map": { "key": { "a": 1 } } }""",
3524+
targetSchema = new StructType()
3525+
.add("key", StringType)
3526+
.add("map", MapType(
3527+
StringType,
3528+
new StructType().add("a", IntegerType).add("b", IntegerType))),
3529+
sourceSchema = new StructType()
3530+
.add("key", StringType)
3531+
.add("map", MapType(
3532+
StringType,
3533+
new StructType().add("a", IntegerType))),
3534+
resultSchema = new StructType()
3535+
.add("key", StringType)
3536+
.add("map", MapType(
3537+
StringType,
3538+
new StructType().add("a", IntegerType).add("b", IntegerType))),
3539+
clauses = update("*") :: insert("*") :: Nil,
3540+
result =
3541+
"""{ "key": "A", "map": { "key": { "a": 2, "b": null } } }
3542+
{ "key": "B", "map": { "key": { "a": 1, "b": null } } }
3543+
{ "key": "C", "map": { "key": { "a": 3, "b": 1 } } }""",
3544+
expectErrorWithoutEvolutionContains = "Cannot cast")
3545+
3546+
testNestedStructsEvolution("source nested map struct value contains less columns than target")(
3547+
target =
3548+
"""{"key": "A", "map": { "key": { "innerKey": { "a": 1, "b": 1 } } } }
3549+
{"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": 1 } } } }""",
3550+
source =
3551+
"""{"key": "A", "map": { "key": { "innerKey": { "a": 2 } } } }
3552+
{"key": "B", "map": { "key": { "innerKey": { "a": 2 } } } }""",
3553+
targetSchema = new StructType()
3554+
.add("key", StringType)
3555+
.add("map", MapType(
3556+
StringType,
3557+
MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))),
3558+
sourceSchema = new StructType()
3559+
.add("key", StringType)
3560+
.add("map", MapType(
3561+
StringType,
3562+
MapType(StringType, new StructType().add("a", IntegerType)))),
3563+
resultSchema = new StructType()
3564+
.add("key", StringType)
3565+
.add("map", MapType(
3566+
StringType,
3567+
MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))),
3568+
clauses = update("*") :: insert("*") :: Nil,
3569+
result =
3570+
"""{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": null } } } }
3571+
{"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": null } } } }
3572+
{"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": 1 } } } }""",
3573+
expectErrorWithoutEvolutionContains = "Cannot cast")
3574+
3575+
testNestedStructsEvolution("source nested map struct value contains different type than target")(
3576+
target =
3577+
"""{"key": "A", "map": { "key": { "a": 1, "b" : 1 } } }
3578+
{"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""",
3579+
source =
3580+
"""{"key": "A", "map": { "key": { "a": 1, "b" : "2" } } }
3581+
{"key": "B", "map": { "key": { "a": 2, "b" : "2" } } }""",
3582+
targetSchema = new StructType()
3583+
.add("key", StringType)
3584+
.add("map", MapType(
3585+
StringType,
3586+
new StructType().add("a", IntegerType).add("b", IntegerType))),
3587+
sourceSchema = new StructType()
3588+
.add("key", StringType)
3589+
.add("map", MapType(
3590+
StringType,
3591+
new StructType().add("a", IntegerType).add("b", StringType))),
3592+
resultSchema = new StructType()
3593+
.add("key", StringType)
3594+
.add("map", MapType(
3595+
StringType,
3596+
new StructType().add("a", IntegerType).add("b", IntegerType))),
3597+
clauses = update("*") :: insert("*") :: Nil,
3598+
result =
3599+
"""{"key": "A", "map": { "key": { "a": 1, "b" : 2 } } }
3600+
{"key": "B", "map": { "key": { "a": 2, "b" : 2 } } }
3601+
{"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""",
3602+
resultWithoutEvolution =
3603+
"""{"key": "A", "map": { "key": { "a": 1, "b" : 2 } } }
3604+
{"key": "B", "map": { "key": { "a": 2, "b" : 2 } } }
3605+
{"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""")
3606+
3607+
3608+
testNestedStructsEvolution("source nested map struct value in different order")(
3609+
target =
3610+
"""{"key": "A", "map": { "key": { "a" : 1, "b" : 1 } } }
3611+
{"key": "C", "map": { "key": { "a" : 3, "b" : 1 } } }""",
3612+
source =
3613+
"""{"key": "A", "map": { "key": { "b" : 2, "a" : 1, "c" : 3 } } }
3614+
{"key": "B", "map": { "key": { "b" : 2, "a" : 2, "c" : 4 } } }""",
3615+
targetSchema = new StructType()
3616+
.add("key", StringType)
3617+
.add("map", MapType(
3618+
StringType,
3619+
new StructType().add("a", IntegerType).add("b", IntegerType))),
3620+
sourceSchema = new StructType()
3621+
.add("key", StringType)
3622+
.add("map", MapType(
3623+
StringType,
3624+
new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType))),
3625+
resultSchema = new StructType()
3626+
.add("key", StringType)
3627+
.add("map", MapType(
3628+
StringType,
3629+
new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType))),
3630+
clauses = update("*") :: insert("*") :: Nil,
3631+
result =
3632+
"""{"key": "A", "map": { "key": { "a": 1, "b" : 2, "c" : 3 } } }
3633+
{"key": "B", "map": { "key": { "a": 2, "b" : 2, "c" : 4 } } }
3634+
{"key": "C", "map": { "key": { "a": 3, "b" : 1, "c" : null } } }""",
3635+
expectErrorWithoutEvolutionContains = "Cannot cast")
3636+
3637+
testNestedStructsEvolution("source map struct value to map array value")(
3638+
target =
3639+
"""{ "key": "A", "map": { "key": [ 1, 2 ] } }
3640+
{ "key": "C", "map": { "key": [ 3, 4 ] } }""",
3641+
source =
3642+
"""{ "key": "A", "map": { "key": { "a": 2 } } }
3643+
{ "key": "B", "map": { "key": { "a": 1 } } }""",
3644+
targetSchema = new StructType()
3645+
.add("key", StringType)
3646+
.add("map", MapType(
3647+
StringType,
3648+
ArrayType(IntegerType))),
3649+
sourceSchema = new StructType()
3650+
.add("key", StringType)
3651+
.add("map", MapType(
3652+
StringType,
3653+
new StructType().add("a", IntegerType))),
3654+
clauses = update("*") :: insert("*") :: Nil,
3655+
expectErrorContains = "Failed to merge incompatible data types",
3656+
expectErrorWithoutEvolutionContains = "Cannot cast")
3657+
3658+
testNestedStructsEvolution("source struct nested in map array values contains more columns in different order")(
3659+
target =
3660+
"""{ "key": "A", "map": { "key": [ { "a": 1, "b": 2 } ] } }
3661+
{ "key": "C", "map": { "key": [ { "a": 3, "b": 4 } ] } }""",
3662+
source =
3663+
"""{ "key": "A", "map": { "key": [ { "b": 6, "c": 7, "a": 5 } ] } }
3664+
{ "key": "B", "map": { "key": [ { "b": 9, "c": 10, "a": 8 } ] } }""",
3665+
targetSchema = new StructType()
3666+
.add("key", StringType)
3667+
.add("map", MapType(
3668+
StringType,
3669+
ArrayType(
3670+
new StructType().add("a", IntegerType).add("b", IntegerType)))),
3671+
sourceSchema = new StructType()
3672+
.add("key", StringType)
3673+
.add("map", MapType(
3674+
StringType,
3675+
ArrayType(
3676+
new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType)))),
3677+
resultSchema = new StructType()
3678+
.add("key", StringType)
3679+
.add("map", MapType(
3680+
StringType,
3681+
ArrayType(
3682+
new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType)))),
3683+
clauses = update("*") :: insert("*") :: Nil,
3684+
result =
3685+
"""{ "key": "A", "map": { "key": [ { "a": 5, "b": 6, "c": 7 } ] } }
3686+
{ "key": "B", "map": { "key": [ { "a": 8, "b": 9, "c": 10 } ] } }
3687+
{ "key": "C", "map": { "key": [ { "a": 3, "b": 4, "c": null } ] } }""",
3688+
expectErrorWithoutEvolutionContains = "Cannot cast")
3689+
3690+
// Struct evolution inside of map keys.
3691+
testEvolution("new source column in map struct key")(
3692+
targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value")
3693+
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
3694+
sourceData = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2)).toDF("key", "a", "b", "c", "value")
3695+
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"),
3696+
clauses = update("*") :: insert("*") :: Nil,
3697+
expected = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2), (3, 5, 6, null, 7))
3698+
.asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]]
3699+
.toDF("key", "a", "b", "c", "value")
3700+
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"),
3701+
expectErrorWithoutEvolutionContains = "Cannot cast"
3702+
)
3703+
3704+
testEvolution("source nested map struct key contains less columns than target")(
3705+
targetData = Seq((1, 2, 3, 4, 5), (3, 6, 7, 8, 9)).toDF("key", "a", "b", "c", "value")
3706+
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"),
3707+
sourceData = Seq((1, 10, 50, 1), (2, 20, 60, 2)).toDF("key", "a", "c", "value")
3708+
.selectExpr("key", "map(named_struct('a', a, 'c', c), value) as x"),
3709+
clauses = update("*") :: insert("*") :: Nil,
3710+
expected = Seq((1, 10, null, 50, 1), (2, 20, null, 60, 2), (3, 6, 7, 8, 9))
3711+
.asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]]
3712+
.toDF("key", "a", "b", "c", "value")
3713+
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"),
3714+
expectErrorWithoutEvolutionContains = "Cannot cast"
3715+
)
3716+
3717+
testEvolution("source nested map struct key contains different type than target")(
3718+
targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value")
3719+
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
3720+
sourceData = Seq((1, 10, "30", 1), (2, 20, "40", 2)).toDF("key", "a", "b", "value")
3721+
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
3722+
clauses = update("*") :: insert("*") :: Nil,
3723+
expected = Seq((1, 10, 30, 1), (2, 20, 40, 2), (3, 5, 6, 7))
3724+
.asInstanceOf[List[(Integer, Integer, Integer, Integer)]]
3725+
.toDF("key", "a", "b", "value")
3726+
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
3727+
expectedWithoutEvolution = Seq((1, 10, 30, 1), (2, 20, 40, 2), (3, 5, 6, 7))
3728+
.asInstanceOf[List[(Integer, Integer, Integer, Integer)]]
3729+
.toDF("key", "a", "b", "value")
3730+
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x")
3731+
)
3732+
3733+
testEvolution("source nested map struct key in different order")(
3734+
targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value")
3735+
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
3736+
sourceData = Seq((1, 10, 30, 1), (2, 20, 40, 2)).toDF("key", "a", "b", "value")
3737+
.selectExpr("key", "map(named_struct('b', b, 'a', a), value) as x"),
3738+
clauses = update("*") :: insert("*") :: Nil,
3739+
expected = Seq((1, 30, 10, 1), (2, 40, 20, 2), (3, 5, 6, 7))
3740+
.asInstanceOf[List[(Integer, Integer, Integer, Integer)]]
3741+
.toDF("key", "a", "b", "value")
3742+
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
3743+
expectedWithoutEvolution = Seq((1, 30, 10, 1), (2, 40, 20, 2), (3, 5, 6, 7))
3744+
.asInstanceOf[List[(Integer, Integer, Integer, Integer)]]
3745+
.toDF("key", "a", "b", "value")
3746+
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x")
3747+
)
3748+
3749+
testEvolution(
3750+
"source struct nested in map array keys contains more columns")(
3751+
targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value")
3752+
.selectExpr("key", "map(array(named_struct('a', a, 'b', b)), value) as x"),
3753+
sourceData = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2)).toDF("key", "a", "b", "c", "value")
3754+
.selectExpr("key", "map(array(named_struct('a', a, 'b', b, 'c', c)), value) as x"),
3755+
clauses = update("*") :: insert("*") :: Nil,
3756+
expected = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2), (3, 5, 6, null, 7))
3757+
.asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]]
3758+
.toDF("key", "a", "b", "c", "value")
3759+
.selectExpr("key", "map(array(named_struct('a', a, 'b', b, 'c', c)), value) as x"),
3760+
expectErrorWithoutEvolutionContains = "cannot cast"
3761+
)
3762+
3763+
testEvolution("struct evolution in both map keys and values")(
3764+
targetData = Seq((1, 2, 3, 4, 5), (3, 6, 7, 8, 9)).toDF("key", "a", "b", "d", "e")
3765+
.selectExpr("key", "map(named_struct('a', a, 'b', b), named_struct('d', d, 'e', e)) as x"),
3766+
sourceData = Seq((1, 10, 30, 50, 70, 90, 110), (2, 20, 40, 60, 80, 100, 120))
3767+
.toDF("key", "a", "b", "c", "d", "e", "f")
3768+
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), named_struct('d', d, 'e', e, 'f', f)) as x"),
3769+
clauses = update("*") :: insert("*") :: Nil,
3770+
expected = Seq((1, 10, 30, 50, 70, 90, 110), (2, 20, 40, 60, 80, 100, 120), (3, 6, 7, null, 8, 9, null))
3771+
.asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer, Integer, Integer)]]
3772+
.toDF("key", "a", "b", "c", "d", "e", "f")
3773+
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), named_struct('d', d, 'e', e, 'f', f)) as x"),
3774+
expectErrorWithoutEvolutionContains = "cannot cast"
3775+
)
34573776
// scalastyle:on line.size.limit
34583777

34593778
testEvolution("new column with update * and insert non-*")(

0 commit comments

Comments
 (0)