File tree Expand file tree Collapse file tree 4 files changed +8
-11
lines changed
data-pipeline/http-materializer
src/main/java/no/sysco/testing/kafka/pipeline/materializer/infrastructure Expand file tree Collapse file tree 4 files changed +8
-11
lines changed Original file line number Diff line number Diff line change 1212 <artifactId >http-materializer</artifactId >
1313
1414 <dependencies >
15+
1516 <!-- generated avro schema -->
1617 <dependency >
1718 <groupId >no.sysco.testing</groupId >
1819 <artifactId >schema</artifactId >
1920 </dependency >
21+
2022 <!-- config -->
2123 <dependency >
2224 <groupId >com.typesafe</groupId >
3032 <version >3.14.0</version >
3133 </dependency >
3234
33-
35+ <!-- kafka dependencies -->
3436 <dependency >
3537 <groupId >org.apache.kafka</groupId >
3638 <artifactId >kafka-streams</artifactId >
6264 <artifactId >slf4j-api</artifactId >
6365 <scope >test</scope >
6466 </dependency >
65-
6667 <dependency >
6768 <groupId >com.github.tomakehurst</groupId >
6869 <artifactId >wiremock-jre8</artifactId >
70+ <scope >test</scope >
6971 </dependency >
7072 <dependency >
7173 <groupId >org.awaitility</groupId >
Original file line number Diff line number Diff line change @@ -38,7 +38,7 @@ public KafkaMessageMaterializer(
3838 this .transformer = transformer ;
3939 this .sourceTopic = config .kafkaConfig .sourceTopic ;
4040
41- Properties properties = new Properties ();
41+ final Properties properties = new Properties ();
4242 properties .put (StreamsConfig .APPLICATION_ID_CONFIG , config .name +"steam-processing-v1" );
4343 properties .put (StreamsConfig .BOOTSTRAP_SERVERS_CONFIG , config .kafkaConfig .bootstrapServers );
4444 properties .put (AbstractKafkaAvroSerDeConfig .SCHEMA_REGISTRY_URL_CONFIG , config .kafkaConfig .schemaRegistryUrl );
@@ -69,11 +69,8 @@ static Topology topology(
6969 return builder .build ();
7070 }
7171
72- @ Override public void run () {
73- kafkaStreams .start ();
74- }
72+ @ Override public void run () { kafkaStreams .start (); }
7573 public void stop () { Optional .ofNullable (kafkaStreams ).ifPresent (KafkaStreams ::close ); }
76-
7774 public KafkaStreams .State getState () {
7875 return kafkaStreams .state ();
7976 }
Original file line number Diff line number Diff line change 1111import okhttp3 .RequestBody ;
1212import okhttp3 .Response ;
1313
14- // todo: proper logging
1514public class DatabaseWebServiceRest implements DatabaseWebService {
1615 private static final Logger log = Logger .getLogger (DatabaseWebServiceRest .class .getName ());
1716 private final String url ;
@@ -26,7 +25,7 @@ public DatabaseWebServiceRest(final MaterializerConfig applicationConfig) {
2625
2726 @ Override public void saveMessage (final MessageJsonRepresentation message ) {
2827 RequestBody body = RequestBody .create (JSON , message .json ());
29- Request request =
28+ Request request =
3029 new Request .Builder ()
3130 .addHeader ("Accept" , "application/json; charset=utf-8" )
3231 .addHeader ("Content-Type" , "application/json; charset=utf-8" )
@@ -41,7 +40,6 @@ public DatabaseWebServiceRest(final MaterializerConfig applicationConfig) {
4140 log .severe ("Request failed with status code: " +statusCode );
4241 throw new RuntimeException ("Request failed with status " + statusCode );
4342 }
44-
4543 log .info ("Response received successfully: " + statusCode );
4644 } catch (IOException e ) {
4745 e .printStackTrace ();
Original file line number Diff line number Diff line change 6868 <dependency >
6969 <groupId >com.typesafe</groupId >
7070 <artifactId >config</artifactId >
71- <version >1.3.2 </version >
71+ <version >1.3.3 </version >
7272 </dependency >
7373 <!-- kafka clients -->
7474 <dependency >
You can’t perform that action at this time.
0 commit comments