Stream Processing Live Traffic Data with Kafka Streams
Who are we ● Tim Ysewyn ○ Principal Java Software Engineer ○ Spring & Spring Cloud Contributor ○ @TYsewyn ● Tom Van den Bulck ○ Principal Java Software Engineer ○ Competence Leader Fast & Big Data ○ @tomvdbulck
Setup Environment http://bit.ly/Hands-on-Labs-Devoxx-2018
What
What
What: Event ● Data it owns ● Data it needs ● Referenced data
What: Streaming ● Reacts to events ● Processes them continuously
Why ● Much shorter feedback loop ● More resource efficient ● Stream processing feels more natural ● Decentralize and decouple infrastructure
The Data
The Data ● An XML file is generated every minute ○ So it is not the raw sensor data ● Be aware: ○ The element names are Dutch words
The Data ● XML with fixed sensor data ○
<meetpunt unieke_id="3640">
  <beschrijvende_id>H291L10</beschrijvende_id>
  <volledige_naam>Parking Kruibeke</volledige_naam>
  <Ident_8>A0140002</Ident_8>
  <lve_nr>437</lve_nr>
  <Kmp_Rsys>94,695</Kmp_Rsys>
  <Rijstrook>R10</Rijstrook>
  <X_coord_EPSG_31370>144477,0917</X_coord_EPSG_31370>
  <Y_coord_EPSG_31370>208290,6237</Y_coord_EPSG_31370>
  <lengtegraad_EPSG_4326>4,289767347</lengtegraad_EPSG_4326>
  <breedtegraad_EPSG_4326>51,18458196</breedtegraad_EPSG_4326>
</meetpunt>
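Numeric fields in this feed, such as `Kmp_Rsys` and the coordinates, use a decimal comma. As a result, `Double.parseDouble("94,695")` throws a `NumberFormatException`. Below is a minimal sketch of a helper. It assumes the comma is always the decimal separator and that no thousands separator ever appears. The class name `DecimalComma` is hypothetical.

```java
/** Parses the feed's decimal-comma numbers, e.g. "51,18458196". Hypothetical helper class. */
final class DecimalComma {

    private DecimalComma() {
    }

    /** Assumes no thousands separator, so the only comma is the decimal separator. */
    static double parse(String raw) {
        // Swap the decimal comma for a dot, the only decimal separator Double.parseDouble accepts.
        return Double.parseDouble(raw.trim().replace(',', '.'));
    }
}
```

For example, `DecimalComma.parse("4,289767347")` gives the longitude 4.289767347. A `java.text.NumberFormat` for a Dutch locale would also work. The plain replacement avoids depending on the JVM's locale data.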
The Data ● XML with dynamic traffic data ○
<meetpunt beschrijvende_id="H222L10" unieke_id="29">
  <lve_nr>55</lve_nr>
  <tijd_waarneming>2018-11-03T14:43:00+01:00</tijd_waarneming>
  <tijd_laatst_gewijzigd>2018-11-03T14:44:24+01:00</tijd_laatst_gewijzigd>
  <actueel_publicatie>1</actueel_publicatie>
  <beschikbaar>1</beschikbaar>
The Data ● XML with dynamic traffic data ○
<meetdata klasse_id="4">
  <verkeersintensiteit>2</verkeersintensiteit>
  <voertuigsnelheid_rekenkundig>60</voertuigsnelheid_rekenkundig>
  <voertuigsnelheid_harmonisch>59</voertuigsnelheid_harmonisch>
</meetdata>
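The JDK's DOM parser is enough to read these elements without hand-rolled string handling. Below is a minimal sketch under the following assumptions:

* Each `<meetdata>` element is nested inside its `<meetpunt>`, as the two slides suggest.
* Every element shown is present.
* The English field names are our own translation. In particular, `verkeersintensiteit` (traffic intensity) is read here as the number of vehicles counted in the interval.
* `Measurement` and `MeetpuntParser` are hypothetical names.
* A Java 16+ compiler is available, since `Measurement` is a record.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** One <meetdata> block with English field names (our translation of the Dutch element names). */
record Measurement(int vehicleClassId, int vehicleCount, int arithmeticMeanSpeed, int harmonicMeanSpeed) {}

/** Minimal DOM reader for the dynamic <meetpunt> data. Hypothetical class. */
final class MeetpuntParser {

    private MeetpuntParser() {
    }

    /** Parses an XML string and returns its root element, assumed to be a <meetpunt>. */
    static Element parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse traffic XML", e);
        }
    }

    /** tijd_waarneming = observation time, an ISO-8601 timestamp with offset. */
    static OffsetDateTime observationTime(Element meetpunt) {
        return OffsetDateTime.parse(text(meetpunt, "tijd_waarneming"));
    }

    /** Reads every nested <meetdata> (measurement data) element, each tagged with a klasse_id. */
    static List<Measurement> measurements(Element meetpunt) {
        List<Measurement> result = new ArrayList<>();
        NodeList nodes = meetpunt.getElementsByTagName("meetdata");
        for (int i = 0; i < nodes.getLength(); i++) {
            Element m = (Element) nodes.item(i);
            result.add(new Measurement(
                    Integer.parseInt(m.getAttribute("klasse_id")),               // vehicle class id
                    Integer.parseInt(text(m, "verkeersintensiteit")),            // traffic intensity
                    Integer.parseInt(text(m, "voertuigsnelheid_rekenkundig")),   // arithmetic mean speed
                    Integer.parseInt(text(m, "voertuigsnelheid_harmonisch")))); // harmonic mean speed
        }
        return result;
    }

    /** Text of the first descendant with the given tag; assumes that element is present. */
    private static String text(Element parent, String tag) {
        return parent.getElementsByTagName(tag).item(0).getTextContent().trim();
    }
}
```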
The Data ● XML with dynamic traffic data ○
/* Note: the vehicle class MOTO(1) does not provide reliable data. */
MOTO(1),
CAR(2),
CAMIONET(3), // a van
RIGGID_LORRIES(4),
TRUCK_OR_BUS(5),
UNKNOWN(0);
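The constants above form the body of a Java enum keyed by `klasse_id`. Below is a sketch of the full type. It assumes the name `VehicleClass` and a lookup method `fromKlasseId`, both hypothetical, which maps unrecognised codes to `UNKNOWN`. The constant names, including the `RIGGID_LORRIES` spelling, are kept exactly as on the slide.

```java
import java.util.Arrays;

/** Vehicle classes as encoded by the klasse_id attribute. The enum name is hypothetical. */
enum VehicleClass {
    /* Note: the vehicle class MOTO(1) does not provide reliable data. */
    MOTO(1),
    CAR(2),
    CAMIONET(3), // a van
    RIGGID_LORRIES(4),
    TRUCK_OR_BUS(5),
    UNKNOWN(0);

    private final int klasseId;

    VehicleClass(int klasseId) {
        this.klasseId = klasseId;
    }

    int klasseId() {
        return klasseId;
    }

    /** Maps a klasse_id value to its constant; unrecognised codes fall back to UNKNOWN. */
    static VehicleClass fromKlasseId(int klasseId) {
        return Arrays.stream(values())
                .filter(vc -> vc.klasseId == klasseId)
                .findFirst()
                .orElse(UNKNOWN);
    }

    /** Only MOTO is flagged as unreliable on the slide. */
    boolean isReliable() {
        return this != MOTO;
    }
}
```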
The Data ● XML with dynamic traffic data ○
<meetdata klasse_id="3">
  <verkeersintensiteit>0</verkeersintensiteit>
  <voertuigsnelheid_rekenkundig>0</voertuigsnelheid_rekenkundig>
  <voertuigsnelheid_harmonisch>252</voertuigsnelheid_harmonisch>
</meetdata>
The Data ● Do not worry ● We translated it into a simplified POJO: TrafficEvent.java
The Data: Some Lessons ● Think about the language ● Think about the sentinel values you are going to output ○ 252 when there are no readings ○ 254 when an error occurred
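Sentinel values are easy to pass on as if they were real speeds. For example, averaging a reading of 59 km/h with a 252 from an empty lane gives 155.5 km/h. Below is a minimal sketch that turns the sentinels into an explicit absence with `OptionalInt`. The class name `SpeedReadings` is hypothetical; the values 252 and 254 come from the slide.

```java
import java.util.OptionalInt;

/** Converts raw speed fields into OptionalInt, hiding the feed's sentinel values. Hypothetical class. */
final class SpeedReadings {

    static final int NO_READINGS = 252; // no traffic for that lane and vehicle class
    static final int ERROR = 254;       // an error occurred while measuring

    private SpeedReadings() {
    }

    /** Returns the speed in km/h, or empty when the raw value is one of the sentinels. */
    static OptionalInt speed(int raw) {
        return (raw == NO_READINGS || raw == ERROR) ? OptionalInt.empty() : OptionalInt.of(raw);
    }
}
```

Downstream code then has to decide explicitly what "no reading" means, for example by skipping it when averaging, instead of silently averaging 252.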
How
How
Lab 1: Send events to Kafka ● Dependencies ○ spring-cloud-starter-stream-kafka ○ spring-cloud-stream-reactive ● Added @EnableScheduling ● Added @EnableBinding ● Added @StreamEmitter (from spring-cloud-stream-reactive) ● Added @SendTo ● Properties: ○ spring.cloud.stream.bindings.output.destination=traffic-data
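Below is a minimal producer sketch in the Spring Cloud Stream 2.x annotation model that the lab's dependencies belong to. That model, and the `spring-cloud-stream-reactive` module, have since been superseded by functional bindings. `Source.OUTPUT` is the `output` binding, so the `spring.cloud.stream.bindings.output.destination=traffic-data` property routes every emitted message to the `traffic-data` topic. The application class name and `fetchLatestTrafficData` are hypothetical placeholders.

```java
import java.time.Duration;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.reactive.StreamEmitter;
import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Source.class) // binds the "output" channel configured in application.properties
class TrafficDataEmitterApplication {

    public static void main(String[] args) {
        SpringApplication.run(TrafficDataEmitterApplication.class, args);
    }

    // Emits one message per minute, the rate at which the feed publishes a new XML file.
    @StreamEmitter
    @Output(Source.OUTPUT)
    public Flux<String> emit() {
        return Flux.interval(Duration.ofMinutes(1))
                .map(tick -> fetchLatestTrafficData());
    }

    // Hypothetical placeholder: download the latest XML and convert it to a TrafficEvent payload here.
    private String fetchLatestTrafficData() {
        return "{}";
    }
}
```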
Lab 2: Intake of data from Kafka ● @EnableBinding ● @StreamListener(Sink.INPUT) ● Properties: ○ spring.cloud.stream.bindings.input.destination=traffic-data
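On the consuming side, the `input` binding is declared by the `Sink` interface. `Source` only declares `OUTPUT`, while `Processor` extends both. Below is a minimal consumer sketch, again assuming the Spring Cloud Stream 2.x annotation model. The class and method names are hypothetical, and the payload is kept as a `String` for simplicity.

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class) // binds the "input" channel to spring.cloud.stream.bindings.input.destination
class TrafficDataConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(TrafficDataConsumerApplication.class, args);
    }

    // Invoked once for every record read from the traffic-data topic.
    @StreamListener(Sink.INPUT)
    public void onTrafficData(String payload) {
        System.out.println("Received: " + payload);
    }
}
```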
Native streaming: KStream
Native streaming: KTable
Native streaming operations: toStream
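The difference between a KStream and a KTable can be illustrated without Kafka at all:

* A stream is every record, in order.
* A table keeps only the latest value per key.
* `toStream` goes the other way: it emits each update of the table as a record.

Below is a plain-Java analogy, not the Kafka Streams API, using hypothetical sensor ids and speeds.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java analogy of stream/table duality; not the Kafka Streams API. */
final class StreamTableDuality {

    /** One record of the stream: a key and its new value. */
    record Update(String key, int speedKmh) {}

    private StreamTableDuality() {
    }

    /** Table view: replay the stream, keeping only the latest value per key. */
    static Map<String, Integer> asTable(List<Update> stream) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Update update : stream) {
            // A newer record for the same key overwrites the previous value
            table.put(update.key(), update.speedKmh());
        }
        return table;
    }
}
```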
Native streaming operations: Stateless ● selectKey ● filter ● map/mapValues ● flatMap/flatMapValues ● peek ● foreach ● groupByKey ● toStream
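Stateless operations chain naturally on a `KStream`. Below is a sketch using the plain Kafka Streams DSL, assuming a 2.x or newer `kafka-streams` dependency. Only the `traffic-data` topic comes from the labs; the following are hypothetical:

* the record format `sensorId;vehicleClass;speed`;
* several readings per message, separated by `|`;
* the output topic `speeds-per-sensor`.

`foreach` is left out because it is terminal: it returns `void`, so nothing can be chained after it.

```java
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

final class StatelessTopology {

    private StatelessTopology() {
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("traffic-data", Consumed.with(Serdes.String(), Serdes.String()))
                // flatMapValues: one batch "r1|r2|..." becomes zero, one or more records
                .flatMapValues(batch -> Arrays.asList(batch.split("\\|")))
                // filter: keep well-formed readings whose speed is not a sentinel (252 or 254)
                .filter((key, reading) -> {
                    String[] fields = reading.split(";");
                    return fields.length == 3 && !fields[2].equals("252") && !fields[2].equals("254");
                })
                // selectKey: re-key by sensor id (a later stateful step would trigger a repartition)
                .selectKey((key, reading) -> reading.split(";")[0])
                // mapValues: keep only the speed; the key is left untouched
                .mapValues(reading -> reading.split(";")[2])
                // peek: debugging side effect, records pass through unchanged
                .peek((sensorId, speed) -> System.out.println(sensorId + " -> " + speed))
                .to("speeds-per-sensor", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

Calling `StatelessTopology.build().describe()` prints the resulting processor graph without needing a running broker.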
Native streaming operations: filter
Native streaming operations: map
Native streaming operations: flatMap
Native streaming operations: peek
Native streaming operations: foreach
Lab 3: Stateless ● Dependencies ○ spring-cloud-stream-binder-kafka-streams ● Added custom interface: KStreamSink ● Methods used ○ .filter ○ .print ● Update configuration: ○ spring.cloud.stream.default-binder=kafka ○ spring.cloud.stream.bindings.native-input.binder=kstream
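In the lab, the Kafka Streams binder hands the application a `KStream` bound through the custom `KStreamSink` interface. The processing itself is plain Kafka Streams. Below is a sketch that leaves Spring out and applies the same `.filter` / `.print` logic to a stream built on a `StreamsBuilder`. It assumes `kafka-streams` 2.x or newer, Integer speeds as values, and a hypothetical 50 km/h "slow" threshold borrowed from the congestion slide.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;

final class SlowTrafficPrinter {

    private SlowTrafficPrinter() {
    }

    /** The lab logic: works on any KStream, e.g. the one the binder injects. */
    static void process(KStream<String, Integer> speeds) {
        speeds
                // Hypothetical rule: below 50 km/h counts as slow (252/254 sentinels are above it anyway)
                .filter((sensorId, speed) -> speed != null && speed < 50)
                // print is terminal: it writes every remaining record to stdout, tagged with the label
                .print(Printed.<String, Integer>toSysOut().withLabel("slow"));
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        process(builder.stream("traffic-data", Consumed.with(Serdes.String(), Serdes.Integer())));
        return builder.build();
    }
}
```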
Native streaming operations: stateful ● groupByKey (still stateless) ● count ● aggregations ● joining ● windowing
Native streaming operations: groupByKey ● Groups records into a KGroupedStream ● Required before aggregation operations ● Writes data to an internal repartition topic, but only if the key was changed upstream (e.g. by selectKey or map)
Native streaming operations: count
Native streaming operations: aggregations ● Transforms a KGroupedStream into a KTable ● Needs an initializer, e.g. aggValue = 0 ● Operation (“adder”): aggValue + newValue
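For a `KGroupedStream`, the aggregator receives the key, the new record value and the current aggregate, and returns the new aggregate. So the running total adds the new value, not an old one. The adder/subtractor pair belongs to `KGroupedTable.aggregate`, where an update must first subtract the old value. Below is a sketch assuming `kafka-streams` 2.1 or newer (for `Grouped`). The topics `vehicle-counts` and `vehicle-totals` are hypothetical, with a sensor id as key and a vehicle count as value.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

final class VehicleTotals {

    private VehicleTotals() {
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical input: key = sensor id, value = vehicles counted in one interval
        KTable<String, Long> totals = builder
                .stream("vehicle-counts", Consumed.with(Serdes.String(), Serdes.Long()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .aggregate(
                        () -> 0L,                                              // initializer: aggValue = 0
                        (sensorId, newValue, aggValue) -> aggValue + newValue, // aggregator ("adder")
                        Materialized.with(Serdes.String(), Serdes.Long()));    // serdes for the state store
        // Every update of a running total is forwarded downstream
        totals.toStream().to("vehicle-totals", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```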
Native streaming operations: joining
Lab 3: Stateful ● groupByKey ○ Uses Serdes (StringSerde and JsonSerde) ● Methods used ○ .count ○ .toStream: converts the KTable to a KStream
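The stateful lab combines re-keying, `groupByKey` with explicit serdes, `count` and `toStream`. The lab used a `JsonSerde` (from Spring Kafka) for its POJOs. To stay dependency-light, the sketch below uses `String` values with a hypothetical `sensorId;vehicleClass;speed` format and a hypothetical output topic, and it assumes `kafka-streams` 2.1 or newer. Because `selectKey` changes the key before grouping, the topology description shows the internal repartition topic.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

final class CountPerVehicleClass {

    private CountPerVehicleClass() {
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> counts = builder
                .stream("traffic-data", Consumed.with(Serdes.String(), Serdes.String()))
                // Re-key by vehicle class (hypothetical "sensorId;vehicleClass;speed" values)
                .selectKey((key, reading) -> reading.split(";")[1])
                // Key changed upstream, so records go through a repartition topic;
                // Grouped supplies the serdes used to write to it
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                // count materialises a KTable: readings seen per vehicle class
                .count();
        // toStream turns each update of the KTable into a record on the output topic
        counts.toStream().to("readings-per-vehicle-class", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}
```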
Windows ● Tumbling ● Sliding ● Session
Tumbling
Sliding
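Fixed-size windows come down to simple arithmetic on the record timestamp. Kafka Streams `TimeWindows` are aligned to the epoch, with an inclusive start and an exclusive end:

* Tumbling windows have an advance equal to their size, so every timestamp falls in exactly one window.
* Fixed-size windows that overlap are what Kafka Streams calls hopping windows (an advance smaller than the size). Newer versions also offer a separate `SlidingWindows` type.

The sketch below assumes the slide's sliding windows are of the fixed-size, overlapping kind and that timestamps are non-negative milliseconds. `TimeWindowMath` is a hypothetical plain-Java helper, not part of the Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java arithmetic for epoch-aligned, fixed-size time windows. Hypothetical helper. */
final class TimeWindowMath {

    private TimeWindowMath() {
    }

    /** Tumbling: the start of the single window [start, start + size) containing the timestamp. */
    static long tumblingStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Hopping/sliding: the starts of every window of sizeMs, starting each advanceMs, that contains the timestamp. */
    static List<Long> hoppingStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp
        long start = timestampMs - (timestampMs % advanceMs);
        // Walk back while the window [start, start + size) still contains the timestamp
        while (start >= 0 && start + sizeMs > timestampMs) {
            starts.add(0, start);
            start -= advanceMs;
        }
        return starts;
    }
}
```

With a 5-minute window advancing every minute, one reading counts towards five windows (fewer near the epoch). This is why hopping windows cost more state than tumbling ones.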
Session windows
Session windows ● Bounded by an inactivity gap instead of a fixed size ● Be aware: a session keeps growing as long as new events arrive within the gap, so the data you need to process might grow
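A session stays open as long as each new event arrives within the inactivity gap of the previous one. Nothing caps its length. Below is a plain-Java sketch of that rule for timestamps that are already sorted. Kafka Streams additionally merges two existing sessions when an out-of-order record bridges them, which this sketch does not handle. `Sessions` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sessionisation of sorted timestamps by an inactivity gap. Hypothetical helper. */
final class Sessions {

    /** A closed session: first and last event timestamp, and how many events it holds. */
    record Session(long start, long end, int events) {
        long durationMs() {
            return end - start;
        }
    }

    private Sessions() {
    }

    static List<Session> sessionize(List<Long> sortedTimestamps, long inactivityGapMs) {
        List<Session> sessions = new ArrayList<>();
        long start = 0;
        long end = 0;
        int count = 0;
        for (long ts : sortedTimestamps) {
            if (count > 0 && ts - end > inactivityGapMs) {
                // Gap exceeded: close the current session
                sessions.add(new Session(start, end, count));
                count = 0;
            }
            if (count == 0) {
                start = ts; // first event of a new session
            }
            end = ts;
            count++;
        }
        if (count > 0) {
            sessions.add(new Session(start, end, count));
        }
        return sessions;
    }
}
```

For example, readings at 0, 1, 2 and 10 minutes with a 5-minute gap form two sessions. The first lasts 2 minutes with 3 events; the second holds only the last reading.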
Lab 4: Windows ● Methods used ○ .windowedBy ○ .aggregate ■ Use of an Aggregator class ■ Materialized.with ○ .mapValues: convert records
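Below is a sketch of the Lab 4 building blocks: `windowedBy`, `aggregate` with a named `Aggregator` class and `Materialized.with`, and `mapValues` to convert the windowed results. It assumes Kafka Streams 3.x for `TimeWindows.ofSizeWithNoGrace`. The topics, the 5-minute window size and the class names are hypothetical.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

final class VehiclesPerWindow {

    private VehiclesPerWindow() {
    }

    /** A named aggregator class instead of a lambda: adds the new count to the window's total. */
    static final class VehicleSumAggregator implements Aggregator<String, Long, Long> {
        @Override
        public Long apply(String sensorId, Long vehicles, Long total) {
            return total + vehicles;
        }
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("vehicle-counts", Consumed.with(Serdes.String(), Serdes.Long()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                // Tumbling windows of 5 minutes, no grace period for late records
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
                .aggregate(() -> 0L, new VehicleSumAggregator(),
                        Materialized.with(Serdes.String(), Serdes.Long()))
                .toStream()
                // mapValues: turn (window, total) into a readable value; the key is still Windowed<String>
                .mapValues((windowedSensor, total) ->
                        windowedSensor.window().startTime() + " total=" + total)
                // Unwrap the windowed key so plain String serdes can write it
                .selectKey((windowedSensor, value) -> windowedSensor.key())
                .to("vehicles-per-window", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```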
Session windows: Traffic Congestion
Session windows: Traffic Congestion
Session windows: Traffic Congestion ● Merge the results of all lanes ● If the average speed < 50 km/h => slow traffic ● To: slow-traffic-topic ● @Input slow-traffic-topic => session window with a gap of 5 minutes ● Aggregate results: vehicle count ● To: vehicles-involved-in-traffic-jam ● Because the session window also has a start and end time ● => duration of the traffic jam
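Below is a sketch of the congestion pipeline in plain Kafka Streams, assuming Kafka Streams 3.x for `SessionWindows.ofInactivityGapWithNoGrace`. It makes two assumptions beyond the slide:

* Merging all lanes has already happened upstream and writes to a hypothetical `section-readings` topic (key = highway section, value = `avgSpeedKmh;vehicles`).
* The lab's `@Input` binding is replaced by a second `builder.stream` on `slow-traffic-topic`.

When two sessions merge, Kafka Streams retracts the old session with a null value. The sketch therefore filters out those tombstones before writing the result.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;

final class TrafficJamTopology {

    static final double SLOW_SPEED_KMH = 50.0;

    private TrafficJamTopology() {
    }

    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Step 1: keep slow sections only and forward their vehicle counts to slow-traffic-topic
        builder.stream("section-readings", Consumed.with(Serdes.String(), Serdes.String()))
                .filter((section, reading) -> Double.parseDouble(reading.split(";")[0]) < SLOW_SPEED_KMH)
                .mapValues(reading -> Long.parseLong(reading.split(";")[1]))
                .to("slow-traffic-topic", Produced.with(Serdes.String(), Serdes.Long()));

        // Step 2: a session per section stays open while slow readings arrive within 5 minutes
        builder.stream("slow-traffic-topic", Consumed.with(Serdes.String(), Serdes.Long()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5)))
                .reduce(Long::sum, Materialized.with(Serdes.String(), Serdes.Long()))
                .toStream()
                // Merged sessions are retracted with a null value; skip those tombstones
                .filter((windowedSection, vehicles) -> vehicles != null)
                // The session's start and end give the duration of the jam
                .map((windowedSection, vehicles) -> KeyValue.pair(
                        windowedSection.key(),
                        "vehicles=" + vehicles + ", durationMs="
                                + (windowedSection.window().end() - windowedSection.window().start())))
                .to("vehicles-involved-in-traffic-jam", Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }
}
```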
Thank you for attending!

Editor's Notes

  • #4 Shorter feedback loop: fraud detection, much nicer feedback to the customers, …
  • #5 The old days: query in order to retrieve the data you need. The glorious time of the batch jobs.
  • #6 Reacts to events. Events should be as complete as possible. Continuous stream.
  • #7 Data it owns: data owned by the publisher of the event. Data it needs: data that can originate from other services but is necessary to handle the event. Referenced data: data that might be relevant for the event, for example the reference temperatures of your destination when booking a holiday. Example, a contract update: owns: new price / needs: contract data, old price, discounts, … / references: customer data to contact the customer.
  • #9 Much shorter feedback for your business users. Because you are processing smaller sets of data at any given time, resources can be used more efficiently. Stream processing tends to feel more natural, as most data also enters your system as a stream. There is no longer a need for large and expensive databases: each stream processing application maintains its own data and state, and each application also tends to decide for itself what it will consume.
  • #16 No traffic data for that lane and vehicle type … so we say that the vehicle speed is 252 …
  • #29 Every record processed can result in 0, 1 or more new records
  • #45 Interactive whiteboard session to show what you could do with a session window on the current dataset. => merge results into a single data point for the entire highway section (all lanes and all vehicles) => if average speed < 50 km/h => traffic jam => send this out to another topic => apply a session window with a gap of 5 minutes => aggregate results: vehicle count => the resulting output should give you the number of vehicles involved in a traffic jam => because you also know the start and end of every session, you can also tell how long the traffic jam lasted.