Skip to content

Commit ef79768

Browse files
committed
Refactoring: Simplified stream process by not using global list anymore as placeholder
1 parent 02734da commit ef79768

File tree

1 file changed

+13
-17
lines changed

1 file changed

+13
-17
lines changed

src/main/java/com/github/megachucky/kafka/streams/machinelearning/Kafka_Streams_TensorFlow_Serving_gRPC_Example.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.FileInputStream;
2121
import java.io.InputStream;
22+
import java.util.Collections;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.Properties;
@@ -43,9 +44,6 @@ public class Kafka_Streams_TensorFlow_Serving_gRPC_Example {
4344
// Image path will be received from Kafka message to topic 'imageInputTopic'
4445
private static String imagePath = null;
4546

46-
// Prediction of the TensorFlow Image Recognition model
47-
private static List<Map.Entry<String, Double>> list = null;
48-
4947
public static void main(String[] args) throws Exception {
5048

5149
// Configure Kafka Streams Application
@@ -67,16 +65,13 @@ public static void main(String[] args) throws Exception {
6765

6866
// In the subsequent lines we define the processing topology of the Streams
6967
// application.
70-
// final KStreamBuilder builder = new KStreamBuilder();
7168
final StreamsBuilder builder = new StreamsBuilder();
7269

7370
// Construct a `KStream` from the input topic "ImageInputTopic", where
7471
// message values represent lines of text
7572
final KStream<String, String> imageInputLines = builder.stream(imageInputTopic);
7673

77-
// Stream Processor (in this case 'foreach' to add custom logic, i.e. apply the
78-
// analytic model)
79-
imageInputLines.foreach((key, value) -> {
74+
KStream<String, Object> transformedMessage = imageInputLines.mapValues(value -> {
8075

8176
System.out.println("Image path: " + value);
8277

@@ -88,22 +83,23 @@ public static void main(String[] args) throws Exception {
8883
InputStream jpegStream;
8984
try {
9085
jpegStream = new FileInputStream(imagePath);
91-
list = recogniser.recognise(jpegStream);
92-
System.out.println(list);
86+
87+
// Prediction of the TensorFlow Image Recognition model:
88+
List<Map.Entry<String, Double>> list = recogniser.recognise(jpegStream);
89+
String prediction = list.toString();
90+
System.out.println("Prediction: " + prediction);
9391
recogniser.close();
9492
jpegStream.close();
93+
94+
return prediction;
9595
} catch (Exception e) {
9696
e.printStackTrace();
97+
98+
return Collections.emptyList().toString();
9799
}
98100

99101
});
100102

101-
// airlineInputLines.print();
102-
103-
// Transform message: Add prediction information
104-
KStream<String, Object> transformedMessage = imageInputLines
105-
.mapValues(value -> "Prediction: What is the content of this picture? => " + list);
106-
107103
// Send prediction information to Output Topic
108104
transformedMessage.to(imageOutputTopic);
109105

@@ -117,8 +113,8 @@ public static void main(String[] args) throws Exception {
117113

118114
System.out.println("Image Recognition Microservice is running...");
119115

120-
System.out.println("Input images arrive at Kafka topic " + imageInputTopic + "; Output predictions going to Kafka topic "
121-
+ imageOutputTopic);
116+
System.out.println("Input images arrive at Kafka topic " + imageInputTopic
117+
+ "; Output predictions going to Kafka topic " + imageOutputTopic);
122118

123119
// Add shutdown hook to respond to SIGTERM and gracefully close Kafka
124120
// Streams

0 commit comments

Comments
 (0)