20
20
from typing import List , Optional , Set , Dict , Any
21
21
22
22
from confluent_kafka .schema_registry import RegisteredSchema
23
+ from confluent_kafka .schema_registry .common .schema_registry_client import \
24
+ RulePhase
23
25
from confluent_kafka .schema_registry .common .serde import ErrorAction , \
24
26
FieldTransformer , Migration , NoneAction , RuleAction , \
25
27
RuleConditionError , RuleContext , RuleError , SchemaId
@@ -59,6 +61,17 @@ def _execute_rules(
59
61
source : Optional [Schema ], target : Optional [Schema ],
60
62
message : Any , inline_tags : Optional [Dict [str , Set [str ]]],
61
63
field_transformer : Optional [FieldTransformer ]
64
+ ) -> Any :
65
+ return self ._execute_rules_with_phase (
66
+ ser_ctx , subject , RulePhase .DOMAIN , rule_mode ,
67
+ source , target , message , inline_tags , field_transformer )
68
+
69
+ def _execute_rules_with_phase (
70
+ self , ser_ctx : SerializationContext , subject : str ,
71
+ rule_phase : RulePhase , rule_mode : RuleMode ,
72
+ source : Optional [Schema ], target : Optional [Schema ],
73
+ message : Any , inline_tags : Optional [Dict [str , Set [str ]]],
74
+ field_transformer : Optional [FieldTransformer ]
62
75
) -> Any :
63
76
if message is None or target is None :
64
77
return message
@@ -73,7 +86,10 @@ def _execute_rules(
73
86
rules .reverse ()
74
87
else :
75
88
if target is not None and target .rule_set is not None :
76
- rules = target .rule_set .domain_rules
89
+ if rule_phase == RulePhase .ENCODING :
90
+ rules = target .rule_set .encoding_rules
91
+ else :
92
+ rules = target .rule_set .domain_rules
77
93
if rule_mode == RuleMode .READ :
78
94
# Execute read rules in reverse order for symmetry
79
95
rules = rules [:] if rules else []
@@ -197,19 +213,25 @@ async def _get_writer_schema(
197
213
else :
198
214
raise SerializationError ("Schema ID or GUID is not set" )
199
215
200
- def _has_rules (self , rule_set : RuleSet , mode : RuleMode ) -> bool :
216
+ def _has_rules (self , rule_set : RuleSet , phase : RulePhase , mode : RuleMode ) -> bool :
201
217
if rule_set is None :
202
218
return False
219
+ if phase == RulePhase .MIGRATION :
220
+ rules = rule_set .migration_rules
221
+ elif phase == RulePhase .DOMAIN :
222
+ rules = rule_set .domain_rules
223
+ elif phase == RulePhase .ENCODING :
224
+ rules = rule_set .encoding_rules
203
225
if mode in (RuleMode .UPGRADE , RuleMode .DOWNGRADE ):
204
226
return any (rule .mode == mode or rule .mode == RuleMode .UPDOWN
205
- for rule in rule_set . migration_rules or [])
227
+ for rule in rules or [])
206
228
elif mode == RuleMode .UPDOWN :
207
- return any (rule .mode == mode for rule in rule_set . migration_rules or [])
229
+ return any (rule .mode == mode for rule in rules or [])
208
230
elif mode in (RuleMode .WRITE , RuleMode .READ ):
209
231
return any (rule .mode == mode or rule .mode == RuleMode .WRITEREAD
210
- for rule in rule_set . domain_rules or [])
232
+ for rule in rules or [])
211
233
elif mode == RuleMode .WRITEREAD :
212
- return any (rule .mode == mode for rule in rule_set . migration_rules or [])
234
+ return any (rule .mode == mode for rule in rules or [])
213
235
return False
214
236
215
237
async def _get_migrations (
@@ -235,7 +257,8 @@ async def _get_migrations(
235
257
if i == 0 :
236
258
previous = version
237
259
continue
238
- if version .schema .rule_set is not None and self ._has_rules (version .schema .rule_set , migration_mode ):
260
+ if version .schema .rule_set is not None and self ._has_rules (
261
+ version .schema .rule_set , RulePhase .MIGRATION , migration_mode ):
239
262
if migration_mode == RuleMode .UPGRADE :
240
263
migration = Migration (migration_mode , previous , version )
241
264
else :
@@ -265,7 +288,8 @@ def _execute_migrations(
265
288
migrations : List [Migration ], message : Any
266
289
) -> Any :
267
290
for migration in migrations :
268
- message = self ._execute_rules (ser_ctx , subject , migration .rule_mode ,
269
- migration .source .schema , migration .target .schema ,
270
- message , None , None )
291
+ message = self ._execute_rules_with_phase (
292
+ ser_ctx , subject , RulePhase .MIGRATION , migration .rule_mode ,
293
+ migration .source .schema , migration .target .schema ,
294
+ message , None , None )
271
295
return message
0 commit comments