Skip to content

Commit 15fb57e

Browse files
committed
Upgraded Stream App to Kafka Streams API 1.1
1 parent 00e0b76 commit 15fb57e

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

src/main/java/com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Serving_gRPC_Example.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525

2626
import org.apache.kafka.common.serialization.Serdes;
2727
import org.apache.kafka.streams.KafkaStreams;
28+
import org.apache.kafka.streams.StreamsBuilder;
2829
import org.apache.kafka.streams.StreamsConfig;
2930
import org.apache.kafka.streams.kstream.KStream;
30-
import org.apache.kafka.streams.kstream.KStreamBuilder;
3131

3232
/**
3333
* @author Kai Waehner
@@ -62,12 +62,13 @@ public static void main(String[] args) throws Exception {
6262

6363
// Specify default (de)serializers for record keys and for record
6464
// values.
65-
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
66-
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
65+
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
66+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
6767

6868
// In the subsequent lines we define the processing topology of the Streams
6969
// application.
70-
final KStreamBuilder builder = new KStreamBuilder();
70+
// final KStreamBuilder builder = new KStreamBuilder();
71+
final StreamsBuilder builder = new StreamsBuilder();
7172

7273
// Construct a `KStream` from the input topic "ImageInputTopic", where
7374
// message values represent lines of text
@@ -108,7 +109,7 @@ public static void main(String[] args) throws Exception {
108109

109110
// Start Kafka Streams Application to process new incoming images from the Input
110111
// Topic
111-
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
112+
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
112113

113114
streams.cleanUp();
114115

0 commit comments

Comments
 (0)