Skip to content

Commit 0f21d20

Browse files
Added transformation to filter records based on items in fields. Fixes jcustenborder#30. (jcustenborder#31)
1 parent 189cb8a commit 0f21d20

File tree

4 files changed

+280
-29
lines changed

4 files changed

+280
-29
lines changed

pom.xml

Lines changed: 15 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
<parent>
2424
<groupId>com.github.jcustenborder.kafka.connect</groupId>
2525
<artifactId>kafka-connect-parent</artifactId>
26-
<version>1.1.0-cp3</version>
26+
<version>2.0.0</version>
2727
</parent>
2828
<artifactId>kafka-connect-transform-common</artifactId>
2929
<version>0.1.0-SNAPSHOT</version>
@@ -82,34 +82,20 @@
8282
<plugin>
8383
<groupId>io.confluent</groupId>
8484
<artifactId>kafka-connect-maven-plugin</artifactId>
85-
<version>0.9.0</version>
86-
<executions>
87-
<execution>
88-
<goals>
89-
<goal>kafka-connect</goal>
90-
</goals>
91-
<configuration>
92-
<ownerUsername>jcustenborder</ownerUsername>
93-
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
94-
<documentationUrl>
95-
https://jcustenborder.github.io/kafka-connect-documentation/
96-
</documentationUrl>
97-
<ownerName>Jeremy Custenborder</ownerName>
98-
<dockerNamespace>jcustenborder</dockerNamespace>
99-
<dockerName>kafka-connect-docker</dockerName>
100-
<pluginTypes>
101-
<pluginType>transform</pluginType>
102-
</pluginTypes>
103-
<tags>
104-
<tag>Transformation</tag>
105-
</tags>
106-
<title>Common Transformations</title>
107-
<supportUrl>${pom.issueManagement.url}</supportUrl>
108-
<supportSummary>Support provided through community involvement.
109-
</supportSummary>
110-
</configuration>
111-
</execution>
112-
</executions>
85+
<configuration>
86+
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
87+
<documentationUrl>https://jcustenborder.github.io/kafka-connect-documentation/</documentationUrl>
88+
<componentTypes>
89+
<componentType>source</componentType>
90+
</componentTypes>
91+
<tags>
92+
<tag>Twitter</tag>
93+
<tag>Social</tag>
94+
</tags>
95+
<title>Kafka Connect Common Transformations</title>
96+
<supportUrl>${pom.issueManagement.url}</supportUrl>
97+
<supportSummary>Support provided through community involvement.</supportSummary>
98+
</configuration>
11399
</plugin>
114100
</plugins>
115101
</build>
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import com.github.jcustenborder.kafka.connect.utils.config.Description;
19+
import com.github.jcustenborder.kafka.connect.utils.config.DocumentationTip;
20+
import com.github.jcustenborder.kafka.connect.utils.config.Title;
21+
import org.apache.kafka.common.config.ConfigDef;
22+
import org.apache.kafka.connect.connector.ConnectRecord;
23+
import org.apache.kafka.connect.data.Field;
24+
import org.apache.kafka.connect.data.Schema;
25+
import org.apache.kafka.connect.data.SchemaAndValue;
26+
import org.apache.kafka.connect.data.Struct;
27+
import org.apache.kafka.connect.transforms.Transformation;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.util.Map;
32+
import java.util.regex.Matcher;
33+
34+
public abstract class PatternFilter<R extends ConnectRecord<R>> implements Transformation<R> {
35+
private static final Logger log = LoggerFactory.getLogger(PatternFilter.class);
36+
37+
@Override
38+
public ConfigDef config() {
39+
return PatternFilterConfig.config();
40+
}
41+
42+
PatternFilterConfig config;
43+
44+
@Override
45+
public void configure(Map<String, ?> settings) {
46+
this.config = new PatternFilterConfig(settings);
47+
}
48+
49+
@Override
50+
public void close() {
51+
52+
}
53+
54+
R filter(R record, Struct struct) {
55+
for (Field field : struct.schema().fields()) {
56+
if (this.config.fields.contains(field.name())) {
57+
if (field.schema().type() == Schema.Type.STRING) {
58+
String input = struct.getString(field.name());
59+
if (null != input) {
60+
Matcher matcher = this.config.pattern.matcher(input);
61+
if (matcher.matches()) {
62+
return null;
63+
}
64+
}
65+
}
66+
}
67+
}
68+
return record;
69+
}
70+
71+
R filter(R record, Map map) {
72+
for (Object field : map.keySet()) {
73+
if (this.config.fields.contains(field)) {
74+
Object value = map.get(field);
75+
76+
if (value instanceof String) {
77+
String input = (String) value;
78+
Matcher matcher = this.config.pattern.matcher(input);
79+
if (matcher.matches()) {
80+
return null;
81+
}
82+
}
83+
}
84+
}
85+
86+
return record;
87+
}
88+
89+
90+
R filter(R record, final boolean key) {
91+
final SchemaAndValue input = key ?
92+
new SchemaAndValue(record.keySchema(), record.key()) :
93+
new SchemaAndValue(record.valueSchema(), record.value());
94+
final R result;
95+
if (input.schema() != null) {
96+
if (Schema.Type.STRUCT == input.schema().type()) {
97+
result = filter(record, (Struct) input.value());
98+
} else if (Schema.Type.MAP == input.schema().type()) {
99+
result = filter(record, (Map) input.value());
100+
} else {
101+
result = record;
102+
}
103+
} else if (input.value() instanceof Map) {
104+
result = filter(record, (Map) input.value());
105+
} else {
106+
result = record;
107+
}
108+
109+
return result;
110+
}
111+
112+
@Title("PatternFilter(Key)")
113+
@Description("This transformation is used to filter records based on a regular expression.")
114+
@DocumentationTip("This transformation is used to filter records based on fields in the Key of the record.")
115+
public static class Key<R extends ConnectRecord<R>> extends PatternFilter<R> {
116+
@Override
117+
public R apply(R r) {
118+
return filter(r, true);
119+
}
120+
}
121+
122+
@Title("PatternFilter(Value)")
123+
@Description("This transformation is used to filter records based on a regular expression.")
124+
@DocumentationTip("This transformation is used to filter records based on fields in the Value of the record.")
125+
public static class Value<R extends ConnectRecord<R>> extends PatternFilter<R> {
126+
@Override
127+
public R apply(R r) {
128+
return filter(r, false);
129+
}
130+
}
131+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
19+
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
20+
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
21+
import org.apache.kafka.common.config.AbstractConfig;
22+
import org.apache.kafka.common.config.ConfigDef;
23+
24+
import java.util.Collections;
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
import java.util.regex.Pattern;
30+
31+
public class PatternFilterConfig extends AbstractConfig {
32+
public final Pattern pattern;
33+
public final Set<String> fields;
34+
35+
public static final String PATTERN_CONFIG = "pattern";
36+
public static final String PATTERN_DOC = "The regex to test the message with. ";
37+
38+
public static final String FIELD_CONFIG = "fields";
39+
public static final String FIELD_DOC = "The fields to transform.";
40+
41+
42+
public PatternFilterConfig(Map<String, ?> settings) {
43+
super(config(), settings);
44+
this.pattern = ConfigUtils.pattern(this, PATTERN_CONFIG);
45+
List<String> fields = getList(FIELD_CONFIG);
46+
this.fields = new HashSet<>(fields);
47+
}
48+
49+
public static ConfigDef config() {
50+
return new ConfigDef()
51+
.define(
52+
ConfigKeyBuilder.of(PATTERN_CONFIG, ConfigDef.Type.STRING)
53+
.documentation(PATTERN_DOC)
54+
.importance(ConfigDef.Importance.HIGH)
55+
.validator(Validators.pattern())
56+
.build()
57+
).define(
58+
ConfigKeyBuilder.of(FIELD_CONFIG, ConfigDef.Type.LIST)
59+
.documentation(FIELD_DOC)
60+
.defaultValue(Collections.emptyList())
61+
.importance(ConfigDef.Importance.HIGH)
62+
.build()
63+
);
64+
}
65+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package com.github.jcustenborder.kafka.connect.transform.common;
2+
3+
import com.google.common.collect.ImmutableMap;
4+
import org.apache.kafka.connect.data.Schema;
5+
import org.apache.kafka.connect.data.SchemaBuilder;
6+
import org.apache.kafka.connect.data.Struct;
7+
import org.apache.kafka.connect.sink.SinkRecord;
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Test;
10+
11+
import static org.junit.jupiter.api.Assertions.assertNotNull;
12+
import static org.junit.jupiter.api.Assertions.assertNull;
13+
14+
public class PatternFilterTest {
15+
public PatternFilter.Value transform;
16+
17+
@BeforeEach
18+
public void before() {
19+
this.transform = new PatternFilter.Value();
20+
this.transform.configure(
21+
ImmutableMap.of(
22+
PatternFilterConfig.FIELD_CONFIG, "input",
23+
PatternFilterConfig.PATTERN_CONFIG, "^filter$"
24+
)
25+
);
26+
}
27+
28+
SinkRecord map(String value) {
29+
return new SinkRecord(
30+
"asdf",
31+
1,
32+
null,
33+
null,
34+
null,
35+
ImmutableMap.of("input", value),
36+
1234L
37+
);
38+
}
39+
40+
SinkRecord struct(String value) {
41+
Schema schema = SchemaBuilder.struct()
42+
.field("input", Schema.STRING_SCHEMA)
43+
.build();
44+
Struct struct = new Struct(schema)
45+
.put("input", value);
46+
return new SinkRecord(
47+
"asdf",
48+
1,
49+
null,
50+
null,
51+
schema,
52+
struct,
53+
1234L
54+
);
55+
}
56+
57+
@Test
58+
public void filtered() {
59+
assertNull(this.transform.apply(struct("filter")));
60+
assertNull(this.transform.apply(map("filter")));
61+
}
62+
63+
@Test
64+
public void notFiltered() {
65+
assertNotNull(this.transform.apply(struct("ok")));
66+
assertNotNull(this.transform.apply(map("ok")));
67+
}
68+
69+
}

0 commit comments

Comments
 (0)