Skip to content

Commit 79b9e91

Browse files
Added transformation to set a field to the current system timestamp. Fixes jcustenborder#57 (jcustenborder#58)
* Added transformation to set a field to the current system timestamp. Fixes jcustenborder#57 * Added example for TimestampNow.
1 parent 134c95d commit 79b9e91

File tree

4 files changed

+406
-0
lines changed

4 files changed

+406
-0
lines changed
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import com.github.jcustenborder.kafka.connect.utils.config.Description;
19+
import com.github.jcustenborder.kafka.connect.utils.config.Title;
20+
import com.github.jcustenborder.kafka.connect.utils.data.SchemaBuilders;
21+
import com.github.jcustenborder.kafka.connect.utils.transformation.BaseKeyValueTransformation;
22+
import org.apache.kafka.common.config.ConfigDef;
23+
import org.apache.kafka.common.utils.Time;
24+
import org.apache.kafka.connect.connector.ConnectRecord;
25+
import org.apache.kafka.connect.data.Field;
26+
import org.apache.kafka.connect.data.Schema;
27+
import org.apache.kafka.connect.data.SchemaAndValue;
28+
import org.apache.kafka.connect.data.SchemaBuilder;
29+
import org.apache.kafka.connect.data.Struct;
30+
import org.apache.kafka.connect.data.Timestamp;
31+
32+
import java.util.Collection;
33+
import java.util.Date;
34+
import java.util.HashMap;
35+
import java.util.LinkedHashMap;
36+
import java.util.Map;
37+
import java.util.stream.Collectors;
38+
39+
@Title("TimestampNowField")
40+
@Description("This transformation is used to set a field with the current timestamp of the system running the " +
41+
"transformation.")
42+
public abstract class TimestampNowField<R extends ConnectRecord<R>> extends BaseKeyValueTransformation<R> {
43+
private TimestampNowFieldConfig config;
44+
Time time = Time.SYSTEM;
45+
46+
protected TimestampNowField(boolean isKey) {
47+
super(isKey);
48+
}
49+
50+
public static class Key<R extends ConnectRecord<R>> extends TimestampNowField<R> {
51+
public Key() {
52+
super(true);
53+
}
54+
}
55+
56+
public static class Value<R extends ConnectRecord<R>> extends TimestampNowField<R> {
57+
public Value() {
58+
super(false);
59+
}
60+
}
61+
62+
@Override
63+
public void close() {
64+
65+
}
66+
67+
Map<Schema, Schema> schemaCache = new HashMap<>();
68+
69+
static boolean isTimestampSchema(Schema schema) {
70+
return (Timestamp.SCHEMA.type() == schema.type() && Timestamp.SCHEMA.name().equals(schema.name()));
71+
}
72+
73+
@Override
74+
protected SchemaAndValue processStruct(R record, Schema inputSchema, Struct input) {
75+
Date timestamp = new Date(this.time.milliseconds());
76+
77+
Schema outputSchema = schemaCache.computeIfAbsent(inputSchema, schema -> {
78+
Collection<String> replaceFields = schema.fields().stream()
79+
.filter(f -> this.config.fields.contains(f.name()))
80+
.filter(f -> !isTimestampSchema(f.schema()))
81+
.map(Field::name)
82+
.collect(Collectors.toList());
83+
SchemaBuilder builder = SchemaBuilders.of(schema, replaceFields);
84+
this.config.fields.forEach(timestampField -> {
85+
Field existingField = builder.field(timestampField);
86+
if (null == existingField) {
87+
builder.field(timestampField, Timestamp.SCHEMA);
88+
}
89+
});
90+
return builder.build();
91+
});
92+
93+
Struct output = new Struct(outputSchema);
94+
inputSchema.fields().stream()
95+
.filter(f -> !this.config.fields.contains(f.name()))
96+
.forEach(f -> output.put(f.name(), input.get(f.name())));
97+
this.config.fields.forEach(field -> output.put(field, timestamp));
98+
return new SchemaAndValue(outputSchema, output);
99+
}
100+
101+
@Override
102+
protected SchemaAndValue processMap(R record, Map<String, Object> input) {
103+
Map<String, Object> result = new LinkedHashMap<>(input);
104+
Date timestamp = new Date(this.time.milliseconds());
105+
this.config.fields.forEach(field -> result.put(field, timestamp));
106+
return new SchemaAndValue(null, result);
107+
}
108+
109+
@Override
110+
public void configure(Map<String, ?> settings) {
111+
this.config = new TimestampNowFieldConfig(settings);
112+
}
113+
114+
@Override
115+
public ConfigDef config() {
116+
return TimestampNowFieldConfig.config();
117+
}
118+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright © 2017 Jeremy Custenborder (jcustenborder@gmail.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.github.jcustenborder.kafka.connect.transform.common;
17+
18+
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
19+
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
20+
import org.apache.kafka.common.config.AbstractConfig;
21+
import org.apache.kafka.common.config.ConfigDef;
22+
23+
import java.util.Map;
24+
import java.util.Set;
25+
26+
class TimestampNowFieldConfig extends AbstractConfig {
27+
public static final String FIELDS_CONF = "fields";
28+
public static final String FIELDS_DOC = "The field(s) that will be inserted with the timestamp of the system.";
29+
30+
public final Set<String> fields;
31+
32+
public TimestampNowFieldConfig(Map<?, ?> originals) {
33+
super(config(), originals);
34+
this.fields = ConfigUtils.getSet(this, FIELDS_CONF);
35+
}
36+
37+
public static ConfigDef config() {
38+
return new ConfigDef()
39+
.define(
40+
ConfigKeyBuilder.of(FIELDS_CONF, ConfigDef.Type.LIST)
41+
.documentation(FIELDS_DOC)
42+
.importance(ConfigDef.Importance.HIGH)
43+
.build()
44+
);
45+
}
46+
}
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package com.github.jcustenborder.kafka.connect.transform.common;
2+
3+
import com.google.common.collect.ImmutableMap;
4+
import org.apache.kafka.common.utils.Time;
5+
import org.apache.kafka.connect.data.Schema;
6+
import org.apache.kafka.connect.data.SchemaBuilder;
7+
import org.apache.kafka.connect.data.Struct;
8+
import org.apache.kafka.connect.data.Timestamp;
9+
import org.apache.kafka.connect.sink.SinkRecord;
10+
import org.junit.jupiter.api.BeforeEach;
11+
import org.junit.jupiter.api.Test;
12+
13+
import java.util.Date;
14+
import java.util.Map;
15+
16+
import static com.github.jcustenborder.kafka.connect.utils.AssertStruct.assertStruct;
17+
import static org.junit.jupiter.api.Assertions.assertEquals;
18+
import static org.junit.jupiter.api.Assertions.assertNotNull;
19+
import static org.junit.jupiter.api.Assertions.assertTrue;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.when;
22+
23+
public class TimestampNowFieldTest {
24+
25+
TimestampNowField<SinkRecord> transformation;
26+
Date timestamp = new Date(1586963336123L);
27+
28+
@BeforeEach
29+
public void beforeEach() {
30+
this.transformation = new TimestampNowField.Value<>();
31+
Date timestamp = new Date(1586963336123L);
32+
Time time = mock(Time.class);
33+
when(time.milliseconds()).thenReturn(timestamp.getTime());
34+
this.transformation.time = time;
35+
this.transformation.configure(
36+
ImmutableMap.of(TimestampNowFieldConfig.FIELDS_CONF, "timestamp")
37+
);
38+
}
39+
@BeforeEach
40+
public void afterEach() {
41+
this.transformation.close();
42+
}
43+
44+
@Test
45+
public void structFieldMissing() {
46+
final Schema inputSchema = SchemaBuilder.struct()
47+
.name("something")
48+
.field("firstName", Schema.STRING_SCHEMA)
49+
.field("lastName", Schema.STRING_SCHEMA)
50+
.build();
51+
final Schema expectedSchema = SchemaBuilder.struct()
52+
.name("something")
53+
.field("firstName", Schema.STRING_SCHEMA)
54+
.field("lastName", Schema.STRING_SCHEMA)
55+
.field("timestamp", Timestamp.SCHEMA)
56+
.build();
57+
final Struct inputStruct = new Struct(inputSchema)
58+
.put("firstName", "example")
59+
.put("lastName", "user");
60+
final Struct expectedStruct = new Struct(expectedSchema)
61+
.put("firstName", "example")
62+
.put("lastName", "user")
63+
.put("timestamp", timestamp);
64+
final SinkRecord input = new SinkRecord(
65+
"test",
66+
1,
67+
null,
68+
null,
69+
inputSchema,
70+
inputStruct,
71+
1234L
72+
);
73+
final SinkRecord output = this.transformation.apply(input);
74+
assertNotNull(output, "output should not be null.");
75+
assertTrue(output.value() instanceof Struct, "value should be a struct");
76+
final Struct actualStruct = (Struct) output.value();
77+
assertStruct(expectedStruct, actualStruct);
78+
}
79+
@Test
80+
public void structFieldExists() {
81+
final Schema inputSchema = SchemaBuilder.struct()
82+
.name("something")
83+
.field("firstName", Schema.STRING_SCHEMA)
84+
.field("lastName", Schema.STRING_SCHEMA)
85+
.field("timestamp", Timestamp.SCHEMA)
86+
.build();
87+
final Schema expectedSchema = SchemaBuilder.struct()
88+
.name("something")
89+
.field("firstName", Schema.STRING_SCHEMA)
90+
.field("lastName", Schema.STRING_SCHEMA)
91+
.field("timestamp", Timestamp.SCHEMA)
92+
.build();
93+
final Struct inputStruct = new Struct(inputSchema)
94+
.put("firstName", "example")
95+
.put("lastName", "user");
96+
final Struct expectedStruct = new Struct(expectedSchema)
97+
.put("firstName", "example")
98+
.put("lastName", "user")
99+
.put("timestamp", timestamp);
100+
final SinkRecord input = new SinkRecord(
101+
"test",
102+
1,
103+
null,
104+
null,
105+
inputSchema,
106+
inputStruct,
107+
1234L
108+
);
109+
final SinkRecord output = this.transformation.apply(input);
110+
assertNotNull(output, "output should not be null.");
111+
assertTrue(output.value() instanceof Struct, "value should be a struct");
112+
final Struct actualStruct = (Struct) output.value();
113+
assertStruct(expectedStruct, actualStruct);
114+
}
115+
@Test
116+
public void structFieldMismatch() {
117+
final Schema inputSchema = SchemaBuilder.struct()
118+
.name("something")
119+
.field("firstName", Schema.STRING_SCHEMA)
120+
.field("lastName", Schema.STRING_SCHEMA)
121+
.field("timestamp", Schema.STRING_SCHEMA)
122+
.build();
123+
final Schema expectedSchema = SchemaBuilder.struct()
124+
.name("something")
125+
.field("firstName", Schema.STRING_SCHEMA)
126+
.field("lastName", Schema.STRING_SCHEMA)
127+
.field("timestamp", Timestamp.SCHEMA)
128+
.build();
129+
final Struct inputStruct = new Struct(inputSchema)
130+
.put("firstName", "example")
131+
.put("lastName", "user");
132+
final Struct expectedStruct = new Struct(expectedSchema)
133+
.put("firstName", "example")
134+
.put("lastName", "user")
135+
.put("timestamp", timestamp);
136+
final SinkRecord input = new SinkRecord(
137+
"test",
138+
1,
139+
null,
140+
null,
141+
inputSchema,
142+
inputStruct,
143+
1234L
144+
);
145+
final SinkRecord output = this.transformation.apply(input);
146+
assertNotNull(output, "output should not be null.");
147+
assertTrue(output.value() instanceof Struct, "value should be a struct");
148+
final Struct actualStruct = (Struct) output.value();
149+
assertStruct(expectedStruct, actualStruct);
150+
}
151+
152+
@Test
153+
public void mapFieldMissing() {
154+
final Map<String, Object> expected = ImmutableMap.of(
155+
"firstName", "example", "lastName", "user", "timestamp", timestamp
156+
);
157+
final SinkRecord input = new SinkRecord(
158+
"test",
159+
1,
160+
null,
161+
null,
162+
null,
163+
ImmutableMap.of("firstName", "example", "lastName", "user"),
164+
1234L
165+
);
166+
final SinkRecord output = this.transformation.apply(input);
167+
assertNotNull(output, "output should not be null.");
168+
assertTrue(output.value() instanceof Map, "value should be a struct");
169+
final Map<String, Object> actual = (Map<String, Object>) output.value();
170+
assertEquals(expected, actual);
171+
}
172+
173+
@Test
174+
public void config() {
175+
assertNotNull(this.transformation.config());
176+
}
177+
178+
}

0 commit comments

Comments
 (0)