Apache Kafka: Event Sourcing, Monitoring, Librdkafka, Scaling & Partitioning
Guido Schmutz, 9.8.2018
@gschmutz, guidoschmutz.wordpress.com
Guido Schmutz
• Working at Trivadis for more than 21 years
• Oracle ACE Director for Fusion Middleware and SOA
• Consultant, Trainer, Software Architect for Java, Oracle, SOA and Big Data / Fast Data
• Head of Trivadis Architecture Board
• Technology Manager @ Trivadis
• More than 30 years of software development experience
Contact: guido.schmutz@trivadis.com
Blog: http://guidoschmutz.wordpress.com
Slideshare: http://www.slideshare.net/gschmutz
Twitter: gschmutz
Agenda
1. What is Apache Kafka?
2. Kafka Clients
3. Kafka Producer & Kafka Consumer
4. Kafka Connect
5. KSQL & Kafka Streams
6. Message Deduplication
7. Kafka in modern Software Architecture
8. CQRS and Event Sourcing
What is Apache Kafka?
Apache Kafka – A Streaming Platform
• High-Level Architecture
• Distributed Log at the Core
• Scale-Out Architecture
• Logs do not (necessarily) forget
Apache Kafka History (2012 to 2018)
• 0.7: Cluster mirroring, data compression
• 0.8: Intra-cluster replication
• 0.9: Data Integration (Connect API)
• 0.10: Data Processing (Streams API)
• 0.11: Exactly Once Semantics, Performance Improvements, KSQL Developer Preview
• 1.0: JBOD Support, Support Java 9
• 1.1: Header for Connect, Replica movement between log dirs
Apache Kafka - Architecture [Diagram: a producer writes to Topic 1 and Topic 2 on a Kafka broker; each topic is an ordered sequence of messages (1 to 6) read by a consumer]
Apache Kafka - Architecture [Diagram: the same broker, with the topics split into partitions (Partition 0, Partition 1), each an ordered sequence of messages (1 to 6)]
Apache Kafka [Diagram: Topic 1 with partitions P0, P1 and P2 spread over Kafka Broker 1, 2 and 3; every partition is stored on two of the three brokers]
Kafka Consumer - Consumer Groups
• It is common to have multiple applications that read data from the same topic
• Each application should get all of the messages
• A unique consumer group is assigned to each application
• The number of consumers (threads) per group can differ
• Kafka scales to a large number of consumers without impacting performance
[Diagram: the Movement topic (partitions 0 to 3) is read by Consumer Group 1 (Consumers 1 to 4) and Consumer Group 2 (Consumers 1 and 2)]
Kafka Consumer - Consumer Groups
[Diagram: four ways for Consumer Group 1 to read the Movement topic (partitions 0 to 3)]
• 1 consumer: gets messages from all partitions
• 2 consumers: each gets messages from 2 partitions
• 4 consumers: each gets messages from 1 partition
• 5 consumers: one gets no messages
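The four scenarios on this slide can be reproduced with a small sketch. It uses a plain round-robin assignment, which is an assumption made for illustration and not the code of Kafka's real partition assignors; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified sketch: spreading the partitions of one topic over a consumer group. */
final class GroupAssignmentSketch {

    /** Returns, per consumer, the partitions it reads (round-robin, an assumption). */
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            // Within a group, a partition is read by exactly one consumer.
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // The Movement topic from the slide has 4 partitions.
        for (int consumers : new int[] {1, 2, 4, 5}) {
            System.out.println(consumers + " consumer(s): " + assign(4, consumers));
        }
    }
}
```

With five consumers the last list stays empty, which is the "one gets no messages" case: adding more consumers than partitions does not add parallelism.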
Durable and Highly Available Messaging [Diagram, two panels: Producer 1 writes to a cluster of Broker 1, 2 and 3; Consumer 1 and Consumer 2 of Consumer Group 1 read from it]
Durable and Highly Available Messaging (II) [Diagram, two panels: same setup with Producer 1, Broker 1 to 3 and Consumer Group 1]
Hold Data for Long-Term – Data Retention
[Diagram: Producer 1 writes to Broker 1, 2 and 3]
When is data deleted?
1. Never
2. Time based (TTL): log.retention.{ms | minutes | hours}
3. Size based: log.retention.bytes
4. Log compaction based (older entries with the same key are removed):
    kafka-topics.sh --zookeeper zk:2181 --create --topic customers --replication-factor 1 --partitions 1 --config cleanup.policy=compact
Keep Topics in Compacted Form
Log before compaction:
    Offset  0   1   2   3   4   5   6   7   8   9   10   11
    Key     K1  K2  K1  K1  K3  K2  K4  K5  K5  K2  K6   K2
    Value   V1  V2  V3  V4  V5  V6  V7  V8  V9  V10 V11
Log after compaction:
    Offset  3   4   6   8   9   10
    Key     K1  K3  K4  K5  K2  K6
    Value   V4  V5  V7  V9  V10 V11
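A minimal sketch of what compaction does to the records at offsets 0 to 10 of this slide: for every key only the most recent record survives, and it keeps its original offset. This is an in-memory illustration with hypothetical names, not the broker's log cleaner.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of log compaction: only the latest record per key survives. */
final class CompactionSketch {

    /** Returns key -> "offset:value" of the surviving records, ordered by offset. */
    static Map<String, String> compact(String[] keys, String[] values) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (int offset = 0; offset < keys.length; offset++) {
            // Remove first, so the re-inserted key moves to the end (offset order).
            latest.remove(keys[offset]);
            latest.put(keys[offset], offset + ":" + values[offset]);
        }
        return latest;
    }

    public static void main(String[] args) {
        String[] keys = {"K1", "K2", "K1", "K1", "K3", "K2", "K4", "K5", "K5", "K2", "K6"};
        String[] values = {"V1", "V2", "V3", "V4", "V5", "V6", "V7", "V8", "V9", "V10", "V11"};
        System.out.println(compact(keys, values));
    }
}
```

The surviving entries are the same six records shown in the compacted log above (K1 at offset 3, K3 at 4, K4 at 6, K5 at 8, K2 at 9, K6 at 10).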
How to provision a Kafka environment?
On Premises
• Bare Metal Installation
• Docker
• Mesos / Kubernetes
• Hadoop Distributions
Cloud
• Oracle Event Hub Cloud Service
• Azure HDInsight Kafka
• Confluent Cloud
• …
Important Broker Configuration Settings
• broker.id: every broker must have an integer identifier which is unique within the cluster
• listeners: comma-separated list of URIs the broker will listen on, together with the listener names
• zookeeper.connect: location of the ZooKeeper used for storing the broker metadata
• auto.create.topics.enable: enables automatic creation of topics on the server
• default.replication.factor: default replication factor for automatically created topics
• num.partitions: default number of log partitions per topic, for automatically created topics
• delete.topic.enable: allows users to delete a topic from Kafka using the admin tool, for Kafka versions 0.9 and later
see also: https://docs.confluent.io/current/installation/configuration/broker-configs.html
Important Broker Configuration Settings (II)
• log.dirs: comma-separated list of paths on the local filesystem where Kafka will persist the log segments
• log.retention.[ms|minutes|hours]: the number of milliseconds/minutes/hours to keep a log file before deleting it
• log.retention.bytes: the maximum size of the log before deleting it
• log.segment.bytes: the maximum size of a single log file
• broker.rack: rack of the broker, used in rack-aware replication assignment for fault tolerance
• message.max.bytes: the largest record batch size allowed by Kafka, defaults to 1 MB
see also: https://docs.confluent.io/current/installation/configuration/broker-configs.html
How to create a Topic
• Command line interface:
    $ kafka-topics.sh --zookeeper zk1:2181 --create --topic my.topic --partitions 3 --replication-factor 2 --config x=y
Alternatives:
• Using the AdminUtils.createTopic method
• Auto-create via auto.create.topics.enable = true
Important Topic Configuration Settings
• cleanup.policy: the retention policy to use on old log segments, either "compact" or "delete"
• min.cleanable.dirty.ratio: controls how frequently the log compactor will attempt to clean the log
• min.compaction.lag.ms: the minimum time a message will remain uncompacted in the log
• segment.ms: the period of time after which Kafka will force the log to roll, even if the segment file isn't full, to ensure that retention can delete or compact old data
• max.message.bytes: the largest record batch size allowed by Kafka for this topic, defaults to 1 MB (topic-level counterpart of the broker setting message.max.bytes)
see also: https://docs.confluent.io/current/installation/configuration/topic-configs.html
Demo (I) [Diagram: Truck-1, Truck-2 and Truck-3 (test data generator by Hortonworks) send to the truck_position topic, which is read by a console consumer]
Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
Demo (I) – Create Kafka Topic
    $ kafka-topics --zookeeper zookeeper:2181 --create --topic truck_position --partitions 8 --replication-factor 1
    $ kafka-topics --zookeeper zookeeper:2181 --list
    __consumer_offsets
    _confluent-metrics
    _schemas
    docker-connect-configs
    docker-connect-offsets
    docker-connect-status
    truck_position
Demo (I) – Run Producer and Kafka-Console-Consumer
Demo (I) – Java Producer to "truck_position"
Constructing a Kafka Producer
    Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", "broker-1:9092");
    kafkaProps.put("key.serializer", "...StringSerializer");
    kafkaProps.put("value.serializer", "...StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(kafkaProps);

    ProducerRecord<String, String> record =
        new ProducerRecord<>("truck_position", driverId, eventData);
    try {
        // send() is asynchronous; get() blocks until the broker has answered
        RecordMetadata metadata = producer.send(record).get();
    } catch (Exception e) {
        e.printStackTrace(); // do not silently swallow send failures
    }
Demo (II) – devices send to MQTT instead of Kafka [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position]
Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
Demo (II) – devices send to MQTT instead of Kafka
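Demo (II) - devices send to MQTT instead of Kafka – how to get the data into Kafka? [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position; a question mark sits between MQTT and the Kafka topic labelled "truck position raw"]
Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837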
Apache Kafka – wait there is more! [Diagram: the Kafka Broker with the trucking_driver topic, surrounded by a Source Connector, a Sink Connector and Stream Processing]
Kafka Clients
Kafka Client Architecture
[Diagram, client side on top of server side]
• Server side: Kafka Broker and Protocol
• Client side: Kafka Java API, Librdkafka (C & C++), Confluent REST Proxy, Confluent MQTT Proxy
• Language clients: C#/.NET, C++, Node.js, PHP, PHP, …
• On top: Application / Client Code
Librdkafka
Librdkafka is a C library implementation of the Kafka protocol with both producer and consumer support (https://github.com/edenhill/librdkafka)
• High-level producer
• High-level balanced KafkaConsumer (requires broker >= 0.9)
• Simple consumer (legacy)
• Compression: snappy, gzip, lz4
• SSL support
• SASL
• Broker version support >= 0.8 (broker version compatibility)
• Statistics metrics
PHP Kafka Client
Two different clients on top of librdkafka exist; only php-rdkafka is up to date
php-rdkafka (https://github.com/arnaud-lb/php-rdkafka)
• Thin wrapper of librdkafka
• Supports high-level consumer API
• Supports low-level consumer API
• Supports producer API
• Supports metadata API
Kafka Producer API
Apache Kafka – Producer [Diagram: the Producer sends messages to the Kafka Broker; the Consumer reads them from the broker]
Kafka Producer
• Producers send records to topics
• For each record, the producer picks the partition of the topic it is sent to
  • Can be done round-robin
  • Can be done sticky (by key)
  • Can be done by priority (by manually selecting the partition)
  • Typically based on the key of the record
• The Kafka default partitioner for Java uses a hash of the key to choose the partition, or a round-robin strategy if there is no key
Important: the producer picks the partition
Strong Ordering Guarantees
• Most business systems need strong ordering guarantees
• Messages that require relative ordering need to be sent to the same partition
• Supply the same key for all messages that require a relative order
• To maintain global ordering, use a single-partition topic
[Diagram: Producer 1 sends records with Key-1 to Key-6 to partitions P1 to P6 on Broker 1, 2 and 3; Consumer 1, 2 and 3 form one consumer group]
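Key-based partitioning is what makes the ordering guarantee work: the same key always maps to the same partition. The sketch below illustrates the idea only. Kafka's Java default partitioner hashes the serialized key with murmur2; `Arrays.hashCode` is used here as a stand-in, and the class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a key-hash partitioner with a round-robin fallback for records without key. */
final class PartitionerSketch {
    private final AtomicInteger counter = new AtomicInteger();

    int partition(String key, int numPartitions) {
        if (key == null) {
            // No key: spread the records round-robin over all partitions.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        // Stand-in hash (assumption); Kafka itself uses murmur2 on the key bytes.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        PartitionerSketch partitioner = new PartitionerSketch();
        for (String key : new String[] {"Key-1", "Key-3", "Key-1", "Key-3"}) {
            System.out.println(key + " -> partition " + partitioner.partition(key, 6));
        }
    }
}
```

Because the partition depends on the number of partitions, changing the partition count of a topic changes where a key lands, so ordering per key only holds while the partition count stays the same.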
Kafka Producer – High Level Overview
[Diagram: flow of a record through the producer client to the Kafka Broker]
• A Producer Record consists of Topic, [Partition], [Key] and Value
• The record passes the Serializer and then the Partitioner
• Records are collected in one batch per topic partition (Movement Topic Partition 0, Movement Topic Partition 1); compression is optional
• Batches are sent to the Kafka Broker
• Fail? If a retry is possible, the batch is sent again; if it can't retry, an exception is thrown
• Successful: metadata is returned
Kafka Producer - Java API
Constructing a Kafka Producer
• bootstrap.servers: list of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster
• key.serializer: name of a class that will be used to serialize the keys of the records we will produce to Kafka
• value.serializer: name of a class that will be used to serialize the values of the records we will produce to Kafka
    private Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
    kafkaProps.put("key.serializer", "...StringSerializer");
    kafkaProps.put("value.serializer", "...StringSerializer");
    producer = new KafkaProducer<String, String>(kafkaProps);
Kafka Producer - Java API
Sending Message Fire-and-Forget (no check whether the message has been sent successfully)
    ProducerRecord<String, String> record =
        new ProducerRecord<>("topicName", "Key", "Value");
    try {
        producer.send(record); // the returned Future is ignored
    } catch (Exception e) {
        e.printStackTrace(); // only errors raised before sending end up here
    }
Kafka Producer - Java API
Sending Message Synchronously (wait until the reply from Kafka arrives)
    ProducerRecord<String, String> record =
        new ProducerRecord<>("topicName", "Key", "Value");
    try {
        producer.send(record).get(); // blocks until the broker has answered
    } catch (Exception e) {
        e.printStackTrace();
    }
Kafka Producer - Java API
Sending Message Asynchronously
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
            }
        }
    }

    ProducerRecord<String, String> record =
        new ProducerRecord<>("topicName", "key", "value");
    producer.send(record, new ProducerCallback());
Kafka Producer - Durability Guarantees
Producer can configure acknowledgements (acks property)
• acks=0: producer doesn't wait for the leader. Throughput: high, latency: low, durability: low (no guarantee)
• acks=1 (default): producer waits for the leader; the leader sends the ack when the message is written to its log; no wait for followers. Throughput: medium, latency: medium, durability: medium (leader)
• acks=all (-1): producer waits for the leader; the leader sends the ack when all in-sync replicas have acknowledged. Throughput: low, latency: high, durability: high (ISR)
Important Producer Configuration Settings
• key.serializer: (Java) serializer class for the key, implements the org.apache.kafka.common.serialization.Serializer interface
• value.serializer: (Java) serializer class for the value, implements the org.apache.kafka.common.serialization.Serializer interface
• acks: the number of acknowledgments the producer requires the leader to have received before considering a request complete; controls the durability of records that are sent
• bootstrap.servers: list of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster
• compression.type: the compression type for all data generated by the producer (none, gzip, snappy, lz4)
Apache Kafka: https://kafka.apache.org/documentation/#configuration
Librdkafka: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Important Producer Configuration Settings (II)
• retries: a value greater than zero causes the client to resend any record whose send fails with a potentially transient error
• retry.backoff.ms: amount of time to wait before attempting to retry a failed request to a given topic partition
• request.timeout.ms: maximum amount of time the client will wait for the response to a request
• batch.size: (Java) controls the default batch size in bytes (batch.num.messages in librdkafka)
• linger.ms: how long the producer waits for more records before sending a batch that is not yet full
• max.in.flight.requests.per.connection: maximum number of unacknowledged requests the client will send on a single connection before blocking
Apache Kafka: https://kafka.apache.org/documentation/#configuration
Librdkafka: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Kafka Consumer API
Apache Kafka – Consumer [Diagram: the Producer sends messages to the Kafka Broker; the Consumer reads them from the broker]
Kafka Consumer - Partition offsets
Offset: each message in a partition is assigned a unique (per partition) and sequential id called the offset
• Consumers track their position via (offset, partition, topic) tuples
• Consumers remember the offset where they left off
• Each consumer group has its own offset per partition
[Diagram: Consumer Group A and Consumer Group B read the same partition at different offsets]
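The bookkeeping described above can be pictured as a map from (group, topic, partition) to the next offset to read. The following is an in-memory sketch with hypothetical names; in Kafka the committed offsets live in the __consumer_offsets topic, and starting at offset 0 when nothing is committed is an assumption that corresponds to reading from the earliest offset.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: every consumer group keeps its own position per topic partition. */
final class GroupOffsetsSketch {
    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    /** Remembers the next offset the group will read from this partition. */
    void commit(String group, String topic, int partition, long nextOffset) {
        committed.put(key(group, topic, partition), nextOffset);
    }

    /** Returns where the group left off; 0 if it never committed (assumption). */
    long position(String group, String topic, int partition) {
        return committed.getOrDefault(key(group, topic, partition), 0L);
    }

    public static void main(String[] args) {
        GroupOffsetsSketch offsets = new GroupOffsetsSketch();
        offsets.commit("group-A", "truck_position", 0, 5L);
        offsets.commit("group-B", "truck_position", 0, 2L);
        System.out.println("group-A resumes at " + offsets.position("group-A", "truck_position", 0));
        System.out.println("group-B resumes at " + offsets.position("group-B", "truck_position", 0));
    }
}
```

Because the positions are independent, a slow group never holds back a fast one reading the same partition.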
How does Consumer work? [Diagram: in a polling loop the consumer fetches messages from the input topic on the Kafka Broker, processes them and commits the offsets to the __consumer_offsets topic]
Consumer
Using auto-commit mode
• set enable.auto.commit to true
• set auto.commit.interval.ms to control the frequency of commits
    consumer.subscribe([topics])
    while(true)
        msgs = consumer.poll(waitForMs)
        for (msg IN msgs)
            process(msg)
    consumer.close()
Commit current offsets
• set enable.auto.commit to false
• manually invoke commit() to commit the offsets when processing is done
    consumer.subscribe([topics])
    while(true)
        msgs = consumer.poll(waitForMs)
        for (msg IN msgs)
            process(msg)
        consumer.commit()
    consumer.close()
Kafka Consumer – Java API
Constructing a Kafka Consumer
    private Properties kafkaProps = new Properties();
    kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
    kafkaProps.put("group.id", "MovementsConsumerGroup");
    kafkaProps.put("key.deserializer", "...StringDeserializer");
    kafkaProps.put("value.deserializer", "...StringDeserializer");
    consumer = new KafkaConsumer<String, String>(kafkaProps);
Kafka Consumer Subscription - Java API
• The next step after creating the consumer is to subscribe to one or more topics
• subscribe() takes a list of topics as a parameter
• It is also possible to subscribe with a regular expression (java.util.regex.Pattern)
    consumer.subscribe(Collections.singletonList("truck_movement"));
    consumer.subscribe(Pattern.compile("truck.*"));
Kafka Consumer Poll Loop – Java API
Kafka Consumer Poll Loop (with synchronous offset commit)
    consumer.subscribe(Collections.singletonList("topic"));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // process message, available information:
                // record.topic(), record.partition(), record.offset(),
                // record.key(), record.value()
            }
            consumer.commitSync();
        }
    } finally {
        consumer.close();
    }
Run your own offset commit store
• Processed messages are stored in a target that supports transactions
• With Kafka's default offset commit there is a potential for duplicates if the commit fails
• Store the processed messages and the offset in one atomic action in the same database
Kafka offset commit (duplicates possible):
    consumer.subscribe([topics])
    while(true)
        msgs = consumer.poll(waitForMs)
        for (msg IN msgs)
            process(msg)
            storeInDB(msg)
            consumer.commit(currentOffsets)
    consumer.close()
Own offset store (messages and offsets in the same database):
    consumer.subscribe([topics])
    while(true)
        msgs = consumer.poll(waitForMs)
        for (msg IN msgs)
            process(msg)
            storeInDB(msg)
            storeInDB(currentOffsets)
    consumer.close()
Run your own offset commit store [Diagram: the consumer seeks to the offset stored in the DB (seekTo(offset)), then in its polling loop fetches messages from the input topic, processes them and stores the processed messages and the offsets in the DB within one transaction; the __consumer_offsets topic is not used for the commit]
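A minimal in-memory sketch of this pattern: the processed message and the new offset are written in one atomic step, and after a restart the consumer seeks to the stored offset. The list stands in for a partition and the synchronized method for a database transaction; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: processed messages and the consumer offset are stored atomically together. */
final class OffsetStoreSketch {
    private final List<String> processedMsgs = new ArrayList<>();
    private long nextOffset = 0L;

    /** Stand-in for one DB transaction that writes the result and the offset. */
    synchronized void store(String processedMsg, long offset) {
        processedMsgs.add(processedMsg);
        nextOffset = offset + 1;
    }

    synchronized long nextOffset() {
        return nextOffset;
    }

    synchronized List<String> processedMsgs() {
        return new ArrayList<>(processedMsgs);
    }

    /** Seeks to the stored offset and processes at most maxMsgs messages of the log. */
    void consume(List<String> log, int maxMsgs) {
        long start = nextOffset(); // seekTo(offset) after a (re)start
        for (long o = start; o < log.size() && o < start + maxMsgs; o++) {
            store("processed:" + log.get((int) o), o);
        }
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2", "m3", "m4");
        OffsetStoreSketch db = new OffsetStoreSketch();
        db.consume(log, 2);  // the consumer stops after two messages
        db.consume(log, 10); // the restarted consumer continues at offset 2
        System.out.println(db.processedMsgs());
    }
}
```

Since result and offset can never get out of step, a restart neither skips nor repeats a message, which is what the separate offset commit to Kafka cannot guarantee.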
Important Consumer Configuration Settings
• key.deserializer: (Java) deserializer class for the key, implements the org.apache.kafka.common.serialization.Deserializer interface
• value.deserializer: (Java) deserializer class for the value, implements the org.apache.kafka.common.serialization.Deserializer interface
• bootstrap.servers: list of host:port pairs of brokers that the consumer will use to establish the initial connection to the Kafka cluster
• fetch.min.bytes: minimum number of bytes the broker responds with; if the maximum wait time (next setting) expires first, the data accumulated so far is sent
• fetch.max.wait.ms (Java; fetch.wait.max.ms in librdkafka): the maximum amount of time the server will block before answering the fetch request
Apache Kafka: https://kafka.apache.org/documentation/#configuration
Librdkafka: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Important Consumer Configuration Settings (II)
• max.partition.fetch.bytes: controls the maximum number of bytes the server will return per partition
• session.timeout.ms: amount of time a consumer can be out of contact with the brokers while still being considered alive
• auto.offset.reset: behavior of the consumer when it starts reading a partition for which it doesn't have a committed offset, or if the committed offset it has is invalid; "earliest" or "latest" (default)
• enable.auto.commit: controls whether the consumer will commit offsets automatically, defaults to true
• auto.commit.interval.ms: controls how frequently offsets will be committed
Apache Kafka: https://kafka.apache.org/documentation/#configuration
Librdkafka: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Kafka Connect
Kafka Connect - Overview [Diagram: Source Connector, Kafka, Sink Connector]
Kafka Connect – Single Message Transforms (SMT)
Simple transformations for a single message
Defined as part of Kafka Connect
• Some useful transforms are provided out-of-the-box
• Easily implement your own
Optionally deploy one or more transforms with each connector
• Modify messages produced by a source connector
• Modify messages sent to sink connectors
Makes it much easier to mix and match connectors
Some of the currently available transforms:
• InsertField
• ReplaceField
• MaskField
• ValueToKey
• ExtractField
• TimestampRouter
• RegexRouter
• SetSchemaMetadata
• Flatten
• TimestampConverter
Kafka Connect – Many Connectors
• 60+ since first release (0.9+)
• 20+ from Confluent and Partners
• Categories: Confluent supported Connectors, Certified Connectors, Community Connectors
Source: http://www.confluent.io/product/connectors
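Demo (III) [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position; "mqtt to kafka" writes to the Kafka topic truck_position, which is read by a console consumer]
Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837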
Demo (III) – Create MQTT Connect through REST API
    #!/bin/bash
    curl -X "POST" "http://192.168.69.138:8083/connectors" \
         -H "Content-Type: application/json" \
         -d $'{
      "name": "mqtt-source",
      "config": {
        "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
        "connect.mqtt.connection.timeout": "1000",
        "tasks.max": "1",
        "connect.mqtt.kcql": "INSERT INTO truck_position SELECT * FROM truck/+/position",
        "name": "MqttSourceConnector",
        "connect.mqtt.service.quality": "0",
        "connect.mqtt.client.id": "tm-mqtt-connect-01",
        "connect.mqtt.converter.throw.on.error": "true",
        "connect.mqtt.hosts": "tcp://mosquitto:1883"
      }
    }'
Demo (III) – Call REST API and Kafka Console Consumer
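Demo (III) [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position; "mqtt to kafka" writes to the Kafka topic truck_position, which is read by a console consumer. What about some analytics?]
Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837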
KSQL
KSQL: a Streaming SQL Engine for Apache Kafka
• Enables stream processing with zero coding required
• The simplest way to process streams of data in real-time
• Powered by Kafka and Kafka Streams: scalable, distributed, mature
• All you need is Kafka, no complex deployments
• Available as Developer Preview
• STREAM and TABLE as first-class citizens
  • STREAM = data in motion
  • TABLE = collected state of a stream
  • join STREAM and TABLE
Demo (IV) [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position; "mqtt to kafka" writes to truck_position_s; detect_dangerous_driving writes to dangerous_driving, which is read by a console consumer]
Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
Demo (IV) - Start Kafka KSQL
    $ docker-compose exec ksql-cli ksql-cli local --bootstrap-server broker-1:9092
    [KSQL ASCII-art banner]
    Streaming SQL Engine for Kafka
    Copyright 2017 Confluent Inc.
    CLI v0.1, Server v0.1 located at http://localhost:9098
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    ksql>
Demo (IV) - Create Stream
    ksql> CREATE STREAM truck_position_s
            (ts VARCHAR,
             truckId VARCHAR,
             driverId BIGINT,
             routeId BIGINT,
             eventType VARCHAR,
             latitude DOUBLE,
             longitude DOUBLE,
             correlationId VARCHAR)
          WITH (kafka_topic='truck_position', value_format='DELIMITED');

    Message
    ----------------
    Stream created
Demo (IV) - Create Stream
    ksql> SELECT * FROM truck_position_s;
    1522847870317 | "truck/13/position0 | 1522847870310 | 44 | 13 | 1390372503 | Normal | 41.71 | -91.32 | -2458274393837068406
    1522847870376 | "truck/14/position0 | 1522847870370 | 35 | 14 | 1961634315 | Normal | 37.66 | -94.3 | -2458274393837068406
    1522847870418 | "truck/21/position0 | 1522847870410 | 58 | 21 | 137128276 | Normal | 36.17 | -95.99 | -2458274393837068406
    1522847870397 | "truck/29/position0 | 1522847870390 | 18 | 29 | 1090292248 | Normal | 41.67 | -91.24 | -2458274393837068406

    ksql> SELECT * FROM truck_position_s WHERE eventType != 'Normal';
    1522847914246 | "truck/11/position0 | 1522847914240 | 54 | 11 | 1198242881 | Lane Departure | 40.86 | -89.91 | -2458274393837068406
    1522847915125 | "truck/10/position0 | 1522847915120 | 93 | 10 | 1384345811 | Overspeed | 40.38 | -89.17 | -2458274393837068406
    1522847919216 | "truck/12/position0 | 1522847919210 | 75 | 12 | 24929475 | Overspeed | 42.23 | -91.78 | -2458274393837068406
Demo (IV) - Create Stream
    ksql> describe truck_position_s;

     Field         | Type
    ---------------------------------
     ROWTIME       | BIGINT
     ROWKEY        | VARCHAR(STRING)
     TS            | VARCHAR(STRING)
     TRUCKID       | VARCHAR(STRING)
     DRIVERID      | BIGINT
     ROUTEID       | BIGINT
     EVENTTYPE     | VARCHAR(STRING)
     LATITUDE      | DOUBLE
     LONGITUDE     | DOUBLE
     CORRELATIONID | VARCHAR(STRING)
Demo (IV) - Create Stream
    ksql> CREATE STREAM dangerous_driving_s
          WITH (kafka_topic='dangerous_driving_s', value_format='JSON')
          AS SELECT * FROM truck_position_s
          WHERE eventtype != 'Normal';

    Message
    ----------------------------
    Stream created and running

    ksql> select * from dangerous_driving_s;
    1522848286143 | "truck/15/position0 | 1522848286125 | 98 | 15 | 987179512 | Overspeed | 34.78 | -92.31 | -2458274393837068406
    1522848295729 | "truck/11/position0 | 1522848295720 | 54 | 11 | 1198242881 | Unsafe following distance | 38.43 | -90.35 | -2458274393837068406
    1522848313018 | "truck/11/position0 | 1522848313000 | 54 | 11 | 1198242881 | Overspeed | 41.87 | -87.67 | -2458274393837068406
Demo (V) [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position; mqtt-source writes to truck_position; detect_dangerous_driving writes to dangerous_driving; jdbc-source loads the Truck Driver table into trucking_driver; join_dangerous_driving_driver writes to dangerous_driving_driver, which is read by a console consumer]
Sample driver row: 27, Walter, Ward, Y, 24-JUL-85, 2017-10-02 15:19:00
Sample driver message: {"id":27,"firstName":"Walter","lastName":"Ward","available":"Y","birthdate":"24-JUL-85","last_update":1506923052012}
Sample position message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
Demo (V) – Create JDBC Connect through REST API
    #!/bin/bash
    curl -X "POST" "http://192.168.69.138:8083/connectors" \
         -H "Content-Type: application/json" \
         -d $'{
      "name": "jdbc-driver-source",
      "config": {
        "connector.class": "JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://db/sample?user=sample&password=sample",
        "mode": "timestamp",
        "timestamp.column.name": "last_update",
        "table.whitelist": "driver",
        "validate.non.null": "false",
        "topic.prefix": "trucking_",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "name": "jdbc-driver-source",
        "transforms": "createKey,extractInt",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "id",
        "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractInt.field": "id"
      }
    }'
Demo (V) – Create JDBC Connect through REST API
Demo (V) - Create Table with Driver State
    ksql> CREATE TABLE driver_t
            (id BIGINT,
             first_name VARCHAR,
             last_name VARCHAR,
             available VARCHAR)
          WITH (kafka_topic='trucking_driver', value_format='JSON', key='id');

    Message
    ----------------
    Table created
Demo (V) - Create Table with Driver State
    ksql> CREATE STREAM dangerous_driving_and_driver_s
          WITH (kafka_topic='dangerous_driving_and_driver_s', value_format='JSON')
          AS SELECT driverId, first_name, last_name, truckId, routeId, eventtype
          FROM dangerous_driving_s
          LEFT JOIN driver_t
          ON dangerous_driving_s.driverId = driver_t.id;

    Message
    ----------------------------
    Stream created and running

    ksql> select * from dangerous_driving_and_driver_s;
    1511173352906 | 21 | 21 | Lila | Page | 58 | 1594289134 | Unsafe tail distance
    1511173353669 | 12 | 12 | Laurence | Lindsey | 93 | 1384345811 | Lane Departure
    1511173435385 | 11 | 11 | Micky | Isaacson | 22 | 1198242881 | Unsafe tail distance
Kafka Streams
Kafka Streams - Overview
• Designed as a simple and lightweight library in Apache Kafka
• No external dependencies on systems other than Apache Kafka
• Part of open source Apache Kafka, introduced in 0.10+
• Leverages Kafka as its internal messaging layer
• Supports fault-tolerant local state
• Event-at-a-time processing (not microbatch) with millisecond latency
• Windowing with out-of-order data using a Google DataFlow-like model
Source: Confluent
Kafka Stream DSL and Processor Topology
    KStream<Integer, String> stream1 = builder.stream("in-1");
    KStream<Integer, String> stream2 = builder.stream("in-2");
    KStream<Integer, String> joined = stream1.leftJoin(stream2, …);
    KTable<> aggregated = joined.groupBy(…).count("store");
    aggregated.to("out-1");
[Diagram: processor topology with the nodes 1, 2, l, j, a, t and a state store]
Kafka Streams Cluster Processor Topology [Diagram: the processor topology (nodes 1, 2, l, j, a, t and a state store) next to the Kafka Cluster with the topics input-1, input-2, store (changelog) and output]
Kafka Cluster Processor Topology [Diagram: input-1 and input-2 with partitions 0 to 3 each, processed by two instances: Kafka Streams 1 and Kafka Streams 2]
Kafka Cluster Processor Topology [Diagram: input-1 and input-2 with partitions 0 to 3 each, processed by four instances: Kafka Streams 1 to Kafka Streams 4]
Kafka Streams: Key Features
• Native, 100%-compatible Kafka integration
• Secure stream processing using Kafka's security features
• Elastic and highly scalable
• Fault-tolerant
• Stateful and stateless computations
• Interactive queries
• Time model
• Windowing
• Supports late-arriving and out-of-order data
• Millisecond processing latency, no micro-batching
• At-least-once and exactly-once processing guarantees
Demo (IV) [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position; "mqtt to kafka" writes to truck_position_s; detect_dangerous_driving writes to dangerous_driving, which is read by a console consumer]
Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
Demo (IV) - Create Stream
    final KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source =
        builder.stream(stringSerde, stringSerde, "truck_position");

    KStream<String, TruckPosition> positions =
        source.map((key, value) -> new KeyValue<>(key, TruckPosition.create(key, value)));

    KStream<String, TruckPosition> filtered =
        positions.filter(TruckPosition::filterNonNORMAL);

    filtered.map((key, value) -> new KeyValue<>(key, value.toCSV()))
            .to("dangerous_driving");
Message De-duplication
Why/when do we get duplicate messages?
• Producer: everything works fine on the broker side, but the ACK is not received by the producer, so the producer retries and the message is written a second time
• Consumer: the message has been processed but the offset commit failed. The next poll re-consumes from the last committed offset and some duplicate processing occurs.
[Diagram: the producer sends A, the ack gets lost, the retry leaves A twice in the broker log; the consumer processes A, the commit fails, and after re-consuming A is processed again]
What to do with duplicate messages
• Avoid it -> not possible if using at-least-once
• Do nothing -> implement all final message consumers in an idempotent manner
• Use Kafka "Exactly Once Semantics" (EOS) processing
  • Only available within Kafka
  • Currently only available for Java clients
  • Planned to be made available in librdkafka in Q3 of 2018 (see issue 1308)
• Implement Deduplicator functionality (see next slide)
Deduplicator
Pseudo-code for the deduplication functionality
    def dedupe(stream):
        for message in stream:
            if has_seen(message.id):
                discard(message)
            else:
                publish_and_commit(message)
• Each message needs a unique id
• A list of "seen messages" has to be stored in an efficient manner (writing, detecting existence, aging out)
• Time or space window for removing message ids after a while (so that the list does not grow forever)
High-level architecture of an efficient Deduplicator, see also:
• Delivering billions of messages exactly once
• Event Deduplication Example for Kafka Streams
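The pseudo-code leaves has_seen() open. One possible sketch is an in-memory store of seen ids with a time window. It assumes that timestamps never decrease and that the state fits in memory and need not survive a restart, which a production deduplicator could not assume; the class name is hypothetical.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch of a deduplicator that remembers message ids for a limited time window. */
final class DeduplicatorSketch {
    private final long windowMs;
    // message id -> time it was first seen, kept in insertion (= time) order
    private final Map<String, Long> seen = new LinkedHashMap<>();

    DeduplicatorSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Returns true if the message should be published, false if it is a duplicate. */
    boolean accept(String messageId, long nowMs) {
        // Age out ids older than the window so the store cannot grow forever.
        Iterator<Map.Entry<String, Long>> it = seen.entrySet().iterator();
        while (it.hasNext() && nowMs - it.next().getValue() > windowMs) {
            it.remove();
        }
        if (seen.containsKey(messageId)) {
            return false; // has_seen -> discard
        }
        seen.put(messageId, nowMs);
        return true; // publish_and_commit
    }

    public static void main(String[] args) {
        DeduplicatorSketch dedupe = new DeduplicatorSketch(1000);
        System.out.println(dedupe.accept("a", 0));    // first delivery
        System.out.println(dedupe.accept("a", 500));  // redelivery inside the window
        System.out.println(dedupe.accept("a", 1500)); // id has aged out by now
    }
}
```

The window size is the trade-off: a duplicate that arrives after its id has aged out is published again, so the window has to cover the longest expected retry delay.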
Kafka in modern Software Architecture
Traditional Approach [Diagram: Shop Rich UI (GUI, UI Logic), Shop Backend Application (Search Facade, Order Facade, Customer DAO, Order DAO, Product DAO) and Customer Fat Client App (Customer UI, Customer BO) on top of a shared database; interaction is sync request/response]
SOA Approach
• Contract-first Web Services
• Technical layers offer their own interfaces
• Reuse on each level
• Lower layer often wraps legacy code
[Diagram: Shop UI App and Shop Web App (GUI, UI Logic) call Business Activity Services (Search BAS, Order BAS, Customer BAS) via SOAP; these call Business Entity Services (Customer BES, Payment BES, Product BES, Order BES) via SOAP; DAOs (Customer, Order, Product) access the data storage: Customer Database, Order and Product DB]
Shop UI App Business Activity Service Virtualized SOA Approach Search BAS Customer DAO Order DAO Order BAS Shop UI UI Logic GUI Business Entity Service Shop Web App Shop UI UI Logic GUI Data Storage Customer Database Customer BES Payment BES Product BES Order BES Custer BAS Order and Product DB Service Virtualization Layer Service Bus SOAP SOAP SOAP SOAP SOAP SOAP SOAP
Microservice Approach Tightly Scoped behind clear interfaces Responsible for managing their own data (not necessarily the infrastructure) Should be highly decoupled Independently deployable, self- contained and autonomous SOA done right ?! Tightly Scoped Responsible for managing their own data Highly decoupled Independently deployable, self- contained and autonomous { } Customer API Customer Customer Logic Order Microservice { } Order API Order Order Logic Product Microservice { } Product API Product Product Logic Stock Microservice { } Stock API Stock Stock Logic Shop Web App Shop UI UI Logic GUI REST REST REST REST
Microservice Approach with API Gateway Customer Microservice { } Customer API Customer Customer Logic Order Microservice { } Order API Order Order Logic Product Microservice { } Product API Product Product Logic Stock Microservice { } Stock API Stock Stock Logic Shop Web App Shop UI UI Logic GUI REST REST REST REST API Gateway
Synchronous World of Request-Response • Leads to tight, point-to-point couplings • Problems at the lower end of the chain have a ripple effect on other services: crash of a service, overloaded service / slow response time, change of interface [Diagram: Service 1 to Service 7, each with API, Logic and State, calling each other in a chain]
Three mechanisms through which services can interact
Request-Driven (Imperative) • Command: "Order IPad", boolean order(IPad) • Query: "Retrieve my Orders", List<Orders> getAllOrders(for)
Event-Driven (Functional) • Consume Event: "Event Ordered", OrderEvent • Publish Event: "Event Validated", OrderValidatedEvent
[Diagram: two services, each with Logic and State, interacting directly and through an Event Broker]
Event-Driven (Async) Microservice Approach [Diagram: Shop Web App calls the Customer, Order, Product and Stock microservices over REST through an API Gateway; the microservices exchange events through an Event Store. Legend: sync request/response, async request/response, async event pub/sub]
Kafka – the Event Hub and more! [Diagram: sources (Billing & Ordering, CRM / Profile, Marketing Campaigns, Location, Social, Click stream, Sensor Data, Mobile Apps, Weather Data) feed the Event Hub via event streams and File Import / SQL Import; the Event Hub feeds Stream Processing (Stream Processor with State and API), Microservices (State and API) and a Big Data Hadoop Cluster (Parallel Processing, Storage: Raw, Refined, Results); consumers are SQL, Search, BI Tools, Enterprise Data Warehouse, Search / Explore, Online & Mobile Apps]
CQRS and Event Sourcing
Command Query Responsibility Segregation (CQRS) • Optimize different nonfunctional requirements for read and write behavior • Split between commands that trigger changes in state and queries that provide read access to the state of resources • Supports services with higher performance and capacity requirements for reading data than for writing data [Diagram: App (UI, UI Logic) calls a Service with a Command Service writing to the Write Data Store and a Query Service reading from the Read Data Store (Materialized Views); a Projection Service keeps the read store up to date]
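The command/query split can be illustrated with a small in-memory service. This is a sketch under assumptions: `OrderService`, `place_order` and `get_all_orders` are hypothetical names, and the projection runs synchronously here, whereas in practice it is often asynchronous.

```python
class OrderService:
    """Write side accepts commands; read side answers queries from a materialized view."""

    def __init__(self):
        self.write_store = []         # write data store: accepted orders in arrival order
        self.orders_by_customer = {}  # read data store: materialized view keyed by customer

    # Command side: changes state, returns no data.
    def place_order(self, customer, item):
        order = {"customer": customer, "item": item}
        self.write_store.append(order)
        self._project(order)

    # Projection: keeps the read model in sync with the write model.
    def _project(self, order):
        self.orders_by_customer.setdefault(order["customer"], []).append(order["item"])

    # Query side: reads only, never changes state.
    def get_all_orders(self, customer):
        return list(self.orders_by_customer.get(customer, []))
```

Because queries never touch the write store, the read model can be shaped and scaled for lookups independently of how commands are stored.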
Event Sourcing • Persists the state of a business entity as a sequence of state-changing events • Whenever the state of a business entity changes, a new event is appended to the list of events • Saving an event is a single operation and is inherently atomic • The application reconstructs an entity's current state by replaying the events [Diagram: App (UI, UI Logic) calls a Service whose Event Service appends to the Event Store; a Publisher forwards events to other apps and a Replayer rebuilds state]
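Replaying events to rebuild state can be sketched in a few lines. The account entity and the event names "Deposited" and "Withdrawn" are hypothetical and chosen only to keep the example small; the event store is a plain list.

```python
def apply(balance, event):
    """Apply one state-changing event to the current state."""
    kind, amount = event
    if kind == "Deposited":
        return balance + amount
    if kind == "Withdrawn":
        return balance - amount
    raise ValueError(f"unknown event type: {kind}")


def replay(events, initial=0):
    """Reconstruct the entity's state by applying all events in order."""
    state = initial
    for event in events:
        state = apply(state, event)
    return state


event_store = []                          # append-only list of events
event_store.append(("Deposited", 100))
event_store.append(("Withdrawn", 30))
event_store.append(("Deposited", 5))
current_balance = replay(event_store)
```

Replaying only a prefix of the list, for example `replay(event_store[:2])`, gives the state as it was after the second event, which is what makes the event log usable as history as well as state.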
Event Sourcing & CQRS • Event sourcing is commonly combined with the CQRS pattern, materializing views from the stored events • Optionally, commands can be stored in the event store and transformed into events by the command handler [Diagram: App (UI, UI Logic) calls a Service; the Command Service and Command Handler write to the Event Store; a Projection Service builds the Read Data Store (Materialized Views) used by the Query Service]
Have only one "source of truth" • Avoid double writes! They would need distributed transactions • Write the event first, then consume it from the same microservice ("eat your own dog food") [Diagram: left, a microservice whose logic writes both to its State and to the Event Store; right, a microservice whose Publisher writes the event to the Event Store and whose Consumer updates the State from it]
Using Event Sourcing with Microservices “Event sourcing enables building a forward-compatible application architecture — the ability to add more applications in the future that need to process the same event but create a different materialized view.” Neha Narkhede, Confluent Blog Microservice State Command Handler { } API REST Event Store Projection Handler(s) Query Logic Event Handler(s) Event Subscribe
How many Event Stores do we need? [Diagram: one Event Store per microservice OR a single Event Store shared by all microservices]
Technology on its own won't help you. You need to know how to use it properly.

Apache Kafka - Event Sourcing, Monitoring, Librdkafka, Scaling & Partitioning

  • 11.
    Kafka Consumer - Consumer Groups [Diagrams: Kafka Movement Topic with Partitions 0 to 3, read by Consumer Group 1 with a varying number of consumers] • 1 Consumer / gets messages from all partitions • 2 Consumers / each gets messages from 2 partitions • 4 Consumers / each gets messages from 1 partition • 5 Consumers / one gets no messages
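The four cases on this slide can be reproduced with a simple round-robin assignment. This is only an illustration: `assign_partitions` is a hypothetical helper, and Kafka's real partition assignors are more involved.

```python
def assign_partitions(partitions, consumers):
    """Spread the partitions of a topic over the consumers of one group, round-robin."""
    if not consumers:
        raise ValueError("a consumer group needs at least one consumer")
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        assignment[consumers[index % len(consumers)]].append(partition)
    return assignment


partitions = [0, 1, 2, 3]
one = assign_partitions(partitions, ["c1"])
two = assign_partitions(partitions, ["c1", "c2"])
five = assign_partitions(partitions, ["c1", "c2", "c3", "c4", "c5"])
```

With five consumers and four partitions the fifth consumer ends up with an empty list, which is the "one gets no messages" case.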
  • 12.
    Durable and Highly Available Messaging [Diagram: Producer 1, Broker 1, Broker 2, Broker 3, and Consumer Group 1 with Consumer 1 and Consumer 2]
  • 13.
    Durable and Highly Available Messaging (II) [Diagram: Producer 1, Broker 1, Broker 2, Broker 3, and Consumer Group 1 with Consumer 1 and Consumer 2]
  • 14.
    Hold Data for Long-Term – Data Retention [Diagram: Producer 1, Broker 1, Broker 2, Broker 3] 1. Never 2. Time based (TTL): log.retention.{ms | minutes | hours} 3. Size based: log.retention.bytes 4. Log compaction based (older entries with the same key are removed):
        kafka-topics.sh --zookeeper zk:2181 --create --topic customers --replication-factor 1 --partitions 1 --config cleanup.policy=compact
  • 15.
    Keep Topics in Compacted Form
    Before compaction:
        Offset: 0 1 2 3 4 5 6 7 8 9 10 11
        Key:    K1 K2 K1 K1 K3 K2 K4 K5 K5 K2 K6 K2
        Value:  V1 V2 V3 V4 V5 V6 V7 V8 V9 V10 V11
    After compaction:
        Offset: 3 4 6 8 9 10
        Key:    K1 K3 K4 K5 K2 K6
        Value:  V4 V5 V7 V9 V10 V11
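The effect of compaction can be checked with a few lines of code. This sketch uses the records at offsets 0 to 10 from the slide (the ones for which a value is listed); `compact` is a hypothetical helper that models only the result, not how the broker's log cleaner works.

```python
def compact(log):
    """Keep only the latest record per key, in offset order."""
    latest = {}
    for offset, key, value in log:
        latest[key] = (offset, key, value)  # later records overwrite earlier ones
    return sorted(latest.values())


keys = ["K1", "K2", "K1", "K1", "K3", "K2", "K4", "K5", "K5", "K2", "K6"]
log = [(offset, key, f"V{offset + 1}") for offset, key in enumerate(keys)]
compacted = compact(log)
```

The surviving offsets are 3, 4, 6, 8, 9 and 10, matching the "after compaction" rows of the slide.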
  • 16.
    How to provision a Kafka environment? On Premises • Bare Metal Installation • Docker • Mesos / Kubernetes • Hadoop Distributions Cloud • Oracle Event Hub Cloud Service • Azure HDInsight Kafka • Confluent Cloud • …
  • 17.
    Important Broker Configuration Settings
    • broker.id: Every broker must have an integer identifier which is unique within the cluster
    • listeners: Comma-separated list of URIs the broker will listen on, and the listener names
    • zookeeper.connect: Location of the ZooKeeper used for storing the broker metadata
    • auto.create.topics.enable: Enable automatic creation of topics on the server
    • default.replication.factor: Specifies the default replication factor for automatically created topics
    • num.partitions: Specifies the default number of log partitions per topic, for automatically created topics
    • delete.topic.enable: Allows users to delete a topic from Kafka using the admin tool, for Kafka versions 0.9 and later
    see also: https://docs.confluent.io/current/installation/configuration/broker-configs.html
  • 18.
    Important Broker Configuration Settings (II)
    • log.dirs: Comma-separated list of paths on the local filesystem where Kafka will persist the log segments
    • log.retention.[ms|minutes|hours]: The number of milliseconds/minutes/hours to keep a log file before deleting it
    • log.retention.bytes: The maximum size of the log before deleting it
    • log.segment.bytes: The maximum size of a single log file
    • broker.rack: Rack of the broker. This will be used in rack-aware replication assignment for fault tolerance
    • message.max.bytes: The largest record batch size allowed by Kafka, defaults to 1MB
    see also: https://docs.confluent.io/current/installation/configuration/broker-configs.html
  • 19.
    How to create a Topic • Command line interface:
        $ kafka-topics.sh --zookeeper zk1:2181 --create --topic my.topic --partitions 3 --replication-factor 2 --config x=y
    Alternatives: • Using the AdminUtils.createTopic method • Auto-create via auto.create.topics.enable = true
  • 20.
    Important Topic Configuration Settings
    • cleanup.policy: Designates the retention policy to use on old log segments, either "compact" or "delete"
    • min.cleanable.dirty.ratio: Controls how frequently the log compactor will attempt to clean the log
    • min.compaction.lag.ms: The minimum time a message will remain uncompacted in the log
    • segment.ms: Controls the period of time after which Kafka will force the log to roll, even if the segment file isn't full, to ensure that retention can delete or compact old data
    • max.message.bytes: The largest record batch size allowed by Kafka, defaults to 1MB
    see also: https://docs.confluent.io/current/installation/configuration/topic-configs.html
  • 21.
    Demo (I) [Diagram: Truck-1, Truck-2 and Truck-3 (Testdata-Generator by Hortonworks) send to the truck_position topic, which is read by a console consumer] Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
  • 22.
    Demo (I) – Create Kafka Topic
        $ kafka-topics --zookeeper zookeeper:2181 --create --topic truck_position --partitions 8 --replication-factor 1
        $ kafka-topics --zookeeper zookeeper:2181 --list
        __consumer_offsets
        _confluent-metrics
        _schemas
        docker-connect-configs
        docker-connect-offsets
        docker-connect-status
        truck_position
  • 23.
    Demo (I) – Run Producer and Kafka-Console-Consumer
  • 24.
    Demo (I) – Java Producer to "truck_position"
    Constructing a Kafka Producer and sending a record:
        private Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "broker-1:9092");
        kafkaProps.put("key.serializer", "...StringSerializer");
        kafkaProps.put("value.serializer", "...StringSerializer");
        producer = new KafkaProducer<String, String>(kafkaProps);

        ProducerRecord<String, String> record = new ProducerRecord<>("truck_position", driverId, eventData);
        try {
            // send() is asynchronous; get() blocks until the broker has answered
            metadata = producer.send(record).get();
        } catch (Exception e) {
            e.printStackTrace(); // do not swallow send failures silently
        }
  • 25.
    Demo (II) – devices send to MQTT instead of Kafka [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position] Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
  • 26.
    Demo (II) – devices send to MQTT instead of Kafka
  • 27.
    Demo (II) – devices send to MQTT instead of Kafka – how to get the data into Kafka? [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position; an open question mark sits between MQTT and the Kafka topic truck_position (raw)] Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
  • 28.
    Apache Kafka – wait, there is more! [Diagram: a Source Connector feeds the trucking_driver topic on the Kafka Broker; a Sink Connector and Stream Processing read from the broker]
  • 30.
    Kafka Client Architecture [Diagram: Server Side: Kafka Broker and Protocol. Client Side: Kafka Java API, Librdkafka (C & C++) with bindings for C#/.NET, C++, Node.js, PHP and others, Confluent REST Proxy, Confluent MQTT Proxy; Application / Client Code sits on top]
  • 31.
    Librdkafka: Librdkafka is a C library implementation of the Kafka protocol with both producer and consumer support (https://github.com/edenhill/librdkafka) • High-level producer • High-level balanced KafkaConsumer (requires broker >= 0.9) • Simple consumer (legacy) • Compression: snappy, gzip, lz4 • SSL support • SASL • Broker version support >= 0.8 (broker version compatibility) • Statistics metrics
  • 32.
    PHP Kafka Client: two different clients exist on top of librdkafka; only php-rdkafka is up to date. php-rdkafka (https://github.com/arnaud-lb/php-rdkafka) • Thin wrapper of librdkafka • Supports high-level and low-level consumer API • Supports producer API • Supports metadata API
  • 34.
    Apache Kafka – Producer [Diagram: Producer sends msg to the Kafka Broker, Consumer reads msg from the Kafka Broker]
  • 35.
    Kafka Producer • Producers send records to topics • The producer picks which partition to send a record to, per topic • Can be done in a round-robin fashion • Can be done sticky (by key) • Can be done by priority (by manually selecting the partition) • Typically based on the key of the record • The Kafka default partitioner for Java uses a hash of the key to choose the partition, or a round-robin strategy if there is no key • Important: the producer picks the partition [Diagram: partition P 0 with offsets 1 to 5]
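The three ways of picking a partition can be sketched as follows. This is an illustration only: the `Partitioner` class is hypothetical, and `zlib.crc32` is a stand-in hash chosen because it is in the standard library; the Java default partitioner uses a different hash function, so partition numbers will not match a real cluster.

```python
import itertools
import zlib


class Partitioner:
    """Picks a partition: explicit choice first, then by key, else round-robin."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._round_robin = itertools.cycle(range(num_partitions))

    def partition(self, key=None, partition=None):
        if partition is not None:       # manually selected partition wins
            return partition
        if key is not None:             # same key always maps to the same partition
            return zlib.crc32(key.encode("utf-8")) % self.num_partitions
        return next(self._round_robin)  # no key: spread records evenly


p = Partitioner(num_partitions=6)
keyless = [p.partition() for _ in range(7)]
```

Because the key-based choice is a pure function of the key, all records with the same key land in the same partition, which is what the ordering guarantee on the next slide relies on.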
  • 36.
    Strong Ordering Guarantees • Most business systems need strong ordering guarantees • Messages that require relative ordering need to be sent to the same partition • Supply the same key for all messages that require a relative order • To maintain global ordering, use a single-partition topic [Diagram: Producer 1 sends records with keys Key-1 to Key-6 to partitions P1 to P6 on Broker 1, Broker 2 and Broker 3; Consumer 1, Consumer 2 and Consumer 3 form one Consumer Group]
  • 37.
    Kafka Producer – High Level Overview [Diagram: in the Producer Client a Producer Record (Topic, [Partition], [Key], Value) passes through the Serializer and the Partitioner and is collected into per-partition batches (Movement Topic Partition 0, Movement Topic Partition 1), optionally compressed, then sent to the Kafka Broker. On failure: retry if possible; if it can't retry, throw an exception. On success: return metadata.]
  • 38.
    Kafka Producer - Java API
    Constructing a Kafka Producer • bootstrap.servers: list of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster • key.serializer: name of a class that will be used to serialize the keys of the records we will produce to Kafka • value.serializer: name of a class that will be used to serialize the values of the records we will produce to Kafka
        private Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
        kafkaProps.put("key.serializer", "...StringSerializer");
        kafkaProps.put("value.serializer", "...StringSerializer");
        producer = new KafkaProducer<String, String>(kafkaProps);
  • 39.
    Kafka Producer - Java API
    Sending Message Fire-and-Forget (no control over whether the message has been sent successfully)
        ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "Key", "Value");
        try {
            producer.send(record); // result of the send is ignored
        } catch (Exception e) {
            e.printStackTrace();   // only errors raised before sending show up here
        }
  • 40.
    Kafka Producer - Java API
    Sending Message Synchronously (wait until the reply from Kafka arrives)
        ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "Key", "Value");
        try {
            producer.send(record).get(); // get() blocks until the broker has answered
        } catch (Exception e) {
            e.printStackTrace();
        }
  • 41.
    Kafka Producer - Java API
    Sending Message Asynchronously
        private class ProducerCallback implements Callback {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    e.printStackTrace(); // called with an exception if the send failed
                }
            }
        }

        ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "key", "value");
        producer.send(record, new ProducerCallback());
  • 42.
    Kafka Producer - Durability Guarantees
    Producer can configure acknowledgements (acks property)
    Value | Description | Throughput | Latency | Durability
    0 | Producer doesn't wait for leader | high | low | low (no guarantee)
    1 (default) | Producer waits for leader; leader sends ack when message is written to log; no wait for followers | medium | medium | medium (leader)
    all (-1) | Producer waits for leader; leader sends ack when all In-Sync Replicas have acknowledged | low | high | high (ISR)
  • 43.
    Important Producer Configuration Settings
    • key.serializer: (Java) Serializer class for the key that implements the org.apache.kafka.common.serialization.Serializer interface
    • value.serializer: (Java) Serializer class for the value that implements the org.apache.kafka.common.serialization.Serializer interface
    • acks: The number of acknowledgments the producer requires the leader to have received before considering a request complete. Controls durability of records that are sent
    • bootstrap.servers: List of host:port pairs of brokers that the producer will use to establish the initial connection to the Kafka cluster
    • compression.type: The compression type for all data generated by the producer (none, gzip, snappy, lz4)
    Apache Kafka: https://kafka.apache.org/documentation/#configuration
    Librdkafka: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  • 44.
    Important Producer Configuration Settings (II)
    • retries: Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error
    • retry.backoff.ms: Amount of time to wait before attempting to retry a failed request to a given topic partition
    • request.timeout.ms: Maximum amount of time the client will wait for the response of a request
    • batch.size: (Java) Controls the default batch size in bytes (batch.num.messages in librdkafka)
    • linger.ms: Controls if data should be sent before the batch is full
    • max.in.flight.requests.per.connection: Maximum number of unacknowledged requests the client will send on a single connection before blocking
    Apache Kafka: https://kafka.apache.org/documentation/#configuration
    Librdkafka: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  • 46.
    Apache Kafka – Consumer [Diagram: Producer sends msg to the Kafka Broker, Consumer reads msg from the Kafka Broker]
  • 47.
    Kafka Consumer - Partition offsets • Offset: each message in a partition is assigned a unique (per partition) and sequential id called the offset • Consumers track their pointers via (offset, partition, topic) tuples • Consumers remember the offset where they left off • Consumer groups each have their own offset per partition [Diagram: Consumer Group A and Consumer Group B reading the same partition at different offsets]
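That each consumer group keeps its own position per partition can be shown with a small in-memory model. `OffsetStore` and `read` are hypothetical names for this sketch, and the partition is a plain list; none of this is Kafka client API.

```python
class OffsetStore:
    """Tracks the next offset to read per (group, topic, partition)."""

    def __init__(self):
        self._offsets = {}

    def commit(self, group, topic, partition, offset):
        self._offsets[(group, topic, partition)] = offset

    def position(self, group, topic, partition):
        # A group without a committed offset starts at the beginning here.
        return self._offsets.get((group, topic, partition), 0)


def read(partition_log, store, group, topic, partition, max_records):
    """Read up to max_records from where the group left off, then commit."""
    start = store.position(group, topic, partition)
    records = partition_log[start:start + max_records]
    store.commit(group, topic, partition, start + len(records))
    return records


partition_log = ["m0", "m1", "m2", "m3", "m4"]
store = OffsetStore()
a_first = read(partition_log, store, "group-A", "movement", 0, 2)
a_second = read(partition_log, store, "group-A", "movement", 0, 2)
b_first = read(partition_log, store, "group-B", "movement", 0, 3)
```

Group A continues where it left off, while group B starts from the beginning and is unaffected by group A's progress.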
  • 48.
    How does a Consumer work? [Diagram: the Consumer runs a polling loop against the Kafka Broker: poll Msg(s) from the Input Topic, Process(msgs), then Commit(offsets) to the __consumer_offsets topic]
  • 49.
    Consumer
    Using auto-commit mode • set enable.auto.commit to true • set auto.commit.interval.ms to control the frequency of commits
        consumer.subscribe([topics])
        while(true)
            msgs = consumer.poll(waitForMs)
            for (msg IN msgs)
                process(msg)
        consumer.close()
    Commit current offsets • set enable.auto.commit to false • manually invoke commit() to commit the offsets when processing is done
        consumer.subscribe([topics])
        while(true)
            msgs = consumer.poll(waitForMs)
            for (msg IN msgs)
                process(msg)
            consumer.commit()
        consumer.close()
  • 50.
    Kafka Consumer – Java API
    Constructing a Kafka Consumer
        private Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
        kafkaProps.put("group.id", "MovementsConsumerGroup");
        kafkaProps.put("key.deserializer", "...StringDeserializer");
        kafkaProps.put("value.deserializer", "...StringDeserializer");
        consumer = new KafkaConsumer<String, String>(kafkaProps);
  • 51.
    Kafka Consumer Subscription - Java API • The next step after creating the consumer is to subscribe to one or more topics • Subscribe takes a list of topics as a parameter • Can also subscribe with a regular expression
        consumer.subscribe(Collections.singletonList("truck_movement"));
        consumer.subscribe(Pattern.compile("truck.*"));
  • 52.
    Kafka Consumer Poll Loop – Java API
    Kafka Consumer Poll Loop (with synchronous offset commit)
        consumer.subscribe(Collections.singletonList("topic"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // process message, available information:
                    // record.topic(), record.partition(), record.offset(),
                    // record.key(), record.value()
                }
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
  • 53.
    Run your own offset commit store • Processed messages are stored in a target supporting transactions • If using the Kafka default offset commit, there is potential for duplicates if the commit fails • Store processed messages and offsets in one atomic action in the same database
    With Kafka offset commit:
        consumer.subscribe([topics])
        while(true)
            msgs = consumer.poll(waitForMs)
            for (msg IN msgs)
                process(msg)
                storeInDB(msg)
                consumer.commit(currentOffsets)
        consumer.close()
    With offsets stored in the database:
        consumer.subscribe([topics])
        while(true)
            msgs = consumer.poll(waitForMs)
            for (msg IN msgs)
                process(msg)
                storeInDB(msg)
                storeInDB(currentOffsets)
        consumer.close()
  • 54.
    Run your own offset commit store [Diagram: the Consumer's polling loop reads Msg(s) from the Input Topic on the Kafka Broker, runs process(msgs), then storeToDB(msgs) and storeToDB(offsets) in one transaction against a DB holding Processed Msgs and Offset Commits; the position is restored with seekTo(offset) rather than from the __consumer_offsets topic]
  • 55.
    Important Consumer Configuration Settings
    • key.deserializer: (Java) Deserializer class for the key that implements the org.apache.kafka.common.serialization.Deserializer interface
    • value.deserializer: (Java) Deserializer class for the value that implements the org.apache.kafka.common.serialization.Deserializer interface
    • bootstrap.servers: List of host:port pairs of brokers that the consumer will use to establish the initial connection to the Kafka cluster
    • fetch.min.bytes: Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires first, the data accumulated so far is sent
    • fetch.wait.max.ms: The maximum amount of time the server will block before answering the fetch request (fetch.max.wait.ms in the Java client)
    Apache Kafka: https://kafka.apache.org/documentation/#configuration
    Librdkafka: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  • 56.
    Important Consumer Configuration Settings (II)
    • max.partition.fetch.bytes: Controls the maximum number of bytes the server will return per partition
    • session.timeout.ms: Amount of time a consumer can be out of contact with the brokers while still considered alive
    • auto.offset.reset: Behavior of the consumer when it starts reading a partition for which it doesn't have a committed offset, or if the committed offset it has is invalid: "earliest" or "latest" (default)
    • enable.auto.commit: Controls whether the consumer will commit offsets automatically, defaults to true
    • auto.commit.interval.ms: Controls how frequently offsets will be committed
    Apache Kafka: https://kafka.apache.org/documentation/#configuration
    Librdkafka: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  • 58.
    Kafka Connect - Overview [Diagram: Source Connector and Sink Connector]
  • 59.
    Kafka Connect – Single Message Transforms (SMT) • Simple transformations for a single message • Defined as part of Kafka Connect: some useful transforms are provided out-of-the-box, and you can easily implement your own • Optionally deploy 1+ transforms with each connector: modify messages produced by a source connector, or modify messages sent to sink connectors • Makes it much easier to mix and match connectors • Some of the currently available transforms: InsertField, ReplaceField, MaskField, ValueToKey, ExtractField, TimestampRouter, RegexRouter, SetSchemaMetadata, Flatten, TimestampConverter
  • 60.
    Kafka Connect – Many Connectors • 60+ since first release (0.9+) • 20+ from Confluent and Partners • Categories: Confluent supported Connectors, Certified Connectors, Community Connectors Source: http://www.confluent.io/product/connectors
  • 62.
    Demo (III) – Create MQTT Connect through REST API
        #!/bin/bash
        curl -X "POST" "http://192.168.69.138:8083/connectors" \
             -H "Content-Type: application/json" \
             -d $'{
          "name": "mqtt-source",
          "config": {
            "connector.class": "com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector",
            "connect.mqtt.connection.timeout": "1000",
            "tasks.max": "1",
            "connect.mqtt.kcql": "INSERT INTO truck_position SELECT * FROM truck/+/position",
            "name": "MqttSourceConnector",
            "connect.mqtt.service.quality": "0",
            "connect.mqtt.client.id": "tm-mqtt-connect-01",
            "connect.mqtt.converter.throw.on.error": "true",
            "connect.mqtt.hosts": "tcp://mosquitto:1883"
          }
        }'
  • 63.
    Demo (III) – Call REST API and Kafka Console Consumer
  • 64.
    Demo (III) [Diagram: Truck-1, Truck-2 and Truck-3 publish to the MQTT topic truck/nn/position; "mqtt to kafka" writes to the truck_position topic, which is read by a console consumer. What about some analytics?] Sample message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
  • 66.
    KSQL: a Streaming SQL Engine for Apache Kafka • Enables stream processing with zero coding required • The simplest way to process streams of data in real-time • Powered by Kafka and Kafka Streams: scalable, distributed, mature • All you need is Kafka – no complex deployments • Available as Developer Preview! • STREAM and TABLE as first-class citizens • STREAM = data in motion • TABLE = collected state of a stream • Join STREAM and TABLE
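The relation between STREAM and TABLE can be shown without KSQL. In this sketch a stream is a list of (key, value) events and a table is a dictionary; `to_table`, `join` and the sample driver data are hypothetical and only illustrate the idea.

```python
def to_table(stream):
    """Fold a stream of (key, value) events into a table: the latest value per key."""
    table = {}
    for key, value in stream:
        table[key] = value
    return table


def join(stream, table):
    """Left-join each stream event with the table row for its key (None if absent)."""
    return [(key, value, table.get(key)) for key, value in stream]


driver_updates = [(11, "Micky"), (12, "Laurence"), (11, "Mickey")]  # stream of changes
drivers = to_table(driver_updates)                                   # collected state
events = [(11, "Overspeed"), (12, "Lane Departure"), (99, "Overspeed")]
enriched = join(events, drivers)
```

The table holds only the latest name per driver id, and each event in motion is enriched with whatever the table contains for its key at the time of the join.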
  • 68.
    Demo (IV) - Start Kafka KSQL
        $ docker-compose exec ksql-cli ksql-cli local --bootstrap-server broker-1:9092
        [KSQL ASCII-art banner] Streaming SQL Engine for Kafka
        Copyright 2017 Confluent Inc.
        CLI v0.1, Server v0.1 located at http://localhost:9098
        Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
        ksql>
  • 69.
    Demo (IV) - Create Stream
        ksql> CREATE STREAM truck_position_s (ts VARCHAR, truckId VARCHAR, driverId BIGINT, routeId BIGINT, eventType VARCHAR, latitude DOUBLE, longitude DOUBLE, correlationId VARCHAR) WITH (kafka_topic='truck_position', value_format='DELIMITED');
         Message
        ----------------
         Stream created
  • 70.
    Demo (IV) - Create Stream
        ksql> SELECT * FROM truck_position_s;
        1522847870317 | "truck/13/position0 | 1522847870310 | 44 | 13 | 1390372503 | Normal | 41.71 | -91.32 | -2458274393837068406
        1522847870376 | "truck/14/position0 | 1522847870370 | 35 | 14 | 1961634315 | Normal | 37.66 | -94.3 | -2458274393837068406
        1522847870418 | "truck/21/position0 | 1522847870410 | 58 | 21 | 137128276 | Normal | 36.17 | -95.99 | -2458274393837068406
        1522847870397 | "truck/29/position0 | 1522847870390 | 18 | 29 | 1090292248 | Normal | 41.67 | -91.24 | -2458274393837068406
        ksql> SELECT * FROM truck_position_s WHERE eventType != 'Normal';
        1522847914246 | "truck/11/position0 | 1522847914240 | 54 | 11 | 1198242881 | Lane Departure | 40.86 | -89.91 | -2458274393837068406
        1522847915125 | "truck/10/position0 | 1522847915120 | 93 | 10 | 1384345811 | Overspeed | 40.38 | -89.17 | -2458274393837068406
        1522847919216 | "truck/12/position0 | 1522847919210 | 75 | 12 | 24929475 | Overspeed | 42.23 | -91.78 | -2458274393837068406
  • 71.
    Demo (IV) - Create Stream
        ksql> describe truck_position_s;
         Field         | Type
        ---------------------------------
         ROWTIME       | BIGINT
         ROWKEY        | VARCHAR(STRING)
         TS            | VARCHAR(STRING)
         TRUCKID       | VARCHAR(STRING)
         DRIVERID      | BIGINT
         ROUTEID       | BIGINT
         EVENTTYPE     | VARCHAR(STRING)
         LATITUDE      | DOUBLE
         LONGITUDE     | DOUBLE
         CORRELATIONID | VARCHAR(STRING)
  • 72.
    Demo (IV) - Create Stream
        ksql> CREATE STREAM dangerous_driving_s WITH (kafka_topic='dangerous_driving_s', value_format='JSON') AS SELECT * FROM truck_position_s WHERE eventtype != 'Normal';
         Message
        ----------------------------
         Stream created and running
        ksql> select * from dangerous_driving_s;
        1522848286143 | "truck/15/position0 | 1522848286125 | 98 | 15 | 987179512 | Overspeed | 34.78 | -92.31 | -2458274393837068406
        1522848295729 | "truck/11/position0 | 1522848295720 | 54 | 11 | 1198242881 | Unsafe following distance | 38.43 | -90.35 | -2458274393837068406
        1522848313018 | "truck/11/position0 | 1522848313000 | 54 | 11 | 1198242881 | Overspeed | 41.87 | -87.67 | -2458274393837068406
  • 73.
    Demo (V) [Diagram: Truck-1, Truck-2 and Truck-3 -> MQTT topic truck/nn/position -> mqtt-source -> truck_position -> detect_dangerous_driving -> dangerous_driving; Truck Driver database -> jdbc-source -> trucking_driver; join_dangerous_driving_driver joins dangerous_driving with trucking_driver into dangerous_driving_driver, read by a console consumer]
    Sample truck message: 1522846456703,101,31,1927624662,Normal,37.31,-94.31,-4802309397906690837
    Sample driver row: 27, Walter, Ward, Y, 24-JUL-85, 2017-10-02 15:19:00
    Sample driver message: {"id":27,"firstName":"Walter","lastName":"Ward","available":"Y","birthdate":"24-JUL-85","last_update":1506923052012}
  • 74.
    Demo (V) – Create JDBC Connect through REST API
        #!/bin/bash
        curl -X "POST" "http://192.168.69.138:8083/connectors" \
             -H "Content-Type: application/json" \
             -d $'{
          "name": "jdbc-driver-source",
          "config": {
            "connector.class": "JdbcSourceConnector",
            "connection.url":"jdbc:postgresql://db/sample?user=sample&password=sample",
            "mode": "timestamp",
            "timestamp.column.name":"last_update",
            "table.whitelist":"driver",
            "validate.non.null":"false",
            "topic.prefix":"trucking_",
            "key.converter":"org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "name": "jdbc-driver-source",
            "transforms":"createKey,extractInt",
            "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
            "transforms.createKey.fields":"id",
            "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
            "transforms.extractInt.field":"id"
          }
        }'
  • 75.
    Demo (V) – Create JDBC Connect through REST API
  • 76.
    Demo (V) - Create Table with Driver State ksql> CREATE TABLE driver_t (id BIGINT, first_name VARCHAR, last_name VARCHAR, available VARCHAR) WITH (kafka_topic='trucking_driver', value_format='JSON', key='id'); Message ---------------- Table created
  • 77.
    Demo (V) - Create Table with Driver State ksql> CREATE STREAM dangerous_driving_and_driver_s WITH (kafka_topic='dangerous_driving_and_driver_s', value_format='JSON') AS SELECT driverId, first_name, last_name, truckId, routeId, eventtype FROM dangerous_driving_s LEFT JOIN driver_t ON dangerous_driving_s.driverId = driver_t.id; Message ---------------------------- Stream created and running ksql> select * from dangerous_driving_and_driver_s; 1511173352906 | 21 | 21 | Lila | Page | 58 | 1594289134 | Unsafe tail distance 1511173353669 | 12 | 12 | Laurence | Lindsey | 93 | 1384345811 | Lane Departure 1511173435385 | 11 | 11 | Micky | Isaacson | 22 | 1198242881 | Unsafe tail distance
  • 78.
  • 79.
    Kafka Streams - Overview • Designed as a simple and lightweight library in Apache Kafka • No external dependencies on systems other than Apache Kafka • Part of open source Apache Kafka, introduced in 0.10+ • Leverages Kafka as its internal messaging layer • Supports fault-tolerant local state • Event-at-a-time processing (not microbatch) with millisecond latency • Windowing with out-of-order data using a Google Dataflow-like model Source: Confluent
  • 80.
    Kafka Streams DSL and Processor Topology KStream<Integer, String> stream1 = builder.stream("in-1"); KStream<Integer, String> stream2 = builder.stream("in-2"); KStream<Integer, String> joined = stream1.leftJoin(stream2, …); KTable<> aggregated = joined.groupBy(…).count("store"); aggregated.to("out-1"); 1 2 l j a t State
  • 82.
    Kafka Streams Cluster Processor Topology Kafka Cluster input-1 input-2 store (changelog) output 1 2 l j a t State
  • 83.
    Kafka Cluster Processor Topology input-1 Partition 0 Partition 1 Partition 2 Partition 3 input-2 Partition 0 Partition 1 Partition 2 Partition 3 Kafka Streams 1 Kafka Streams 2
  • 84.
    Kafka Cluster Processor Topology input-1 Partition 0 Partition 1 Partition 2 Partition 3 input-2 Partition 0 Partition 1 Partition 2 Partition 3 Kafka Streams 1 Kafka Streams 2 Kafka Streams 3 Kafka Streams 4
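The two slides above show the same four-partition input topics processed first by two and then by four Kafka Streams instances. A minimal sketch of that scale-out, assuming one task per partition number (covering the same partition of both co-partitioned input topics) and a simple round-robin placement. The function `assign_tasks` and the instance names are hypothetical, and the real Kafka Streams assignor is more sophisticated than round-robin.

```python
def assign_tasks(num_partitions, instances):
    """Spread one task per partition number across the running instances.

    Round-robin is an illustrative assumption, not Kafka Streams' actual
    assignment strategy.
    """
    assignment = {name: [] for name in instances}
    for task in range(num_partitions):
        owner = instances[task % len(instances)]
        assignment[owner].append(task)
    return assignment


# Two instances share the four tasks, two each.
two = assign_tasks(4, ["streams-1", "streams-2"])

# Four instances get exactly one task each.
four = assign_tasks(4, ["streams-1", "streams-2", "streams-3", "streams-4"])

# A fifth instance would stay idle: parallelism is capped by the partition count.
five = assign_tasks(4, ["streams-1", "streams-2", "streams-3", "streams-4", "streams-5"])
```

The point of the sketch is the upper bound: adding instances beyond the number of partitions does not add processing capacity.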
  • 85.
    Kafka Streams: Key Features • Native, 100%-compatible Kafka integration • Secure stream processing using Kafka's security features • Elastic and highly scalable • Fault-tolerant • Stateful and stateless computations • Interactive queries • Time model • Windowing • Supports late-arriving and out-of-order data • Millisecond processing latency, no micro-batching • At-least-once and exactly-once processing guarantees
  • 86.
  • 87.
    Demo (IV) - Create Stream final KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> source = builder.stream(stringSerde, stringSerde, "truck_position"); KStream<String, TruckPosition> positions = source.map((key,value) -> new KeyValue<>(key, TruckPosition.create(key,value))); KStream<String, TruckPosition> filtered = positions.filter(TruckPosition::filterNonNORMAL); filtered.map((key,value) -> new KeyValue<>(key,value.toCSV())) .to("dangerous_driving");
  • 88.
  • 89.
    Why/When do we get duplicate messages? Producer: everything works fine on the broker side, but the ACK is not received by the producer, so the producer retries. Consumer: the message has been processed but the offset commit failed. The next consume will re-consume from the last committed offset and some duplicate processing will occur. Producer Kafka Broker A Ack A A A retry Kafka Broker A Commit A A B Consumer reconsume Process A Process A
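The consumer-side case can be reproduced with a small in-memory simulation. This is only a sketch: it uses a plain Python list as the log instead of a Kafka client, and the function `consume` and its `crash_before_commit_at` parameter are hypothetical names used to inject the failed commit.

```python
def consume(log, committed, processed, crash_before_commit_at=None):
    """Process messages starting at the last committed offset.

    Returns the committed offset after the run. If the consumer "crashes"
    after processing a message but before committing, the old offset is kept.
    """
    offset = committed
    while offset < len(log):
        processed.append(log[offset])      # 1. process the message
        if offset == crash_before_commit_at:
            return committed               # 2a. crash: this commit never happens
        committed = offset + 1             # 2b. commit the next offset to read
        offset = committed
    return committed


log = ["A", "B", "C"]
processed = []

# First run: "B" is processed, but its offset commit is lost.
committed = consume(log, 0, processed, crash_before_commit_at=1)

# Restart: the consumer resumes at the last committed offset and sees "B" again.
committed = consume(log, committed, processed)
```

After the restart, `processed` contains "B" twice, which is the at-least-once behaviour described on the slide.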
  • 90.
    What to do with duplicate messages • Avoid them -> not possible when using at-least-once delivery • Do nothing -> implement all final message consumers in an idempotent manner • Use Kafka “Exactly Once Semantics” (EOS) processing • Only available within Kafka • Currently only available for Java clients • Planned to be made available in librdkafka in Q3 of 2018 (see issue 1308) • Implement Deduplicator functionality (see next slide)
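For the "do nothing" option, the consumer itself has to tolerate redelivery. A minimal sketch of an idempotent consumer, assuming each event carries a truck id and a timestamp. The function `apply_position` and the field names `truck_id` and `ts` are hypothetical, chosen to resemble the truck demo.

```python
def apply_position(state, event):
    """Upsert the latest position per truck.

    Applying the same event a second time leaves the state unchanged,
    so a redelivered message does no harm.
    """
    current = state.get(event["truck_id"])
    if current is None or event["ts"] >= current["ts"]:
        state[event["truck_id"]] = event
    return state


event = {"truck_id": 15, "ts": 1522848286125, "lat": 34.78, "lon": -92.31}

once = apply_position({}, event)
twice = apply_position(apply_position({}, event), event)
```

A consumer that incremented a counter per message would not have this property, which is why idempotency has to be designed into the final consumers.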
  • 91.
    Deduplicator Pseudo-code for the deduplication functionality • Each message needs a unique id • A list of “seen messages” has to be stored in an efficient manner (writing, detecting existence, aging out) • Time or space window for removing message ids after a while (in order not to grow forever) High-level architecture of an efficient Deduplicator • See also: • Delivering billions of messages exactly once • Event Deduplication Example for Kafka Streams def dedupe(stream): for message in stream: if has_seen(message.id): discard(message) else: publish_and_commit(message)
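The pseudo-code above can be turned into a runnable sketch with a time window for aging out ids. It keeps the seen ids in memory and assumes message timestamps arrive in non-decreasing order. The class `Deduplicator` and its method names are hypothetical, and a production deduplicator would need a persistent, fault-tolerant store.

```python
from collections import OrderedDict


class Deduplicator:
    """Remembers message ids for a limited time window."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self._seen = OrderedDict()  # message id -> time first seen, oldest first

    def _expire(self, now):
        # Drop ids older than the window so the store cannot grow forever.
        while self._seen:
            _, seen_at = next(iter(self._seen.items()))
            if now - seen_at < self.window_seconds:
                break
            self._seen.popitem(last=False)

    def is_duplicate(self, message_id, now):
        self._expire(now)
        if message_id in self._seen:
            return True
        self._seen[message_id] = now
        return False


def dedupe(stream, deduplicator):
    """Yield payloads whose id has not been seen within the window."""
    for message_id, timestamp, payload in stream:
        if not deduplicator.is_duplicate(message_id, timestamp):
            yield payload


stream = [("a", 0, "A"), ("b", 1, "B"), ("a", 2, "A"), ("a", 20, "A")]
result = list(dedupe(stream, Deduplicator(window_seconds=10)))
```

The last message passes again because its id has aged out of the 10-second window. That is the trade-off of a bounded window: duplicates arriving later than the window are not detected.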
  • 92.
    Kafka in modern Software Architecture
  • 93.
    Traditional Approach Shop Rich UI Shop Backend Application Search Facade Customer DAO Order DAO Order Facade Shop UI Product DAO UI Logic Data Business GUI Customer Fat Client App Customer BO Customer UI Data GUI Data Storage Shared Database sync request/response
  • 94.
    SOA Approach Contract-first Web Services Technical layers offer their own interfaces Reuse on each level Lower layer often wraps legacy code Shop UI App Business Activity Service Search BAS Customer DAO Order DAO Order BAS Shop UI Product DAO UI Logic GUI Business Entity Service Shop Web App Shop UI UI Logic GUI Data Storage Customer Database Customer BES Payment BES Product BES Order BES Customer BAS Order and Product DB SOAP SOAP SOAP SOAP SOAP SOAP SOAP
  • 95.
    Virtualized SOA Approach Shop UI App Business Activity Service Search BAS Customer DAO Order DAO Order BAS Shop UI UI Logic GUI Business Entity Service Shop Web App Shop UI UI Logic GUI Data Storage Customer Database Customer BES Payment BES Product BES Order BES Customer BAS Order and Product DB Service Virtualization Layer Service Bus SOAP SOAP SOAP SOAP SOAP SOAP SOAP
  • 96.
    Microservice Approach Tightly Scoped behind clear interfaces Responsible for managing their own data (not necessarily the infrastructure) Should be highly decoupled Independently deployable, self-contained and autonomous SOA done right?! { } Customer API Customer Customer Logic Order Microservice { } Order API Order Order Logic Product Microservice { } Product API Product Product Logic Stock Microservice { } Stock API Stock Stock Logic Shop Web App Shop UI UI Logic GUI REST REST REST REST
  • 97.
    Microservice Approach with API Gateway Customer Microservice { } Customer API Customer Customer Logic Order Microservice { } Order API Order Order Logic Product Microservice { } Product API Product Product Logic Stock Microservice { } Stock API Stock Stock Logic Shop Web App Shop UI UI Logic GUI REST REST REST REST API Gateway
  • 98.
    Synchronous World of Request-Response leads to tight, point-to-point couplings A problem at the lower end of the chain has a ripple effect on other services • crash of service • overloaded service / slow response time • change of interface Service 1 Service 2 { } API Logic { } API Logic State State Service 3 { } API Logic State Service 4 { } API Logic State Service 5 { } API Logic State Service 7 { } API Logic State Service 6 { } API Logic State
  • 99.
    Three mechanisms through which services can interact Request-Driven (Imperative) Event Driven (Functional) Service Logic State Consume Event “Event Ordered” OrderEvent Command “Order IPad” boolean order(IPad) Publish Event “Event Validated” OrderValidatedEvent Query “Retrieve my Orders” List<Orders> getAllOrders(for) Event Broker Service Logic State
  • 100.
    Event-Driven (Async) Microservice Approach Customer Microservice { } Customer API Customer Customer Logic Order Microservice { } Order API Order Order Logic Product Microservice { } Product API Product Product Logic Stock Microservice { } Stock API Stock Stock Logic Shop Web App Shop UI UI Logic GUI REST REST REST REST API Gateway Event Store sync request/response async request/response async, event pub/sub
  • 101.
    Kafka – the Event Hub and more …! Hadoop Cluster Big Data Billing & Ordering CRM / Profile Marketing Campaigns SQL Search BI Tools Enterprise Data Warehouse Search / Explore Online & Mobile Apps File Import / SQL Import Event Hub Parallel Processing Storage Storage Raw Refined Results Microservice State { } API Stream Processor State { } API Event Stream Event Stream Service Location Social Click stream Sensor Data Mobile Apps Weather Data Stream Processing Microservices
  • 103.
  • 104.
    Command Query Responsibility Segregation (CQRS) Optimize different nonfunctional requirements for read and write behavior Split between • commands that trigger changes in state • queries that provide read access to the state of resources Supports services with higher performance and capacity requirements for reading data than for writing data Data Storage Write Data Store Read Data Store (Materialized Views) Service Command Service Query Service App UI Projection Service UI Logic
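The split between the command side and the query side can be sketched with two in-memory dictionaries standing in for the write and read data stores. All class and method names below are hypothetical. The projection runs synchronously here for brevity, whereas in practice it would typically be fed asynchronously.

```python
class ProjectionService:
    """Keeps the read store (materialized view) in sync with accepted changes."""

    def __init__(self, read_store):
        self.read_store = read_store

    def on_order_placed(self, order):
        view = self.read_store.setdefault(order["customer"], {"orders": 0, "total": 0})
        view["orders"] += 1
        view["total"] += order["amount"]


class CommandService:
    """Handles commands that change state; it never answers queries."""

    def __init__(self, write_store, projection):
        self.write_store = write_store
        self.projection = projection

    def place_order(self, order_id, customer, amount):
        order = {"id": order_id, "customer": customer, "amount": amount}
        self.write_store[order_id] = order
        self.projection.on_order_placed(order)


class QueryService:
    """Serves reads from the materialized view only."""

    def __init__(self, read_store):
        self.read_store = read_store

    def customer_summary(self, customer):
        return self.read_store.get(customer, {"orders": 0, "total": 0})


write_store, read_store = {}, {}
commands = CommandService(write_store, ProjectionService(read_store))
queries = QueryService(read_store)

commands.place_order(1, "alice", 40)
commands.place_order(2, "alice", 60)
summary = queries.customer_summary("alice")
```

Because the read store is shaped for the query, the query service does no aggregation at read time. That is what lets the read side be scaled and tuned independently of the write side.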
  • 105.
    Event Sourcing persists the state of a business entity as a sequence of state-changing events Whenever the state of a business entity changes, a new event is appended to the list of events Saving an event is a single operation and is inherently atomic The application reconstructs an entity’s current state by replaying the events Data Storage Event Store Service Event Service Publisher App UI UI Logic Replayer Other App
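Replaying events to reconstruct state can be illustrated with a left fold over the event list. This is a sketch using a hypothetical account entity. The `Event` type, the event kinds and the function names are assumptions for illustration, and a plain list stands in for the event store.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class Event:
    kind: str     # "deposited" or "withdrawn"
    amount: int


def apply(balance, event):
    """Return the new state after applying one event to the previous state."""
    if event.kind == "deposited":
        return balance + event.amount
    if event.kind == "withdrawn":
        return balance - event.amount
    raise ValueError(f"unknown event kind: {event.kind}")


def replay(events, initial=0):
    """Rebuild the current state by folding over all events in order."""
    return reduce(apply, events, initial)


# The event store is append-only: state changes are recorded, never overwritten.
event_store = []
event_store.append(Event("deposited", 100))
event_store.append(Event("withdrawn", 30))
event_store.append(Event("deposited", 5))

current_balance = replay(event_store)
balance_after_two_events = replay(event_store[:2])
```

Since the full history is kept, any earlier state can be rebuilt by replaying a prefix of the events, as the second call shows.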
  • 106.
    Event Sourcing & CQRS Event sourcing is commonly combined with the CQRS pattern, materializing views from the stored events Optionally, commands can be stored in the event store and transformed into events by the command handler Data Storage Event Store Service Command Service App UI UI Logic Query Service Read Data Store (Materialized Views) Projection Service Command Handler
  • 107.
    Have only one „source of truth“ Avoid double write! • Would need distributed transactions Write the event first, then consume it from the same microservice • “eat your own dog food” Microservice { } API State Logic REST Event Store Event Microservice { } API State Consumer REST Event Store Event Publisher
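The "write the event first, then consume it" idea can be sketched as follows. The classes `EventStore` and `OrderService` are hypothetical, and delivery to subscribers is synchronous here for simplicity, whereas a real event store such as Kafka would deliver asynchronously.

```python
class EventStore:
    """Append-only log that notifies subscribers about each new event."""

    def __init__(self):
        self.log = []
        self.subscribers = []

    def subscribe(self, handler):
        self.subscribers.append(handler)

    def publish(self, event):
        self.log.append(event)
        for handler in self.subscribers:
            handler(event)


class OrderService:
    """Updates its own state only by consuming events it published itself."""

    def __init__(self, store):
        self.store = store
        self.state = {}
        store.subscribe(self.on_event)

    def create_order(self, order_id, item):
        # Single write: only the event store. No second write to a local database.
        self.store.publish({"type": "OrderCreated", "id": order_id, "item": item})

    def on_event(self, event):
        if event["type"] == "OrderCreated":
            self.state[event["id"]] = event["item"]


store = EventStore()
service = OrderService(store)
service.create_order(1, "iPad")
```

The service state is derived entirely from the log, so there is no window in which the event store and the local state can disagree because one of two writes failed.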
  • 108.
    Using Event Sourcing with Microservices “Event sourcing enables building a forward-compatible application architecture — the ability to add more applications in the future that need to process the same event but create a different materialized view.” Neha Narkhede, Confluent Blog Microservice State Command Handler { } API REST Event Store Projection Handler(s) Query Logic Event Handler(s) Event Subscribe
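The forward compatibility described in the quote can be shown with a short sketch: a view added later is built from the same stored events, without changing the producer. The event fields and the helper `project` are hypothetical; the sample values are borrowed from the dangerous-driving demo output.

```python
from collections import Counter

# Events already in the event store (illustrative sample data).
events = [
    {"driver": 21, "type": "Unsafe tail distance"},
    {"driver": 12, "type": "Lane Departure"},
    {"driver": 11, "type": "Unsafe tail distance"},
]


def project(events, key):
    """Build a materialized view counting events per value of the given field."""
    return Counter(event[key] for event in events)


# View used by the first application.
incidents_per_driver = project(events, "driver")

# View added by a later application, replaying the very same events.
incidents_per_type = project(events, "type")
```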
  • 109.
    How many Event Stores do we need? Microservice { } API State Logic REST Event Store Event Microservice { } API State Logic REST Event Store Event Microservice { } API State Logic REST Event Store Event Microservice { } API State Logic REST Microservice { } API State Logic REST Event Store Event Microservice { } API State Logic REST OR
  • 110.
    Technology on its own won't help you. You need to know how to use it properly.