11package seko .kafka .connect .transformer .python ;
22
3- import org .apache .kafka .common .config .ConfigDef ;
43import org .apache .kafka .connect .connector .ConnectRecord ;
5- import org .apache .kafka .connect .transforms .Transformation ;
6- import org .apache .kafka .connect .transforms .util .Requirements ;
7- import org .apache .kafka .connect .transforms .util .SimpleConfig ;
84import org .python .core .PyCode ;
95import org .python .core .PyObject ;
106import org .python .util .PythonInterpreter ;
117import org .slf4j .Logger ;
128import org .slf4j .LoggerFactory ;
9+ import seko .kafka .connect .transformer .script .AbstractScriptTransformer ;
1310
1411import java .util .ArrayList ;
1512import java .util .List ;
1613import java .util .Map ;
1714import java .util .Optional ;
1815
19- import static org .apache .kafka .common .config .ConfigDef .Importance .MEDIUM ;
20- import static org .apache .kafka .common .config .ConfigDef .NO_DEFAULT_VALUE ;
21- import static org .apache .kafka .common .config .ConfigDef .Type .STRING ;
22- import static seko .kafka .connect .transformer .script .configs .Configuration .KEY_SCRIPT_CONFIG ;
23- import static seko .kafka .connect .transformer .script .configs .Configuration .VALUE_SCRIPT_CONFIG ;
24-
25- public class PythonTransformer <R extends ConnectRecord <R >> implements Transformation <R > {
26- private static final String PURPOSE = "field extraction" ;
27- private static final ConfigDef CONFIG_DEF = new ConfigDef ()
28- .define (KEY_SCRIPT_CONFIG , STRING , NO_DEFAULT_VALUE , MEDIUM , "Field name to extract." )
29- .define (VALUE_SCRIPT_CONFIG , STRING , NO_DEFAULT_VALUE , MEDIUM , "Format extracted field." );
30-
31-
16+ public class PythonTransformer <R extends ConnectRecord <R >> extends AbstractScriptTransformer {
3217 private static final Logger log = LoggerFactory .getLogger (PythonTransformer .class );
3318
34- private String valueScript ;
35- private String keyScript ;
36-
37- @ Override
38- public R apply (R record ) {
39- Map <String , Object > key = Requirements .requireMapOrNull (record .key (), PURPOSE );
40-
41- if (keyScript != null && key != null ) {
42- key = transform (key , keyScript );
43- }
4419
45- Map <String , Object > value = Requirements .requireMapOrNull (record .value (), PURPOSE );
46- if (valueScript != null && value != null ) {
47- value = transform (value , valueScript );
48- }
49-
50- return newRecord (record , key , value );
51- }
52-
53- private Map <String , Object > transform (Map <String , Object > source , String script ) {
20+ public Map <String , Object > transform (Map <String , Object > source , String script ) {
5421 PythonInterpreter interpreter = new PythonInterpreter ();
5522 interpreter .set ("source" , source );
5623
@@ -69,35 +36,4 @@ private Map<String, Object> transform(Map<String, Object> source, String script)
6936 }
7037 }
7138
72- @ Override
73- public ConfigDef config () {
74- return CONFIG_DEF ;
75- }
76-
77-
78- private R newRecord (R record , Map <String , Object > newKey , Map <String , Object > newValue ) {
79- Object key = newKey == null ? record .key () : newKey ;
80- Object value = newValue == null ? record .value () : newValue ;
81-
82- return record .newRecord (record .topic (), record .kafkaPartition (), record .keySchema (),
83- key , record .valueSchema (), value , record .timestamp ());
84- }
85-
86- @ Override
87- public void close () {
88-
89- }
90-
91- @ Override
92- public void configure (Map <String , ?> configs ) {
93- SimpleConfig config = new SimpleConfig (CONFIG_DEF , configs );
94- String keyScript = config .getString (KEY_SCRIPT_CONFIG );
95- if (keyScript != null && !keyScript .trim ().isEmpty ()) {
96- this .keyScript = keyScript ;
97- }
98- String valueScript = config .getString (VALUE_SCRIPT_CONFIG );
99- if (valueScript != null && !valueScript .trim ().isEmpty ()) {
100- this .valueScript = valueScript ;
101- }
102- }
10339}
0 commit comments