@@ -3454,6 +3454,325 @@ abstract class MergeIntoSuiteBase
34543454 """ { "key": "A", "value": [ { "a": { "y": 20, "x": [ { "c": 10, "d": 30, "e": null } ] }, "b": 2 } ] }
34553455 { "key": "B", "value": [ { "a": { "y": 60, "x": [ { "c": 20, "d": 50, "e": null } ] }, "b": 3 } ] }""" ,
34563456 expectErrorWithoutEvolutionContains = " Cannot cast" )
3457+
3458+ // Struct evolution inside of map values.
3459+ testNestedStructsEvolution(" new source column in map struct value" )(
3460+ target =
3461+ """ { "key": "A", "map": { "key": { "a": 1 } } }
3462+ { "key": "C", "map": { "key": { "a": 3 } } }""" ,
3463+ source =
3464+ """ { "key": "A", "map": { "key": { "a": 2, "b": 2 } } }
3465+ { "key": "B", "map": { "key": { "a": 1, "b": 2 } } }""" ,
3466+ targetSchema = new StructType ()
3467+ .add(" key" , StringType )
3468+ .add(" map" , MapType (
3469+ StringType ,
3470+ new StructType ().add(" a" , IntegerType ))),
3471+ sourceSchema = new StructType ()
3472+ .add(" key" , StringType )
3473+ .add(" map" , MapType (
3474+ StringType ,
3475+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ))),
3476+ resultSchema = new StructType ()
3477+ .add(" key" , StringType )
3478+ .add(" map" , MapType (
3479+ StringType ,
3480+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ))),
3481+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3482+ result =
3483+ """ { "key": "A", "map": { "key": { "a": 2, "b": 2 } } }
3484+ { "key": "B", "map": { "key": { "a": 1, "b": 2 } } }
3485+ { "key": "C", "map": { "key": { "a": 3, "b": null } } }""" ,
3486+ expectErrorWithoutEvolutionContains = " Cannot cast" )
3487+
3488+ testNestedStructsEvolution(" new source column in nested map struct value" )(
3489+ target =
3490+ """ {"key": "A", "map": { "key": { "innerKey": { "a": 1 } } } }
3491+ {"key": "C", "map": { "key": { "innerKey": { "a": 3 } } } }""" ,
3492+ source =
3493+ """ {"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
3494+ {"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }""" ,
3495+ targetSchema = new StructType ()
3496+ .add(" key" , StringType )
3497+ .add(" map" , MapType (
3498+ StringType ,
3499+ MapType (StringType , new StructType ().add(" a" , IntegerType )))),
3500+ sourceSchema = new StructType ()
3501+ .add(" key" , StringType )
3502+ .add(" map" , MapType (
3503+ StringType ,
3504+ MapType (StringType , new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType )))),
3505+ resultSchema = new StructType ()
3506+ .add(" key" , StringType )
3507+ .add(" map" , MapType (
3508+ StringType ,
3509+ MapType (StringType , new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType )))),
3510+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3511+ result =
3512+ """ {"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
3513+ {"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
3514+ {"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": null } } } }""" ,
3515+ expectErrorWithoutEvolutionContains = " Cannot cast" )
3516+
3517+ testNestedStructsEvolution(" source map struct value contains less columns than target" )(
3518+ target =
3519+ """ { "key": "A", "map": { "key": { "a": 1, "b": 1 } } }
3520+ { "key": "C", "map": { "key": { "a": 3, "b": 1 } } }""" ,
3521+ source =
3522+ """ { "key": "A", "map": { "key": { "a": 2 } } }
3523+ { "key": "B", "map": { "key": { "a": 1 } } }""" ,
3524+ targetSchema = new StructType ()
3525+ .add(" key" , StringType )
3526+ .add(" map" , MapType (
3527+ StringType ,
3528+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ))),
3529+ sourceSchema = new StructType ()
3530+ .add(" key" , StringType )
3531+ .add(" map" , MapType (
3532+ StringType ,
3533+ new StructType ().add(" a" , IntegerType ))),
3534+ resultSchema = new StructType ()
3535+ .add(" key" , StringType )
3536+ .add(" map" , MapType (
3537+ StringType ,
3538+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ))),
3539+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3540+ result =
3541+ """ { "key": "A", "map": { "key": { "a": 2, "b": null } } }
3542+ { "key": "B", "map": { "key": { "a": 1, "b": null } } }
3543+ { "key": "C", "map": { "key": { "a": 3, "b": 1 } } }""" ,
3544+ expectErrorWithoutEvolutionContains = " Cannot cast" )
3545+
3546+ testNestedStructsEvolution(" source nested map struct value contains less columns than target" )(
3547+ target =
3548+ """ {"key": "A", "map": { "key": { "innerKey": { "a": 1, "b": 1 } } } }
3549+ {"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": 1 } } } }""" ,
3550+ source =
3551+ """ {"key": "A", "map": { "key": { "innerKey": { "a": 2 } } } }
3552+ {"key": "B", "map": { "key": { "innerKey": { "a": 2 } } } }""" ,
3553+ targetSchema = new StructType ()
3554+ .add(" key" , StringType )
3555+ .add(" map" , MapType (
3556+ StringType ,
3557+ MapType (StringType , new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType )))),
3558+ sourceSchema = new StructType ()
3559+ .add(" key" , StringType )
3560+ .add(" map" , MapType (
3561+ StringType ,
3562+ MapType (StringType , new StructType ().add(" a" , IntegerType )))),
3563+ resultSchema = new StructType ()
3564+ .add(" key" , StringType )
3565+ .add(" map" , MapType (
3566+ StringType ,
3567+ MapType (StringType , new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType )))),
3568+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3569+ result =
3570+ """ {"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": null } } } }
3571+ {"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": null } } } }
3572+ {"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": 1 } } } }""" ,
3573+ expectErrorWithoutEvolutionContains = " Cannot cast" )
3574+
3575+ testNestedStructsEvolution(" source nested map struct value contains different type than target" )(
3576+ target =
3577+ """ {"key": "A", "map": { "key": { "a": 1, "b" : 1 } } }
3578+ {"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""" ,
3579+ source =
3580+ """ {"key": "A", "map": { "key": { "a": 1, "b" : "2" } } }
3581+ {"key": "B", "map": { "key": { "a": 2, "b" : "2" } } }""" ,
3582+ targetSchema = new StructType ()
3583+ .add(" key" , StringType )
3584+ .add(" map" , MapType (
3585+ StringType ,
3586+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ))),
3587+ sourceSchema = new StructType ()
3588+ .add(" key" , StringType )
3589+ .add(" map" , MapType (
3590+ StringType ,
3591+ new StructType ().add(" a" , IntegerType ).add(" b" , StringType ))),
3592+ resultSchema = new StructType ()
3593+ .add(" key" , StringType )
3594+ .add(" map" , MapType (
3595+ StringType ,
3596+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ))),
3597+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3598+ result =
3599+ """ {"key": "A", "map": { "key": { "a": 1, "b" : 2 } } }
3600+ {"key": "B", "map": { "key": { "a": 2, "b" : 2 } } }
3601+ {"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""" ,
3602+ resultWithoutEvolution =
3603+ """ {"key": "A", "map": { "key": { "a": 1, "b" : 2 } } }
3604+ {"key": "B", "map": { "key": { "a": 2, "b" : 2 } } }
3605+ {"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""" )
3606+
3607+
3608+ testNestedStructsEvolution(" source nested map struct value in different order" )(
3609+ target =
3610+ """ {"key": "A", "map": { "key": { "a" : 1, "b" : 1 } } }
3611+ {"key": "C", "map": { "key": { "a" : 3, "b" : 1 } } }""" ,
3612+ source =
3613+ """ {"key": "A", "map": { "key": { "b" : 2, "a" : 1, "c" : 3 } } }
3614+ {"key": "B", "map": { "key": { "b" : 2, "a" : 2, "c" : 4 } } }""" ,
3615+ targetSchema = new StructType ()
3616+ .add(" key" , StringType )
3617+ .add(" map" , MapType (
3618+ StringType ,
3619+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ))),
3620+ sourceSchema = new StructType ()
3621+ .add(" key" , StringType )
3622+ .add(" map" , MapType (
3623+ StringType ,
3624+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ).add(" c" , IntegerType ))),
3625+ resultSchema = new StructType ()
3626+ .add(" key" , StringType )
3627+ .add(" map" , MapType (
3628+ StringType ,
3629+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ).add(" c" , IntegerType ))),
3630+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3631+ result =
3632+ """ {"key": "A", "map": { "key": { "a": 1, "b" : 2, "c" : 3 } } }
3633+ {"key": "B", "map": { "key": { "a": 2, "b" : 2, "c" : 4 } } }
3634+ {"key": "C", "map": { "key": { "a": 3, "b" : 1, "c" : null } } }""" ,
3635+ expectErrorWithoutEvolutionContains = " Cannot cast" )
3636+
3637+ testNestedStructsEvolution(" source map struct value to map array value" )(
3638+ target =
3639+ """ { "key": "A", "map": { "key": [ 1, 2 ] } }
3640+ { "key": "C", "map": { "key": [ 3, 4 ] } }""" ,
3641+ source =
3642+ """ { "key": "A", "map": { "key": { "a": 2 } } }
3643+ { "key": "B", "map": { "key": { "a": 1 } } }""" ,
3644+ targetSchema = new StructType ()
3645+ .add(" key" , StringType )
3646+ .add(" map" , MapType (
3647+ StringType ,
3648+ ArrayType (IntegerType ))),
3649+ sourceSchema = new StructType ()
3650+ .add(" key" , StringType )
3651+ .add(" map" , MapType (
3652+ StringType ,
3653+ new StructType ().add(" a" , IntegerType ))),
3654+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3655+ expectErrorContains = " Failed to merge incompatible data types" ,
3656+ expectErrorWithoutEvolutionContains = " Cannot cast" )
3657+
3658+ testNestedStructsEvolution(" source struct nested in map array values contains more columns in different order" )(
3659+ target =
3660+ """ { "key": "A", "map": { "key": [ { "a": 1, "b": 2 } ] } }
3661+ { "key": "C", "map": { "key": [ { "a": 3, "b": 4 } ] } }""" ,
3662+ source =
3663+ """ { "key": "A", "map": { "key": [ { "b": 6, "c": 7, "a": 5 } ] } }
3664+ { "key": "B", "map": { "key": [ { "b": 9, "c": 10, "a": 8 } ] } }""" ,
3665+ targetSchema = new StructType ()
3666+ .add(" key" , StringType )
3667+ .add(" map" , MapType (
3668+ StringType ,
3669+ ArrayType (
3670+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType )))),
3671+ sourceSchema = new StructType ()
3672+ .add(" key" , StringType )
3673+ .add(" map" , MapType (
3674+ StringType ,
3675+ ArrayType (
3676+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ).add(" c" , IntegerType )))),
3677+ resultSchema = new StructType ()
3678+ .add(" key" , StringType )
3679+ .add(" map" , MapType (
3680+ StringType ,
3681+ ArrayType (
3682+ new StructType ().add(" a" , IntegerType ).add(" b" , IntegerType ).add(" c" , IntegerType )))),
3683+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3684+ result =
3685+ """ { "key": "A", "map": { "key": [ { "a": 5, "b": 6, "c": 7 } ] } }
3686+ { "key": "B", "map": { "key": [ { "a": 8, "b": 9, "c": 10 } ] } }
3687+ { "key": "C", "map": { "key": [ { "a": 3, "b": 4, "c": null } ] } }""" ,
3688+ expectErrorWithoutEvolutionContains = " Cannot cast" )
3689+
3690+ // Struct evolution inside of map keys.
3691+ testEvolution(" new source column in map struct key" )(
3692+ targetData = Seq ((1 , 2 , 3 , 4 ), (3 , 5 , 6 , 7 )).toDF(" key" , " a" , " b" , " value" )
3693+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), value) as x" ),
3694+ sourceData = Seq ((1 , 10 , 30 , 50 , 1 ), (2 , 20 , 40 , 60 , 2 )).toDF(" key" , " a" , " b" , " c" , " value" )
3695+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b, 'c', c), value) as x" ),
3696+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3697+ expected = Seq ((1 , 10 , 30 , 50 , 1 ), (2 , 20 , 40 , 60 , 2 ), (3 , 5 , 6 , null , 7 ))
3698+ .asInstanceOf [List [(Integer , Integer , Integer , Integer , Integer )]]
3699+ .toDF(" key" , " a" , " b" , " c" , " value" )
3700+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b, 'c', c), value) as x" ),
3701+ expectErrorWithoutEvolutionContains = " Cannot cast"
3702+ )
3703+
3704+ testEvolution(" source nested map struct key contains less columns than target" )(
3705+ targetData = Seq ((1 , 2 , 3 , 4 , 5 ), (3 , 6 , 7 , 8 , 9 )).toDF(" key" , " a" , " b" , " c" , " value" )
3706+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b, 'c', c), value) as x" ),
3707+ sourceData = Seq ((1 , 10 , 50 , 1 ), (2 , 20 , 60 , 2 )).toDF(" key" , " a" , " c" , " value" )
3708+ .selectExpr(" key" , " map(named_struct('a', a, 'c', c), value) as x" ),
3709+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3710+ expected = Seq ((1 , 10 , null , 50 , 1 ), (2 , 20 , null , 60 , 2 ), (3 , 6 , 7 , 8 , 9 ))
3711+ .asInstanceOf [List [(Integer , Integer , Integer , Integer , Integer )]]
3712+ .toDF(" key" , " a" , " b" , " c" , " value" )
3713+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b, 'c', c), value) as x" ),
3714+ expectErrorWithoutEvolutionContains = " Cannot cast"
3715+ )
3716+
3717+ testEvolution(" source nested map struct key contains different type than target" )(
3718+ targetData = Seq ((1 , 2 , 3 , 4 ), (3 , 5 , 6 , 7 )).toDF(" key" , " a" , " b" , " value" )
3719+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), value) as x" ),
3720+ sourceData = Seq ((1 , 10 , " 30" , 1 ), (2 , 20 , " 40" , 2 )).toDF(" key" , " a" , " b" , " value" )
3721+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), value) as x" ),
3722+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3723+ expected = Seq ((1 , 10 , 30 , 1 ), (2 , 20 , 40 , 2 ), (3 , 5 , 6 , 7 ))
3724+ .asInstanceOf [List [(Integer , Integer , Integer , Integer )]]
3725+ .toDF(" key" , " a" , " b" , " value" )
3726+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), value) as x" ),
3727+ expectedWithoutEvolution = Seq ((1 , 10 , 30 , 1 ), (2 , 20 , 40 , 2 ), (3 , 5 , 6 , 7 ))
3728+ .asInstanceOf [List [(Integer , Integer , Integer , Integer )]]
3729+ .toDF(" key" , " a" , " b" , " value" )
3730+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), value) as x" )
3731+ )
3732+
3733+ testEvolution(" source nested map struct key in different order" )(
3734+ targetData = Seq ((1 , 2 , 3 , 4 ), (3 , 5 , 6 , 7 )).toDF(" key" , " a" , " b" , " value" )
3735+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), value) as x" ),
3736+ sourceData = Seq ((1 , 10 , 30 , 1 ), (2 , 20 , 40 , 2 )).toDF(" key" , " a" , " b" , " value" )
3737+ .selectExpr(" key" , " map(named_struct('b', b, 'a', a), value) as x" ),
3738+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3739+ expected = Seq ((1 , 30 , 10 , 1 ), (2 , 40 , 20 , 2 ), (3 , 5 , 6 , 7 ))
3740+ .asInstanceOf [List [(Integer , Integer , Integer , Integer )]]
3741+ .toDF(" key" , " a" , " b" , " value" )
3742+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), value) as x" ),
3743+ expectedWithoutEvolution = Seq ((1 , 30 , 10 , 1 ), (2 , 40 , 20 , 2 ), (3 , 5 , 6 , 7 ))
3744+ .asInstanceOf [List [(Integer , Integer , Integer , Integer )]]
3745+ .toDF(" key" , " a" , " b" , " value" )
3746+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), value) as x" )
3747+ )
3748+
3749+ testEvolution(
3750+ " source struct nested in map array keys contains more columns" )(
3751+ targetData = Seq ((1 , 2 , 3 , 4 ), (3 , 5 , 6 , 7 )).toDF(" key" , " a" , " b" , " value" )
3752+ .selectExpr(" key" , " map(array(named_struct('a', a, 'b', b)), value) as x" ),
3753+ sourceData = Seq ((1 , 10 , 30 , 50 , 1 ), (2 , 20 , 40 , 60 , 2 )).toDF(" key" , " a" , " b" , " c" , " value" )
3754+ .selectExpr(" key" , " map(array(named_struct('a', a, 'b', b, 'c', c)), value) as x" ),
3755+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3756+ expected = Seq ((1 , 10 , 30 , 50 , 1 ), (2 , 20 , 40 , 60 , 2 ), (3 , 5 , 6 , null , 7 ))
3757+ .asInstanceOf [List [(Integer , Integer , Integer , Integer , Integer )]]
3758+ .toDF(" key" , " a" , " b" , " c" , " value" )
3759+ .selectExpr(" key" , " map(array(named_struct('a', a, 'b', b, 'c', c)), value) as x" ),
3760+ expectErrorWithoutEvolutionContains = " cannot cast"
3761+ )
3762+
3763+ testEvolution(" struct evolution in both map keys and values" )(
3764+ targetData = Seq ((1 , 2 , 3 , 4 , 5 ), (3 , 6 , 7 , 8 , 9 )).toDF(" key" , " a" , " b" , " d" , " e" )
3765+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b), named_struct('d', d, 'e', e)) as x" ),
3766+ sourceData = Seq ((1 , 10 , 30 , 50 , 70 , 90 , 110 ), (2 , 20 , 40 , 60 , 80 , 100 , 120 ))
3767+ .toDF(" key" , " a" , " b" , " c" , " d" , " e" , " f" )
3768+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b, 'c', c), named_struct('d', d, 'e', e, 'f', f)) as x" ),
3769+ clauses = update(" *" ) :: insert(" *" ) :: Nil ,
3770+ expected = Seq ((1 , 10 , 30 , 50 , 70 , 90 , 110 ), (2 , 20 , 40 , 60 , 80 , 100 , 120 ), (3 , 6 , 7 , null , 8 , 9 , null ))
3771+ .asInstanceOf [List [(Integer , Integer , Integer , Integer , Integer , Integer , Integer )]]
3772+ .toDF(" key" , " a" , " b" , " c" , " d" , " e" , " f" )
3773+ .selectExpr(" key" , " map(named_struct('a', a, 'b', b, 'c', c), named_struct('d', d, 'e', e, 'f', f)) as x" ),
3774+ expectErrorWithoutEvolutionContains = " cannot cast"
3775+ )
34573776 // scalastyle:on line.size.limit
34583777
34593778 testEvolution(" new column with update * and insert non-*" )(
0 commit comments