Skip to content

aasmc/event-driven-microservices-kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

Β 

History

58 Commits
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 
Β 

Repository files navigation

Event Driven Microservices With Kafka Streams and ksqlDB

Π”Π΅ΠΌΠΎ ΠΏΡ€ΠΎΠ΅ΠΊΡ‚ ΠΏΠΎ микросСрвисной Π°Ρ€Ρ…ΠΈΡ‚Π΅ΠΊΡ‚ΡƒΡ€Π΅ Π½Π° основС событий. Основан Π½Π° ΡΡ‚Π°Ρ‚ΡŒΡΡ… Confluent:

  1. Tutorial: Introduction to Streaming Application Development

  2. Building a Microservices Ecosystem with Kafka Streams and KSQL

Π‘Ρ‚Π΅ΠΊ Π’Π΅Ρ…Π½ΠΎΠ»ΠΎΠ³ΠΈΠΉ

  1. Kotlin
  2. Spring Boot
  3. Apache Kafka
  4. Kafka Streams
  5. ksqlDB
  6. Gradle
  7. Docker / docker-compose
  8. Avro Serialization
  9. Confluent Schema Registry
  10. Kafka Connect
  11. ElasticSearch
  12. SQLite

Π’Π΅Ρ€Ρ…Π½Π΅ΡƒΡ€ΠΎΠ²Π½Π΅Π²ΠΎΠ΅ описаниС Π°Ρ€Ρ…ΠΈΡ‚Π΅ΠΊΡ‚ΡƒΡ€Ρ‹

architecture_overview.png

Π¦Π΅Π½Ρ‚Ρ€ΠΎΠΌ систСмы микросСрвисов являСтся Orders Service, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ прСдоставляСт ΠΊΠ»ΠΈΠ΅Π½Ρ‚Π°ΠΌ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ ΠΏΠΎΡΡ‹Π»Π°Ρ‚ΡŒ REST запросы Π½Π°:

ΠŸΡ€ΠΈ создании Π·Π°ΠΊΠ°Π·Π° Π² Kafka Ρ‚ΠΎΠΏΠΈΠΊ orders.v1 посылаСтся ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰Π΅Π΅ событиС, ΠΎΠ½ΠΎ вычитываСтся Ρ€Π°Π·Π»ΠΈΡ‡Π½Ρ‹ΠΌΠΈ сСрвисами, ΠΎΡ‚Π²Π΅Ρ‡Π°ΡŽΡ‰ΠΈΠΌΠΈ Π·Π° Π²Π°Π»ΠΈΠ΄Π°Ρ†ΠΈΡŽ Π·Π°ΠΊΠ°Π·Π°: Inventory Service, Fraud Service ΠΈ Order Details Service. Π­Ρ‚ΠΈ сСрвисы ΠΏΠ°Ρ€Π°Π»Π»Π΅Π»ΡŒΠ½ΠΎ ΠΈ нСзависимо Π΄Ρ€ΡƒΠ³ ΠΎΡ‚ Π΄Ρ€ΡƒΠ³Π° ΠΎΡΡƒΡ‰Π΅ΡΡ‚Π²Π»ΡΡŽΡ‚ Π²Π°Π»ΠΈΠ΄Π°Ρ†ΠΈΡŽ Π·Π°ΠΊΠ°Π·Π°, Π° Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ PASS ΠΈΠ»ΠΈ FAIL Π·Π°ΠΏΠΈΡΡ‹Π²Π°ΡŽΡ‚ Π² Ρ‚ΠΎΠΏΠΈΠΊ Kafka order-validations.v1. Validation Aggregator Service Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°Π΅Ρ‚ этот Ρ‚ΠΎΠΏΠΈΠΊ, Π°Π³Π³Ρ€Π΅Π³ΠΈΡ€ΡƒΠ΅Ρ‚ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚Ρ‹ ΠΏΠΎ ΠΊΠ°ΠΆΠ΄ΠΎΠΌΡƒ Π·Π°ΠΊΠ°Π·Ρƒ ΠΈ отправляСт ΠΊΠΎΠ½Π΅Ρ‡Π½Ρ‹ΠΉ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ Π² Ρ‚ΠΎΠΏΠΈΠΊ orders.v1.

Для Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ GET запросов Orders Service ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ ΠΌΠ°Ρ‚Π΅Ρ€ΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Π½Π½ΠΎΠ΅ прСдставлСниС (materialized view), ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ встроСно (embedded) Π² ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ инстанс Orders Service. Π­Ρ‚ΠΎ ΠΌΠ°Ρ‚Π΅Ρ€ΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Π½Π½ΠΎΠ΅ прСдставлСниС Ρ€Π΅Π°Π»ΠΈΠ·ΠΎΠ²Π°Π½ΠΎ Π½Π° основС Kafka Streams queryable state store. Π’Π°ΠΊ ΠΊΠ°ΠΊ state store основан Π½Π° Ρ‚ΠΎΠΏΠΈΠΊΠ°Ρ… Kafka, ΠΈ встроСн Π² ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ инстанс, ΠΎΠ½ Ρ…Ρ€Π°Π½ΠΈΡ‚ Π΄Π°Π½Π½Ρ‹Π΅ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Ρ‚Π΅Ρ… ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΠΉ Ρ‚ΠΎΠΏΠΈΠΊΠ°, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°ΡŽΡ‚ΡΡ Π΄Π°Π½Π½Ρ‹ΠΌ инстансом. Однако Kafka Streams прСдоставляСт API (Interactive Queries) для Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ "Π½Π°ΠΉΡ‚ΠΈ" инстанс, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ хранятся Π΄Π°Π½Π½Ρ‹Π΅, Ссли ΠΈΡ… Π½Π΅Ρ‚ локально. Π­Ρ‚ΠΎ позволяСт Ρ€Π΅Π°Π»ΠΈΠ·ΠΎΠ²Π°Ρ‚ΡŒ Π³Π°Ρ€Π°Π½Ρ‚ΠΈΡŽ для ΠΊΠ»ΠΈΠ΅Π½Ρ‚ΠΎΠ² read-your-own-writes (Ρ‡Ρ‚Π΅Π½ΠΈΠ΅ своих записСй).

Π’Π°ΠΊΠΆΠ΅ Π² систСмС Π΅ΡΡ‚ΡŒ простой сСрвис для ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠΈ сообщСний Π½Π° ΠΏΠΎΡ‡Ρ‚Ρƒ Email Service (Π² Ρ‚Π΅ΠΊΡƒΡ‰Π΅ΠΉ Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ просто осущСствляСтся Π»ΠΎΠ³ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ отправляСмых сообщСний).

БСрвис Order Enrichment ΠΎΡ‚Π²Π΅Ρ‡Π°Π΅Ρ‚ Π·Π° созданиС orders_enriched_stream Π² ksqlDB, ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ ΠΎ ΠΏΠΎΠΊΡƒΠΏΠ°Ρ‚Π΅Π»Π΅ ΠΈ Π΅Π³ΠΎ Π·Π°ΠΊΠ°Π·Π΅. Π’Π°ΠΊΠΆΠ΅ этот сСрвис прСдоставляСт REST Ρ€ΡƒΡ‡ΠΊΡƒ для получСния ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠΈ ΠΎ Ρ„ΠΎΠ·ΠΌΠΎΠΆΠ½Ρ‹Ρ… ΠΌΠΎΡˆΠ΅Π½Π½ΠΈΡ‡Π΅ΡΠΊΠΈΡ… дСйствиях (Ссли ΠΏΠΎΠΊΡƒΠΏΠ°Ρ‚Π΅Π»ΡŒ осущСствил Π±ΠΎΠ»Π΅Π΅ 2 Π·Π°ΠΊΠ°Π·ΠΎΠ² Π·Π° 30 сСкунд):

Π”Π°Π½Π½Ρ‹Π΅ ΠΎ покупатСлях Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°ΡŽΡ‚ΡΡ ΠΈΠ· Π‘Π” SQLite с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Kafka Connect io.confluent.connect.jdbc.JdbcSourceConnector Π² Ρ‚ΠΎΠΏΠΈΠΊ Kafka customers. ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°: connector_jdbc_customers_template.config

Π”Π°Π½Π½Ρ‹Π΅ ΠΎ Π·Π°ΠΊΠ°Π·Π°Ρ… Π·Π°ΠΏΠΈΡΡ‹Π²Π°ΡŽΡ‚ΡΡ ΠΈΠ· Ρ‚ΠΎΠΏΠΈΠΊΠ° Kafka orders.v1 Π² индСкс orders.v1 ElasticSearch с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Kafka Connect io.confluent.connect.elasticsearch.ElasticsearchSinkConnector. ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡ ΠΊΠΎΠ½Π½Π΅ΠΊΡ‚ΠΎΡ€Π°: connector_elasticsearch_template.config

elastic-search-kafka.png

Π”ΠΈΠ°Π³Ρ€Π°ΠΌΠΌΠ° микросСрвисов ΠΈ Ρ‚ΠΎΠΏΠΈΠΊΠΎΠ² Kafka:

microservices-diagram.png

ВсС микросСрвисы написаны Π½Π° Kotlin с использованиСм Kafka Streams ΠΈ Spring for Apache Kafka.

Π”Π΅Ρ‚Π°Π»ΠΈ Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ

Common

Π”Π°Π½Π½Ρ‹ΠΉ ΠΌΠΎΠ΄ΡƒΠ»ΡŒ собран ΠΊΠ°ΠΊ Π±ΠΈΠ±Π»ΠΈΠΎΡ‚Π΅ΠΊΠ°, ΠΊΠΎΡ‚ΠΎΡ€ΡƒΡŽ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽΡ‚ микросСрвисы. Π‘ΠΎΠ΄Π΅Ρ€ΠΆΠΈΡ‚ сгСнСрированныС AVRO классы Π° Ρ‚Π°ΠΊΠΆΠ΅ ΡƒΡ‚ΠΈΠ»Π»ΠΈΡ‚Π½Ρ‹Π΅ классы, ΠΎΠ±Π»Π΅Π³Ρ‡Π°ΡŽΡ‰ΠΈΠ΅ Ρ€Π°Π±ΠΎΡ‚Ρƒ с Ρ‚ΠΎΠΏΠΈΠΊΠ°ΠΌΠΈ, сСриализаторами ΠΈ дСсСриализаторами.

Бущности

ГСнСрация сущностСй, с ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΌΠΈ Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚ Kafka осущСствляСтся Π½Π° основС схСм Avro с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Gradle plugin id 'com.github.davidmc24.gradle.plugin.avro' version "1.9.1", Π½Π° этапС сборки Π±ΠΈΠ±Π»ΠΈΠΎΡ‚Π΅ΠΊΠΈ common. НиТС ΠΏΡ€ΠΈΠ²Π΅Π΄Π΅Π½ΠΎ описаниС сущностСй Π±Π΅Π· Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎΠ³ΠΎ сгСнСрированного ΠΊΠΎΠ΄Π°.

class Customer( val id: Long, val firstName: String, val lastName: String, val email: String, val address: String, val level: String = "bronze" // possible values: platinum, gold, silver, bronze ) enum class OrderState { CREATED, VALIDATED, FAILED, SHIPPED } enum class Product { JUMPERS, UNDERPANTS, STOCKINGS } class Order( val id: String, val customerId: Long, val state: OrderState, val product: Product, val quantity: Int, val price: Double ) class OrderValue( val order: Order, val value: Double ) class OrderEnriched( val id: String, val customerId: Long, val customerLevel: String ) enum class OrderValidationType { INVENTORY_CHECK, FRAUD_CHECK, ORDER_DETAILS_CHECK } enum class OrderValidationResult { PASS, FAIL, ERROR } class OrderValidation( val orderId: String, val checkType: OrderValidationType, val validationResult: OrderValidationResult ) class Payment( val id: String, val orderId: String, val ccy: String, val amount: Double )

РСгистрация схСм Avro Π² Confluent Schema Registry осущСствляСтся с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Gradle Plugin: id 'com.github.imflog.kafka-schema-registry-gradle-plugin' version "1.12.0" послС Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ поднят docker ΠΊΠΎΠ½Ρ‚Π΅ΠΉΠ½Π΅Ρ€ schema-registry. Π§Ρ‚ΠΎΠ±Ρ‹ Π·Π°Ρ€Π΅Π³ΠΈΡΡ‚Ρ€ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ схСмы Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π½Π°ΡΡ‚Ρ€ΠΎΠΈΡ‚ΡŒ ΠΏΠ»Π°Π³ΠΈΠ½. Минимальная настройка:

schemaRegistry { url = 'http://localhost:8081' quiet = true register { subject('customers-value', 'common/src/main/avro/customer.avsc', 'AVRO') subject('orders.v1-value', 'common/src/main/avro/order.avsc', 'AVRO') subject('orders-enriched.v1-value', 'common/src/main/avro/orderenriched.avsc', 'AVRO') subject('order-validations.v1-value', 'common/src/main/avro/ordervalidation.avsc', 'AVRO') subject('payments.v1-value', 'common/src/main/avro/payment.avsc', 'AVRO') } }

ΠŸΡ€ΠΈ рСгистрации ΡƒΠΊΠ°Π·Ρ‹Π²Π°ΡŽΡ‚ΡΡ:

  1. НазваниС схСмы. Одним ΠΈΠ· ΠΏΠ°Ρ‚Ρ‚Π΅Ρ€Π½ΠΎΠ² намСнования являСтся TopicName-value / TopicName-key
  2. ΠŸΡƒΡ‚ΡŒ Π΄ΠΎ Ρ„Π°ΠΉΠ»Π° со схСмой
  3. Π€ΠΎΡ€ΠΌΠ°Ρ‚ сСриализации, Π² нашСм случаС - AVRO.

Π”Π°Π»Π΅Π΅ Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ таску Gradle ΠΈΠ· ΠΊΠΎΡ€Π½Π΅Π²ΠΎΠΉ Π΄ΠΈΡ€Π΅ΠΊΡ‚ΠΎΡ€ΠΈΠΈ:

./gradlew registerSchemaTask

Для Ρ€Π°Π±ΠΎΡ‚Ρ‹ с записями Π² Ρ‚ΠΎΠΏΠΈΠΊΠ°Ρ… ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ Kafka Streams Π΄ΠΎΠ»ΠΆΠ½ΠΎ Π·Π½Π°Ρ‚ΡŒ ΠΎ Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π΅ сСриализации Π΄Π°Π½Π½Ρ‹Ρ…. Для этого Π½Π°ΡΡ‚Ρ€Π°ΠΈΠ²Π°ΡŽΡ‚ΡΡ ΡΠΏΠ΅Ρ†ΠΈΠ°Π»ΡŒΠ½Ρ‹Π΅ org.apache.kafka.common.serialization.Serde классы, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π·Π½Π°ΡŽΡ‚ ΠΊΠ°ΠΊ ΡΠ΅Ρ€ΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Ρ‚ΡŒ/Π΄Π΅ΡΠ΅Ρ€ΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Ρ‚ΡŒ ΠΊΠ»ΡŽΡ‡/Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅. Для удобства всС настройки ΠΏΠΎΠΌΠ΅Ρ‰Π΅Π½Ρ‹ Π² ΠΎΡ‚Π΄Π΅Π»ΡŒΠ½Ρ‹ΠΉ класс Schemas. Для сгСнСрированных AVRO классов ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde, ΠΏΡ€ΠΈ настройкС ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ указываСтся адрСс Schema Registry.

fun configureSerdes() { val config = hashMapOf<String, Any>( AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicsProps.schemaRegistryUrl ) for (topic in ALL.values) { configureSerde(topic.keySerde, config, true) configureSerde(topic.valueSerde, config, false) } configureSerde(ORDER_VALUE_SERDE, config, false) } fun configureSerde(serde: Serde<*>, config: Map<String, Any>, isKey: Boolean) { if (serde is SpecificAvroSerde) { serde.configure(config, isKey) } }

Orders Service

Зависимости:

dependencies { implementation project(':common') implementation 'org.apache.kafka:kafka-streams' implementation 'org.jetbrains.kotlin:kotlin-reflect' implementation 'org.springframework.kafka:spring-kafka' implementation("io.confluent:kafka-avro-serializer:7.5.2") implementation("io.confluent:kafka-streams-avro-serde:7.5.2") implementation("org.apache.avro:avro:1.11.0") annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.boot:spring-boot-testcontainers' testImplementation 'org.testcontainers:junit-jupiter' testImplementation 'org.testcontainers:kafka' }

НСобходимо ΠΎΠ±Ρ€Π°Ρ‰Π°Ρ‚ΡŒ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° соотвСтствиС вСрсий io.confluent:kafka-avro-serializer:7.5.2 ΠΈ io.confluent:kafka-streams-avro-serde:7.5.2.

Π’ Spring Boot Π½Π΅Ρ‚ нСобходимости Π²Ρ€ΡƒΡ‡Π½ΡƒΡŽ ΡƒΠΏΡ€Π°Π²Π»ΡΡ‚ΡŒ ΠΆΠΈΠ·Π½Π΅Π½Π½Ρ‹ΠΌ Ρ†ΠΈΠΊΠ»ΠΎΠΌ сущности KafkaStreams. Для этого Π΅ΡΡ‚ΡŒ абстракция org.springframework.kafka.config.StreamsBuilderFactoryBean, которая создаСт ΠΈ управляСт org.apache.kafka.streams.KafkaStreams ΠΈ org.apache.kafka.streams.StreamsBuilder. Π’Π΅ΠΌ Π½Π΅ ΠΌΠ΅Π½Π΅Π΅, Π½Π°ΠΌ Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ ΡΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ Π±ΠΈΠ½ org.springframework.kafka.config.KafkaStreamsConfiguration:

@EnableKafkaStreams @Configuration class KafkaConfig( private val kafkaProps: KafkaProps, private val serviceUtils: ServiceUtils, private val topicProps: TopicsProps ) { @Bean fun outstandingRequests(): MutableMap<String, FilteredResponse<String, Order, OrderDto>> { return ConcurrentHashMap() } @Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME]) fun kStreamsConfig(): KafkaStreamsConfiguration { val props = hashMapOf<String, Any>( StreamsConfig.APPLICATION_ID_CONFIG to kafkaProps.appId, // must be specified to enable InteractiveQueries and checking metadata of Kafka Cluster StreamsConfig.APPLICATION_SERVER_CONFIG to serviceUtils.getServerAddress(), StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProps.bootstrapServers, StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name, // instances MUST have different stateDir StreamsConfig.STATE_DIR_CONFIG to kafkaProps.stateDir, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProps.autoOffsetReset, StreamsConfig.PROCESSING_GUARANTEE_CONFIG to kafkaProps.processingGuarantee, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to kafkaProps.commitInterval, AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) to kafkaProps.sessionTimeout ) return KafkaStreamsConfiguration(props) } }

Для Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Ρ€Π°Π±ΠΎΡ‚Π°Π»ΠΈ Interactive Queries (Ссли ΠΌΡ‹ Ρ…ΠΎΡ‚ΠΈΠΌ Π·Π°ΠΏΡ€Π°ΡˆΠΈΠ²Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ Π½Π΅ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ ΠΈΠ· локального state store, Π½ΠΎ ΠΈ с Π΄Ρ€ΡƒΠ³ΠΈΡ… инстансов), Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π² конфигурациях ΡƒΠΊΠ°Π·Π°Ρ‚ΡŒ StreamsConfig.APPLICATION_SERVER_CONFIG - адрСс Ρ‚Π΅ΠΊΡƒΡ‰Π΅Π³ΠΎ инстанса Π² Ρ„ΠΎΡ€ΠΌΠ°Ρ‚Π΅ host:port. Π’Π°ΠΊΠΆΠ΅ ΠΌΡ‹ ΡƒΠΊΠ°Π·Ρ‹Π²Π°Π΅ΠΌ адрСс Schema Registry, ΠΏΡƒΡ‚ΡŒ Π΄ΠΎ ΠΊΠ°Ρ‚Π°Π»ΠΎΠ³Π°, Π³Π΄Π΅ Π±ΡƒΠ΄Π΅Ρ‚ хранится state store ΠΈ Ρ‚.Π΄. По ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ Kafka Streams ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ встроСнноС key-value Ρ…Ρ€Π°Π½ΠΈΠ»ΠΈΡ‰Π΅ RocksDB Π² качСствС state store.

ΠŸΡ€ΠΈ стартС прилоТСния Orders Service Π½Π° основС Ρ‚ΠΎΠΏΠΈΠΊΠ° Kafka orders.v1 создаСтся KTable - абстракция Kafka Streams, которая прСдставляСт ΠΈΠ· сСбя snapshot состояния (ΠΌΠΎΠΆΠ½ΠΎ провСсти аналогию с compacted topic) - хранятся послСдниС записи ΠΏΠΎ ΠΊΠ»ΡŽΡ‡Ρƒ Π² Ρ‚ΠΎΠΏΠΈΠΊΠ΅. ΠŸΡ€ΠΈ поступлСнии Π½ΠΎΠ²Ρ‹Ρ… записСй Π΄Π°Π½Π½Ρ‹Π΅ Π² KTable ΠΎΠ±Π½ΠΎΠ²Π»ΡΡŽΡ‚ΡΡ, фактичСски производится опСрация UPSERT - Ссли ΠΊΠ»ΡŽΡ‡ Π΅ΡΡ‚ΡŒ Π² Ρ‚Π°Π±Π»ΠΈΡ†Π΅, Π΄Π°Π½Π½Ρ‹Π΅ ΠΎΠ±Π½ΠΎΠ²Π»ΡΡŽΡ‚ΡΡ, Ссли Π½Π΅Ρ‚ - ΡΠΎΠ·Π΄Π°ΡŽΡ‚ΡΡ, Ссли ΠΏΡ€ΠΈΡ…ΠΎΠ΄ΠΈΡ‚ ΠΊΠ»ΡŽΡ‡ - null - Ρ‚ΠΎ это ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚ ΡƒΠ΄Π°Π»Π΅Π½ΠΈΠ΅ Π΄Π°Π½Π½Ρ‹Ρ…, Π±ΠΎΠ»Π΅Π΅ ΠΎΠ½ΠΈ Π½Π΅ Π±ΡƒΠ΄ΡƒΡ‚ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒΡΡ ΠΏΡ€ΠΈ Π²Ρ‹Π±ΠΎΡ€ΠΊΠ΅ ΠΏΠΎ ΠΊΠ»ΡŽΡ‡Ρƒ ΠΈΠ»ΠΈ соСдинСнии Ρ‚Π°Π±Π»ΠΈΡ†Ρ‹ с Π΄Ρ€ΡƒΠ³ΠΈΠΌΠΈ сущностями KafkaStreams (KStream, GlobalKTable). ΠŸΡ€ΠΈ этом Kafka Streams Π³Π°Ρ€Π°Π½Ρ‚ΠΈΡ€ΡƒΠ΅Ρ‚, Ρ‡Ρ‚ΠΎ Π² случаС падСния инстанса, Π΄Π°Π½Π½Ρ‹Π΅ Π½Π΅ Π±ΡƒΠ΄ΡƒΡ‚ потСряны, Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ ΠΎΠ½ΠΈ ΠΏΠΎΠΌΠΈΠΌΠΎ локального state store, хранятся Π² change log Ρ‚ΠΎΠΏΠΈΠΊΠ΅ Kafka -для сокращСния потрСблСния рСсурсов ΠΏΠΎ памяти этот Ρ‚ΠΎΠΏΠΈΠΊ - compacted.

Для Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ ΠΈΠΌΠ΅Π»ΠΎ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ ΠΏΠΎΠ»ΡƒΡ‡Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· state store, Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π΅Π³ΠΎ ΠΌΠ°Ρ‚Π΅Ρ€ΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Ρ‚ΡŒ, ΡƒΠΊΠ°Π·Π°Π² Π½Π°Π·Π²Π°Π½ΠΈΠ΅, ΠΏΠΎ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌΡƒ Π² дальнСйшСм ΠΊ Π½Π΅ΠΌΡƒ ΠΌΠΎΠΆΠ½ΠΎ ΠΎΠ±Ρ€Π°Ρ‰Π°Ρ‚ΡΡŒΡ.

@Component class OrdersServiceTopology( private val orderProps: OrdersProps, private val mapper: OrderMapper, private val schemas: Schemas, private val outstandingRequests: MutableMap<String, FilteredResponse<String, Order, OrderDto>> ) { @Autowired fun ordersTableTopology(builder: StreamsBuilder) { log.info("Calling OrdersServiceTopology.ordersTableTopology()") builder.table( orderProps.topic, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde), Materialized.`as`(orderProps.storeName) ).toStream().foreach(::maybeCompleteLongPollGet) } private fun maybeCompleteLongPollGet(id: String, order: Order) { val callback = outstandingRequests[id] if (callback?.asyncResponse?.isSetOrExpired == true) { outstandingRequests.remove(id) } else if (callback != null && callback.predicate(id, order)) { callback.asyncResponse.setResult(mapper.toDto(order)) outstandingRequests.remove(id) } } }

ΠœΠ΅Ρ‚ΠΎΠ΄ OrdersServiceTopology.maybeCompleteLongPollGet(String, Order) ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅Ρ‚ приходящиС Π² KTable Π΄Π°Π½Π½Ρ‹Π΅ ΠΎ Π·Π°ΠΊΠ°Π·Π°Ρ…, ΠΈ Ссли Π½ΠΎΠ²Ρ‹ΠΉ Π·Π°ΠΊΠ°Π· хранится Π² ConcurrentHashMap, ΠΈ ΠΎΠ½ удовлСтворяСт ΡƒΡΠ»ΠΎΠ²ΠΈΡŽ FilteredResponse.predicate, Ρ‚ΠΎ Ρ‚ΠΎΠ³Π΄Π° ΠΌΡ‹ ΠΊΠΎΠΌΠΏΠ»ΠΈΡ‚ΠΈΠΌ DeferredResult ΠΈ удаляСм Π΅Π³ΠΎ ΠΈΠ· ConcurrentHashMap.

Класс org.springframework.web.context.request.async.DeferredResult позволяСт асинхронно ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ REST запрос. Π’ Orders Service ΠΎΠ½ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΌ ΠΎΠ±Ρ€Π°Π·ΠΎΠΌ: Π² качСствС Π²ΠΎΠ·Π²Ρ€Π°Ρ‰Π°Π΅ΠΌΠΎΠ³ΠΎ значСния Π² ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»Π»Π΅Ρ€Π΅ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ DeferredResult. ΠŸΠΎΡ‚ΠΎΠΊ, ΠΏΡ€ΠΈΠ½ΡΠ²ΡˆΠΈΠΉ запрос ΠΌΠΎΠΆΠ½ΠΎ ΠΎΡΠ²ΠΎΠ±ΠΎΠ΄ΠΈΡ‚ΡŒ, Π° ΠΏΡ€ΠΎΡΡ‚Π°Π²ΠΈΡ‚ΡŒ Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ Π² DeferredResult ΠΌΠΎΠΆΠ½ΠΎ ΠΈΠ· Π΄Ρ€ΡƒΠ³ΠΎΠ³ΠΎ ΠΏΠΎΡ‚ΠΎΠΊΠ°. ΠŸΡ€ΠΈ этом ΠΌΠΎΠΆΠ½ΠΎ ΡƒΠΊΠ°Π·Π°Ρ‚ΡŒ Ρ‚Π°ΠΉΠΌΠ°ΡƒΡ‚, послС ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ DeferredResult Π²Π΅Ρ€Π½Π΅Ρ‚ ΠΎΡˆΠΈΠ±ΠΊΡƒ. ΠŸΡ€ΠΈ ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ΠΈΠΈ Π·Π°ΠΊΠ°Π·Π° - getOrder(id) - Ссли Π½ΠΈ Π² локальном state store, Π½ΠΈ Π² state store Π½Π° Π΄Ρ€ΡƒΠ³ΠΈΡ… инстансах Π½Π΅Ρ‚ Π·Π°ΠΊΠ°Π·Π° с Π½ΡƒΠΆΠ½Ρ‹ΠΌ ID, ΠΌΡ‹ ΠΏΠΎΠΌΠ΅Ρ‰Π°Π΅ΠΌ DeferredResult Π² ConcurrentHashMap.

ΠŸΡ€ΠΈΠΌΠ΅Ρ€ ΠΌΠ΅Ρ‚ΠΎΠ΄Π° Π² ΠΊΠΎΠ½Ρ‚Ρ€ΠΎΠ»Π»Π΅Ρ€Π΅:

@GetMapping("/{id}") fun getOrder( @PathVariable("id") id: String, @RequestParam("timeout", defaultValue = "2000") timeout: Long ): DeferredResult<OrderDto> { log.info("Received request to GET order with id = {}", id) val deferredResult = DeferredResult<OrderDto>(timeout) ordersService.getOrderDto(id, deferredResult, timeout) return deferredResult }

ΠŸΡ€ΠΈΠΌΠ΅Ρ€ Ρ€Π°Π±ΠΎΡ‚Ρ‹ с DeferredResult:

 fun getOrderDto(id: String, asyncResponse: DeferredResult<OrderDto>, timeout: Long) { initOrderStore() CompletableFuture.runAsync { val hostForKey = getKeyLocationOrBlock(id, asyncResponse) ?: return@runAsync //request timed out so return if (thisHost(hostForKey)) { fetchLocal(id, asyncResponse) { _, _ -> true } } else { val path = Paths(hostForKey.host, hostForKey.port).urlGet(id) fetchFromOtherHost(path, asyncResponse, timeout) } } } private fun fetchLocal( id: String, asyncResponse: DeferredResult<OrderDto>, predicate: (String, Order) -> Boolean ) { log.info("running GET on this node") try { val order = orderStore.get(id) if (order == null || !predicate(id, order)) { log.info("Delaying get as order not present for id $id") outstandingRequests[id] = FilteredResponse( asyncResponse = asyncResponse, predicate = predicate ) } else { asyncResponse.setResult(mapper.toDto(order)) } } catch (e: InvalidStateStoreException) { log.error("Exception while querying local state store. {}", e.message) outstandingRequests[id] = FilteredResponse(asyncResponse, predicate) } } private fun fetchFromOtherHost( path: String, asyncResponse: DeferredResult<OrderDto>, timeout: Long ) { log.info("Chaining GET to a different instance: {}", path) try { val order = webClientBuilder.build() .get() .uri("$path?timeout=$timeout") .retrieve() .bodyToMono<OrderDto>() .block() ?: throw NotFoundException("FetchFromOtherHost returned null for request: $path") asyncResponse.setResult(order) } catch (swallowed: Exception) { log.warn("FetchFromOtherHost failed.", swallowed) } }

Для Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒΡΡ ΠΊ state store, Π½Π°ΠΌ Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π·Π°ΠΏΡ€ΠΎΡΠΈΡ‚ΡŒ Π΅Π³ΠΎ ΠΈΠ· KafkaStreams. Π’ΡƒΡ‚ стоит ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ Π½Π°ΠΌ прСдоставляСтся Ρ‚ΠΎΠ»ΡŒΠΊΠΎ read-only Ρ…Ρ€Π°Π½ΠΈΠ»ΠΈΡ‰Π΅.

private lateinit var orderStore: ReadOnlyKeyValueStore<String, Order> fun initOrderStore() { if (!::orderStore.isInitialized) { val streams = factoryBean.kafkaStreams ?: throw RuntimeException("Cannot obtain KafkaStreams instance.") orderStore = streams.store( StoreQueryParameters.fromNameAndType( ordersProps.storeName, QueryableStoreTypes.keyValueStore() ) ) } }

Для получСния ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠΈ ΠΎ Ρ‚ΠΎΠΌ, Π½Π° ΠΊΠ°ΠΊΠΎΠΌ инстансС хранятся Π΄Π°Π½Π½Ρ‹Π΅ ΠΎ Π·Π°ΠΊΠ°Π·Π΅ с ΠΊΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½Ρ‹ΠΌ ID, Kafka Streams прСдоставляСт API:

 /**  * Find the metadata for the instance of this Kafka Streams Application that has the given  * store and would have the given key if it exists.  * @param store Store to find  * @param key The key to find  * @return {@link HostStoreInfo}  */ fun <K> streamsMetadataForStoreAndKey( store: String, key: K, serializer: Serializer<K> ): HostStoreInfo { // Get metadata for the instances of this Kafka Streams application hosting the store and // potentially the value for key val metadata = factoryBean.kafkaStreams?.queryMetadataForKey(store, key, serializer) ?: throw NotFoundException("Metadata for store $store not found!") return HostStoreInfo( host = metadata.activeHost().host(), port = metadata.activeHost().port(), storeNames = hashSetOf(store) ) }

ΠŸΡ€ΠΈ создании Π·Π°ΠΊΠ°Π·Π°, Orders Service отправляСт сообщСниС Π² Ρ‚ΠΎΠΏΠΈΠΊ Kafka orders.v1 с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ KafkaTemplate<String, Order>. Настройки KafkaProducer Π² Π΄Π°Π½Π½ΠΎΠΌ случаС ΡƒΠΊΠ°Π·Ρ‹Π²Π°ΡŽΡ‚ΡΡ Π² application.yml Ρ„Π°ΠΉΠ»Π΅:

spring: application: name: order-service kafka: bootstrap-servers: ${kafkaprops.bootstrapServers} producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer bootstrap-servers: ${kafkaprops.bootstrapServers} acks: ${kafkaprops.acks} client-id: ${kafkaprops.clientId} properties: enable.idempotence: ${kafkaprops.enableIdempotence} schema.registry.url: ${kafkaprops.schemaRegistryUrl} auto.register.schemas: false use.latest.version: true

ОсобоС Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ стоит ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ Π½Π° настройки auto.register.schema: false ΠΈ use.latest.version: true. По ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ ΠΎΠ½ΠΈ выставлСны Π½Π°ΠΎΠ±ΠΎΡ€ΠΎΡ‚. И Ссли ΠΎΡΡ‚Π°Π²ΠΈΡ‚ΡŒ ΠΈΡ… Π±Π΅Π· измСнСния, Ρ‚ΠΎ ΠΌΠΎΠ³ΡƒΡ‚ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡƒΡ‚ΡŒ ошибки ΠΏΡ€ΠΈ сСриализации / дСсСриализации Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ·-Π·Π° Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΠΉ нСвосмСстимости схСм AVRO. Confluent Ρ€Π΅ΠΊΠΎΠΌΠ΅Π½Π΄ΡƒΠ΅Ρ‚ Π² ΠΏΡ€ΠΎΠ΄Π΅ Π½Π΅ ΠΏΡ€Π΅Π΄ΠΎΡΡ‚Π°Π²Π»ΡΡ‚ΡŒ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ ΠΏΡ€ΠΎΠ΄ΡŠΡŽΡΠ΅Ρ€Π°ΠΌ автоматичски Ρ€Π΅Π³ΠΈΡΡ‚Ρ€ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ схСму.

БСрвис ΠΎΡ‚Π²Π΅Ρ‡Π°Π΅Ρ‚ Π·Π° ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΊΡƒ Π΄Π΅Ρ‚Π°Π»Π΅ΠΉ Π·Π°ΠΊΠ°Π·Π°:

  • количСство Ρ‚ΠΎΠ²Π°Ρ€ΠΎΠ² Π½Π΅ мСньшС 0
  • ΡΡ‚ΠΎΠΈΠΌΠΎΡΡ‚ΡŒ Π·Π°ΠΊΠ°Π·Π° Π½Π΅ мСньшС 0
  • Ρ‚ΠΎΠ²Π°Ρ€ присутствуСт Π² Π·Π°ΠΊΠ°Π·Π΅

БообщСния Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°ΡŽΡ‚ΡΡ ΠΈΠ· Ρ‚ΠΎΠΏΠΈΠΊΠ° Kafka orders.v1, Π·Π°ΠΊΠ°Π· провСряСтся, Π° Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΊΠΈ отправляСтся Π² Ρ‚ΠΎΠΏΠΈΠΊ Kafka order-validations.v1. Π’ этом сСрвисС Π½Π΅ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ Kafka Streams, Π° ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽΡ‚ΡΡ стандартныС Kafka Producer / Consumer.

ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡ Kafka Consumer:

 @Bean fun kafkaListenerContainerFactory( consumerFactory: ConsumerFactory<String, Order> ): ConcurrentKafkaListenerContainerFactory<String, Order> { val factory = ConcurrentKafkaListenerContainerFactory<String, Order>() factory.consumerFactory = consumerFactory return factory } @Bean fun consumerFactory(): ConsumerFactory<String, Order> { return DefaultKafkaConsumerFactory(consumerProps()) } private fun consumerProps(): Map<String, Any> { val props = hashMapOf<String, Any>( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProps.bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG to kafkaProps.appId, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProps.autoOffsetReset, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to !kafkaProps.enableExactlyOnce, ConsumerConfig.CLIENT_ID_CONFIG to kafkaProps.appId, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to schemas.ORDERS.keySerde.deserializer()::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to schemas.ORDERS.valueSerde.deserializer()::class.java, AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to kafkaProps.schemaRegistryUrl, "specific.avro.reader" to true ) return props }

Π—Π΄Π΅ΡΡŒ Π²Π°ΠΆΠ½ΠΎ ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° ΠΊΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡŽ specific.avro.reader: true. Она сообщаСт ΠΊΠΎΠ½ΡΡŠΡŽΠΌΠ΅Ρ€Ρƒ, Ρ‡Ρ‚ΠΎ ΠΎΠ½ Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°Π΅Ρ‚ Π½Π΅ ΠΎΠ±ΠΎΠ±Ρ‰Π΅Π½Π½Ρ‹ΠΉ AVRO Ρ‚ΠΈΠΏ записи (GenericAvroRecord), Π° ΠΊΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½Ρ‹ΠΉ Ρ‚ΠΈΠΏ SpecificAvroRecord, ΠΈ ΠΈΠΌΠ΅Π΅Ρ‚ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ ΠΏΠΎΠ»ΡƒΡ‡Π°Ρ‚ΡŒ доступ ΠΊ полям ΠΈ ΠΌΠ΅Ρ‚ΠΎΠ΄Π°ΠΌ Π±Π΅Π· нСобходимости ΠΊΠ°ΡΡ‚ΠΈΡ‚ΡŒ ΠΊ ΠΊΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½ΠΎΠΌΡƒ Ρ‚ΠΈΠΏΡƒ.

ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡ Kafka Producer:

 @Bean fun producerFactory(): ProducerFactory<String, OrderValidation> { return DefaultKafkaProducerFactory(senderProps()) } private fun senderProps(): Map<String, Any> { val props = hashMapOf<String, Any>( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProps.bootstrapServers, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to kafkaProps.enableIdempotence, ProducerConfig.RETRIES_CONFIG to Int.MAX_VALUE.toString(), ProducerConfig.ACKS_CONFIG to kafkaProps.acks, ProducerConfig.CLIENT_ID_CONFIG to kafkaProps.appId, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to schemas.ORDER_VALIDATIONS.valueSerde.serializer().javaClass.name, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to schemas.ORDER_VALIDATIONS.keySerde.serializer().javaClass.name, AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to kafkaProps.schemaRegistryUrl, AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS to false, AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to true, ) if (kafkaProps.enableExactlyOnce) { // attempt to provide a unique transactional ID which is a recommended practice props[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "${kafkaProps.appId}-${applicationProps.port}" } return props } @Bean fun kafkaTemplate( producerFactory: ProducerFactory<String, OrderValidation> ): KafkaTemplate<String, OrderValidation> { return KafkaTemplate(producerFactory) }

Π’ΡƒΡ‚ стоит ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° Π΄Π²Π΅ Π²Π΅Ρ‰ΠΈ. ΠŸΠ΅Ρ€Π²Π°Ρ, ΠΊΠ°ΠΊ ΡƒΠΆΠ΅ Π±Ρ‹Π»ΠΎ сказано Π²Ρ‹ΡˆΠ΅ - ΠΌΡ‹ Π·Π°ΠΏΡ€Π΅Ρ‰Π°Π΅ΠΌ ΠΏΡ€ΠΎΠ΄ΡŠΡŽΡΠ΅Ρ€Ρƒ автоматичСски Ρ€Π΅Π³ΠΈΡΡ‚Ρ€ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ схСмы Π² Schema Registry, Π° Ρ‚Π°ΠΊΠΆΠ΅ ΡƒΠΊΠ°Π·Ρ‹Π²Π°Π΅ΠΌ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒ послСднюю Π²Π΅Ρ€ΡΠΈΡŽ схСмы ΠΏΡ€ΠΈ сСриализации Π΄Π°Π½Π½Ρ‹Ρ…. Π’Ρ‚ΠΎΡ€ΠΎΠΉ ΠΌΠΎΠΌΠ΅Π½Ρ‚ касаСтся настройки ProducerConfig.TRANSACTIONAL_ID_CONFIG, эта настройка Π΄ΠΎΠ»ΠΆΠ½Π° Π±Ρ‹Ρ‚ΡŒ ΡƒΠ½ΠΈΠΊΠ°Π»ΡŒΠ½ΠΎΠΉ для ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ инстанса прилоТСния, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Kafka ΠΌΠΎΠ³Π»Π° Π³Π°Ρ€Π°Π½Ρ‚ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ ΠΊΠΎΡ€Ρ€Π΅ΠΊΡ‚Π½ΡƒΡŽ Ρ€Π°Π±ΠΎΡ‚Ρƒ с транзакциями Π² случаС падСния прилоТСния. ΠŸΡ€ΠΈ этом Π²Π°ΠΆΠ½ΠΎ ΠΏΠΎΠ½ΠΈΠΌΠ°Ρ‚ΡŒ, Ρ‡Ρ‚ΠΎ ΡƒΠ½ΠΈΠΊΠ°Π»ΡŒΠ½Ρ‹ΠΉ - Π½Π΅ Π·Π½Π°Ρ‡ΠΈΡ‚ Ρ€Π°Π½Π΄ΠΎΠΌΠ½Ρ‹ΠΉ. Π’ΠΎ Π΅ΡΡ‚ΡŒ, Π·Π° инстансом Π΄ΠΎΠ»ΠΆΠ΅Π½ Π±Ρ‹Ρ‚ΡŒ Π·Π°ΠΊΡ€Π΅ΠΏΠ»Π΅Π½ свой TRANSACTIONAL_ID. Π’ Π΄Π°Π½Π½ΠΎΠΌ случаС для простоты Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ я ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽ суффикс Π² Π²ΠΈΠ΄Π΅ ΠΏΠΎΡ€Ρ‚Π°, Π½Π° ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ поднято ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅. Однако, эта лишь Π΄Π΅ΠΌΠΎ рСализация.

email-service-join.png

БСрвис Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°Π΅Ρ‚ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· Π½Π΅ΡΠΊΠΎΠ»ΡŒΠΊΠΈΡ… Ρ‚ΠΎΠΏΠΈΠΊΠΎΠ² Kafka: customers, payments.v1, orders.v1, ΠΎΠ±ΡŠΠ΅Π΄ΠΈΠ½ΡΠ΅Ρ‚ ΠΈΡ… с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Kafka Streams, ΠΈ отправляСт сообщСниС Ρ‡Π΅Ρ€Π΅Π· интСрфСйс Emailer. ΠšΡ€ΠΎΠΌΠ΅ Ρ‚ΠΎΠ³ΠΎ, Π΄Π°Π½Π½Ρ‹Π΅ ΠΎ Π·Π°ΠΊΠ°Π·Π΅ ΠΈ ΠΏΠΎΠΊΡƒΠΏΠ°Ρ‚Π΅Π»Π΅ ΠΎΡ‚ΠΏΡ€Π°Π²Π»ΡΡŽΡ‚ΡΡ Π² Ρ‚ΠΎΠΏΠΈΠΊ Kafka, ΡΠΎΠ²ΠΏΠ°Π΄Π°ΡŽΡ‰ΠΈΠΉ с Ρ€Π΅ΠΉΡ‚ΠΈΠ½Π³ΠΎΠΌ покупатСля: platinum, gold, silver, bronze.

microservices-exercise-3.png Настройки Kafka Streams Ρ‚ΡƒΡ‚ Π°Π½Π°Π»ΠΎΠ³ΠΈΡ‡Π½Ρ‹ Orders Service.

Класс EmailService ΠΎΡ‚Π²Π΅Ρ‡Π°Π΅Ρ‚ Π·Π° созданиС ΠΈ ΠΎΠ±Ρ€Π°Π±ΠΎΡ‚ΠΊΡƒ KStream ΠΈ KTable. Для Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ послС старта прилоТСния создалась ΠΈ Π·Π°Ρ€Π°Π±ΠΎΡ‚Π°Π»Π° топология стримов Kafka Streams, Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π΅Π΅ Π½Π°ΡΡ‚Ρ€ΠΎΠΈΡ‚ΡŒ с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ StreamsBuilder. Для этого достаточно Π² класс, ΠΏΠΎΠΌΠ΅Ρ‡Π΅Π½Π½Ρ‹ΠΉ Π°Π½Π½ΠΎΡ‚Π°Ρ†ΠΈΠ΅ΠΉ @Component ΠΈΠ»ΠΈ @Service Π² ΠΎΠ΄ΠΈΠ½ ΠΈΠ· ΠΌΠ΅Ρ‚ΠΎΠ΄ΠΎΠ² Π·Π°ΠΈΠ½ΠΆΠ΅ΠΊΡ‚ΠΈΡ‚ΡŒ StreamsBuilder ΠΈ Π² этом ΠΌΠ΅Ρ‚ΠΎΠ΄Π΅ Π²Ρ‹ΡΡ‚Ρ€ΠΎΠΈΡ‚ΡŒ Π½ΡƒΠΆΠ½ΡƒΡŽ Ρ‚ΠΎΠΏΠΎΠ»ΠΎΠ³ΠΈΡŽ.

@Service class EmailService( private val emailer: Emailer, private val schemas: Schemas ) { @Autowired fun processStreams(builder: StreamsBuilder) { val orders: KStream<String, Order> = builder.stream( schemas.ORDERS.name, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) ) //Create the streams/tables for the join val payments: KStream<String, Payment> = builder.stream( schemas.PAYMENTS.name, Consumed.with(schemas.PAYMENTS.keySerde, schemas.PAYMENTS.valueSerde) ) //Rekey payments to be by OrderId for the windowed join .selectKey { s, payment -> payment.orderId } val customers: GlobalKTable<Long, Customer> = builder.globalTable( schemas.CUSTOMERS.name, Consumed.with(schemas.CUSTOMERS.keySerde, schemas.CUSTOMERS.valueSerde) ) val serdes: StreamJoined<String, Order, Payment> = StreamJoined .with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde, schemas.PAYMENTS.valueSerde) //Join the two streams and the table then send an email for each orders.join( payments, ::EmailTuple, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)), serdes ) //Next join to the GKTable of Customers .join( customers, { _, value -> value.order.customerId }, // note how, because we use a GKtable, we can join on any attribute of the Customer. EmailTuple::setCustomer ) //Now for each tuple send an email. .peek { _, emailTuple -> emailer.sendEmail(emailTuple) } //Send the order to a topic whose name is the value of customer level orders.join( customers, { orderId, order -> order.customerId }, { order, customer -> OrderEnriched(order.id, order.customerId, customer.level) } ) //TopicNameExtractor to get the topic name (i.e., customerLevel) from the enriched order record being sent .to( TopicNameExtractor { orderId, orderEnriched, record -> orderEnriched.customerLevel }, Produced.with(schemas.ORDERS_ENRICHED.keySerde, schemas.ORDERS_ENRICHED.valueSerde) ) } }

Π’ Π΄Π°Π½Π½ΠΎΠΌ ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π΅ слСдуСт ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° нСсколько ΠΌΠΎΠΌΠ΅Π½Ρ‚ΠΎΠ².

  1. KStream<String, Payment> создаСтся Π½Π° основС Ρ‚ΠΎΠΏΠΈΠΊΠ° payments.v1, Π² ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ ΠΊΠ»ΡŽΡ‡ΠΎΠΌ являСтся ID ΠΏΠ»Π°Ρ‚Π΅ΠΆΠ°, ΠΏΡ€ΠΈ этом Π² ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΠΈ ΠΎ ΠΏΠ»Π°Ρ‚Π΅ΠΆΠ΅ присутствуСт ΠΏΠΎΠ»Π΅ orderId, ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ΅ фактичСски ссылаСтся Π½Π° ΡΡƒΡ‰Π½ΠΎΡΡ‚ΡŒ Order. Kafka Streams ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΈΠ²Π°Π΅Ρ‚ объСдинСниС KStream ΠΏΠΎ ΠΊΠ»ΡŽΡ‡Ρƒ Ρ‚ΠΎΠΏΠΈΠΊΠ°, ΠΈΠ· ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°ΡŽΡ‚ΡΡ сообщСния. Π­Ρ‚ΠΎ называСтся equi join. Но Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ Π² исходных Ρ‚ΠΎΠΏΠΈΠΊΠ°Ρ… Ρ€Π°Π·Π½Ρ‹Π΅ ΠΊΠ»ΡŽΡ‡ΠΈ, приходится провСсти re-keying - Π²Ρ‹Π±ΠΎΡ€ Π½ΠΎΠ²ΠΎΠ³ΠΎ ΠΊΠ»ΡŽΡ‡Π° Π² "ΠΏΡ€Π°Π²ΠΎΠΌ" стримС с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ ΠΌΠ΅Ρ‚ΠΎΠ΄Π° KStream.selectKey(), Π² нашСм случаС Π² KStream<String, Payment>. Под ΠΊΠ°ΠΏΠΎΡ‚ΠΎΠΌ Kafka Streams создаст Π΅Ρ‰Π΅ ΠΎΠ΄ΠΈΠ½ Ρ‚ΠΎΠΏΠΈΠΊ с Ρ‚Π°ΠΊΠΈΠΌ ΠΆΠ΅ количСством ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΠΉ, ΠΊΠ°ΠΊ ΠΈ Π² исходном Ρ‚ΠΎΠΏΠΈΠΊΠ΅ ΠΈ Π±ΡƒΠ΄Π΅Ρ‚ ΠΎΡ‚ΠΏΡ€Π°Π²Π»ΡΡ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· ΠΏΠ΅Ρ€Π²ΠΎΠ½Π°Ρ‡Π°Π»ΡŒΠ½ΠΎΠ³ΠΎ Ρ‚ΠΎΠΏΠΈΠΊΠ° (payments.v1) Π² Π½ΠΎΠ²Ρ‹ΠΉ Ρ‚ΠΎΠΏΠΈΠΊ. Π’Π΅ΠΏΠ΅Ρ€ΡŒ этот KStream, основанный Π½Π° Π²Π½ΡƒΡ‚Ρ€Π΅Π½Π½Π΅ΠΌ Ρ‚ΠΎΠΏΠΈΠΊΠ΅ ΠΌΠΎΠΆΠ½ΠΎ спокойно ΠΎΠ±ΡŠΠ΅Π΄ΠΈΠ½ΡΡ‚ΡŒ со стримом KStream<String, Order>, Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ Ρƒ Π½ΠΈΡ… ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²Ρ‹Π΅ ΠΊΠ»ΡŽΡ‡ΠΈ записСй.

NB! Π’ΠΠ–ΠΠž Π’ΡƒΡ‚ Π΅ΡΡ‚ΡŒ ΠΎΠ΄ΠΈΠ½ ΠΎΡ‡Π΅Π½ΡŒ Π²Π°ΠΆΠ½Ρ‹ΠΉ ньюанс: для Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Kafka Streams ΠΌΠΎΠ³Π»Π° провСсти объСдинСниС Π΄Π°Π½Π½Ρ‹Ρ… исходныС Ρ‚ΠΎΠΏΠΈΠΊΠΈ Π΄ΠΎΠ»ΠΆΠ½Ρ‹ Π±Ρ‹Ρ‚ΡŒ ΠΊΠΎ-ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΠΎΠ½ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹:

ΠŸΠΎΠ΄Ρ€ΠΎΠ±Π½Π΅Π΅ ΠΎΠ± этом Π² ΡΡ‚Π°Ρ‚ΡŒΠ΅ Co-Partitioning with Apache Kafka.

  1. Π˜Π½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡ ΠΎ покупатСлях собираСтся Π² GlobalKTable<Long, Customer>. Π­Ρ‚ΠΎ ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ инстанс прилоТСния Email Service Π±ΡƒΠ΄Π΅Ρ‚ ΠΈΠΌΠ΅Ρ‚ΡŒ Π² своСм распоряТСнии Π΄Π°Π½Π½Ρ‹Π΅ ΠΎΠ±ΠΎ всСх покупатСлях, Π² ΠΎΡ‚Π»ΠΈΡ‡ΠΈΠ΅ ΠΎΡ‚ ΠΎΠ±Ρ‹Ρ‡Π½ΠΎΠΉ KTable, которая Ρ…Ρ€Π°Π½ΠΈΡ‚ Π½Π° инстансС Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°Π΅ΠΌΡ‹Ρ… этим инстансом ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΠΉ. Π’Π°ΠΊΠΈΠΌ ΠΎΠ±Ρ€Π°Π·ΠΎΠΌ ΠΌΡ‹ ΠΌΠΎΠΆΠ΅ΠΌ ΠΎΠ±ΡŠΠ΅Π΄ΠΈΠ½ΡΡ‚ΡŒ KStream с GlobalKTable ΠΏΠΎ Π»ΡŽΠ±ΠΎΠΌΡƒ полю, плюс Π½Π΅Ρ‚ нСобходимости ΡΠΎΠ±Π»ΡŽΠ΄Π°Ρ‚ΡŒ ΠΏΡ€Π°Π²ΠΈΠ»ΠΎ ΠΎ ΠΊΠΎ-ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΠΎΠ½ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠΈ Ρ‚ΠΎΠΏΠΈΠΊΠΎΠ². Однако Ρ‚Π΅ΠΏΠ΅Ρ€ΡŒ наш инстанс ΠΌΠΎΠΆΠ΅Ρ‚ Π±Ρ‹Ρ‚ΡŒ ΠΏΠ΅Ρ€Π΅Π³Ρ€ΡƒΠΆΠ΅Π½ ΠΊΠ°ΠΊ ΠΏΠΎ памяти, Ρ‚Π°ΠΊ ΠΈ ΠΏΠΎ сСтСвому Ρ‚Ρ€Π°Ρ„ΠΈΠΊΡƒ, Ссли Ρ‚ΠΎΠΏΠΈΠΊ customers Π±ΡƒΠ΄Π΅Ρ‚ Ρ…Ρ€Π°Π½ΠΈΡ‚ΡŒ ΠΎΡ‡Π΅Π½ΡŒ ΠΌΠ½ΠΎΠ³ΠΎ Π΄Π°Π½Π½Ρ‹Ρ… ΠΈ постоянно ΠΏΠΎΠΏΠΎΠ»Π½ΡΡ‚ΡŒΡΡ. Π’Π°ΠΊΠΆΠ΅ стоит ΠΎΡ‚ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ, Ρ‡Ρ‚ΠΎ GlobalKTable инициализируСтся Π½Π° стартС прилоТСния, Ρ‚ΠΎ Π΅ΡΡ‚ΡŒ Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°ΡŽΡ‚ΡΡ всС Π΄Π°Π½Π½Ρ‹Π΅ ΠΈΠ· ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰Π΅Π³ΠΎ Ρ‚ΠΎΠΏΠΈΠΊΠ°. Π”ΠΎ Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΏΡ€ΠΎΠΈΠ·ΠΎΠΉΠ΄Π΅Ρ‚ инициализация, объСдинСниС KStream с GlobalKTable Π½Π΅ начнСтся.

  2. ΠŸΡ€ΠΈ объСдинСнии Π΄Π²ΡƒΡ… стримов Π²Π°ΠΆΠ½ΠΎ ΠΏΠΎΠ½ΠΈΠΌΠ°Ρ‚ΡŒ, Ρ‡Ρ‚ΠΎ Π΄Π°Π½Π½Ρ‹Π΅ Π² Ρ‚ΠΎΠΏΠΈΠΊΠΈ, Π½Π° основС ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… ΡΠΎΠ·Π΄Π°ΡŽΡ‚ΡΡ стримы ΠΌΠΎΠ³ΡƒΡ‚ ΠΏΠΎΠΏΠ°Π΄Π°Ρ‚ΡŒ Π½Π΅ ΠΎΠ΄Π½ΠΎΠ²Ρ€Π΅ΠΌΠ΅Π½Π½ΠΎ. ΠŸΠΎΡΡ‚ΠΎΠΌΡƒ ΠΏΡ€ΠΈ объСдинСнии стримов ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΡŽΡ‚ΡΡ "ΠΎΠΊΠ½Π°" объСдинСния - JoinWindow. Π’ нашСм случаС ΠΏΡ€ΠΈ объСдинСнии стримов orders ΠΈ payments ΠΌΡ‹ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ ΠΎΠΊΠ½ΠΎ Π² 1 ΠΌΠΈΠ½ΡƒΡ‚Ρƒ, Π±Π΅Π· ΠΊΠ°ΠΊΠΎΠ³ΠΎ-Π»ΠΈΠ±ΠΎ грСйс ΠΏΠ΅Ρ€ΠΈΠΎΠ΄Π°. JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)) Π­Ρ‚ΠΎ ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ записи, timestamp ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… Π² ΠΏΡ€Π΅Π΄Π΅Π»Π°Ρ… 1 ΠΌΠΈΠ½ΡƒΡ‚Ρ‹ Π΄Ρ€ΡƒΠ³ ΠΎΡ‚ Π΄Ρ€ΡƒΠ³Π°, ΠΌΠΎΠ³ΡƒΡ‚ Π±Ρ‹Ρ‚ΡŒ ΠΎΠ±ΡŠΠ΅Π΄Π΅Π½ΠΈΠ½Ρ‹, Ρ‚ΠΎ Π΅ΡΡ‚ΡŒ ΠΌΡ‹ ΠΏΠΎΠ΄Ρ€Π°Π·ΡƒΠΌΠ΅Π²Π°Π΅ΠΌ, Ρ‡Ρ‚ΠΎ созданиС Π·Π°ΠΊΠ°Π·Π° ΠΈ Π΅Π³ΠΎ ΠΎΠΏΠ»Π°Ρ‚Π° ΠΏΠΎ Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ Π½Π΅ Ρ€Π°Π·Π»ΠΈΡ‡Π°ΡŽΡ‚ΡΡ Π½Π° +-1 ΠΌΠΈΠ½ΡƒΡ‚Ρƒ. Записи, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π½Π΅ ΠΏΠΎΠΏΠ°Π΄Π°ΡŽΡ‚ Π² это ΠΎΠΊΠ½ΠΎ, Π½Π΅ Π±ΡƒΠ΄ΡƒΡ‚ ΡƒΡ‡ΠΈΡ‚Ρ‹Π²Π°Ρ‚ΡŒΡΡ ΠΏΡ€ΠΈ объСдинСнии стримов. ИспользованиС "ΠΎΠΊΠΎΠ½" Ρ‚Π°ΠΊΠΆΠ΅ обусловлСно Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎΡΡ‚ΡŒΡŽ ΠΏΠΎΠ΄Π΄Π΅Ρ€ΠΆΠΈΠ²Π°Ρ‚ΡŒ Π±ΡƒΡ„Π΅Ρ€ для объСдинСния Π΄Π°Π½Π½Ρ‹Ρ…. Если Π±Ρ‹ Π½Π΅ Π±Ρ‹Π»ΠΎ "ΠΎΠΊΠΎΠ½" этот Π±ΡƒΡ„Π΅Ρ€ ΠΌΠΎΠ³ Π±Ρ‹ ΠΏΠΎΠ³Π»ΠΎΡ‚ΠΈΡ‚ΡŒ всю ΠΏΠ°ΠΌΡΡ‚ΡŒ. "Окна" хранят свои Π΄Π°Π½Π½Ρ‹Π΅ Π² windowing state store, старыС записи Π² ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ ΡƒΠ΄Π°Π»ΡΡŽΡ‚ΡΡ послС настраиваСмого window retention period, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ ΠΏΠΎ ΡƒΠΌΠΎΠ»Ρ‡Π°Π½ΠΈΡŽ Ρ€Π°Π²Π΅Π½ 1 дню. Π˜Π·ΠΌΠ΅Π½ΠΈΡ‚ΡŒ это Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ½ΠΎ с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ: Materialized#withRetention().

 /**  * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference},  * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after  * the timestamp of the record from the primary stream.  * <p>  * CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order  * records arriving after the window ends are considered late and will be dropped.  *  * @param timeDifference join window interval  * @return a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped  * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds}  */ public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) { return ofTimeDifferenceAndGrace(timeDifference, Duration.ofMillis(NO_GRACE_PERIOD)); }

Π‘ΠΎΠ»Π΅Π΅ ΠΏΠΎΠ΄Ρ€ΠΎΠ±Π½ΠΎ ΠΎ Ρ‚ΠΎΠΌ, ΠΊΠ°ΠΊΠΈΠ΅ Π±Ρ‹Π²Π°ΡŽΡ‚ Π²ΠΈΠ΄Ρ‹ объСдинСний ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Ρ‚ΡŒ Π² Π΄ΠΎΠΊΡƒΠΌΠ΅Π½Ρ‚Π°Ρ†ΠΈΠΈ.

microservices-fraud-service.png

БСрвис ΠΎΡ‚Π²Π΅Ρ‡Π°Π΅Ρ‚ Π·Π° поиск ΠΏΠΎΡ‚Π΅Π½Ρ†ΠΈΠ°Π»ΡŒΠ½ΠΎ ΠΌΠΎΡˆΠ΅Π½Π½ΠΈΡ‡Π΅ΡΠΊΠΈΡ… Ρ‚Ρ€Π°Π½Π·Π°ΠΊΡ†ΠΈΠΉ: Ссли ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»ΡŒ сдСлал Π·Π°ΠΊΠ°Π·Ρ‹ Π½Π° Π±ΠΎΠ»Π΅Π΅ Ρ‡Π΅ΠΌ 2 тыс Π² Ρ‚Π΅Ρ‡Π΅Π½ΠΈΠ΅ сСссии взаимодСйствия с сСрвСром (ΠΏΠΎΠ΄Ρ€ΠΎΠ±Π½Π΅Π΅ Π½ΠΈΠΆΠ΅ ΠΏΠΎ тСксту), Ρ‚ΠΎ всС Ρ‚Π°ΠΊΠΈΠ΅ Π·Π°ΠΊΠ°Π·Ρ‹ Π½Π΅ проходят Π²Π°Π»ΠΈΠ΄Π°Ρ†ΠΈΡŽ.

ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€Π°Ρ†ΠΈΡ Kafka Streams Π½Π΅ΠΌΠ½ΠΎΠ³ΠΎ отличаСтся ΠΎΡ‚ ΠΏΡ€Π΅Π΄Ρ‹Π΄ΡƒΡ‰ΠΈΡ…:

 @Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME]) fun kStreamsConfig(): KafkaStreamsConfiguration { val props = hashMapOf<String, Any>( StreamsConfig.APPLICATION_ID_CONFIG to kafkaProps.appId, // must be specified to enable InteractiveQueries and checking metadata of Kafka Cluster StreamsConfig.APPLICATION_SERVER_CONFIG to serviceUtils.getServerAddress(), StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProps.bootstrapServers, StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name, // instances MUST have different stateDir StreamsConfig.STATE_DIR_CONFIG to kafkaProps.stateDir, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProps.autoOffsetReset, StreamsConfig.PROCESSING_GUARANTEE_CONFIG to kafkaProps.processingGuarantee, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to kafkaProps.commitInterval, AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) to kafkaProps.sessionTimeout, // disable caching to ensure a complete aggregate changelog. // This is a little trick we need to apply // as caching in Kafka Streams will conflate subsequent updates for the same key. // Disabling caching ensures // we get a complete "changelog" from the aggregate(...) // (i.e. every input event will have a corresponding output event. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390 StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG to "0" ) return KafkaStreamsConfiguration(props) }

Π‘Ρ‚ΠΎΠΈΡ‚ ΠΏΠΎΡΡΠ½ΠΈΡ‚ΡŒ, Ρ‡Ρ‚ΠΎ Π² этом сСрвисС Π±ΡƒΠ΄Π΅Ρ‚ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚ΡŒΡΡ аггрСгация Π΄Π°Π½Π½Ρ‹Ρ…, вслСдствиС Ρ‡Π΅Π³ΠΎ Kafka Streams Π±ΡƒΠ΄Π΅Ρ‚ Π·Π°Π΄Π΅ΠΉΡΡ‚Π²ΠΎΠ²Π°Ρ‚ΡŒ state store для хранСния состояния Π°Π³Π³Ρ€Π΅Π³ΠΈΡ€ΠΎΠ²Π°Π½Π½Ρ‹Ρ… Π΄Π°Π½Π½Ρ‹Ρ…. Π’ этом случаС ΠΌΡ‹ ΠΎΡ‚ΠΊΠ»ΡŽΡ‡Π°Π΅ΠΌ ΠΊΡΡˆΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ Π΄Π°Π½Π½Ρ‹Ρ… Π½Π° инстансС, Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ ΠΊΡΡˆΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ‚ привСсти ΠΊ объСдинСнию ΠΏΠΎΡΠ»Π΅Π΄ΠΎΠ²Π°Ρ‚Π΅Π»ΡŒΠ½Ρ‹Ρ… событий ΠΏΡ€ΠΈ Ρ€Π°Π±ΠΎΡ‚Π΅ со state store Π²ΠΎ врСмя Π°Π³Π³Ρ€Π΅Π³Π°Ρ†ΠΈΠΈ Π΄Π°Π½Π½Ρ‹Ρ…, Π² Ρ‚ΠΎ врСмя ΠΊΠ°ΠΊ ΠΌΡ‹ Ρ…ΠΎΡ‚ΠΈΠΌ ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Ρ‚ΡŒ ΠΊΠ°ΠΆΠ΄ΡƒΡŽ запись Π² процСссС Π°Π³Π³Ρ€Π΅Π³Π°Ρ†ΠΈΠΈ. Π‘ΠΎΠ»Π΅Π΅ ΠΏΠΎΠ΄Ρ€ΠΎΠ±Π½ΠΎ ΠΎΠ± этом Π² ΡΡ‚Π°Ρ‚ΡŒΠ΅ Kafka Streams Memory Management.

На ΠΏΠ΅Ρ€Π²ΠΎΠΌ этапС Ρ€Π°Π±ΠΎΡ‚Ρ‹ сСрвиса создаСтся ΡƒΠ·Π΅Π» Π°Π³Π³Ρ€Π΅Π³Π°Ρ†ΠΈΠΈ стрима KStream<String, Order>:

 val orders: KStream<String, Order> = builder .stream( schemas.ORDERS.name, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) ) .peek { key, value -> log.info("Processing Order {} in Fraud Service.", value) } .filter { _, order -> OrderState.CREATED == order.state } // Create an aggregate of the total value by customer and hold it with the order. // We use session windows to detect periods of activity. val aggregate: KTable<Windowed<Long>, OrderValue> = orders // creates a repartition internal topic if the value to be grouped by differs from // the key and downstream nodes need the new key .groupBy( { id, order -> order.customerId }, Grouped.with(Serdes.Long(), schemas.ORDERS.valueSerde) ) .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1))) .aggregate( ::OrderValue, { custId, order, total -> OrderValue(order, total.value + order.quantity * order.price) }, { k, a, b -> simpleMerge(a, b) },//include a merger as we're using session windows., Materialized.with(null, schemas.ORDER_VALUE_SERDE) ) private fun simpleMerge(a: OrderValue?, b: OrderValue): OrderValue { return OrderValue(b.order, (a?.value ?: 0.0) + b.value) }

Π‘Ρ‚ΠΎΠΈΡ‚ ΠΎΠ±Ρ€Π°Ρ‚ΠΈΡ‚ΡŒ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° Ρ‚ΠΎ, Ρ‡Ρ‚ΠΎ ΠΏΡ€ΠΈ Π³Ρ€ΡƒΠΏΠΏΠΈΡ€ΠΎΠ²ΠΊΠ΅ ΠΏΠΎ ΠΊΠ»ΡŽΡ‡Ρƒ, ΠΎΡ‚Π»ΠΈΡ‡Π°ΡŽΡ‰Π΅ΠΌΡƒΡΡ ΠΎΡ‚ ΠΊΠ»ΡŽΡ‡Π° исходного Ρ‚ΠΎΠΏΠΈΠΊΠ° Π±ΡƒΠ΄Π΅Ρ‚ создан Π²Π½ΡƒΡ‚Ρ€Π΅Π½Π½ΠΈΠΉ слуТСбный Ρ‚ΠΎΠΏΠΈΠΊ Kafka, с Ρ‚Π°ΠΊΠΈΠΌ ΠΆΠ΅ количСством ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΠΉ, Ρ‡Ρ‚ΠΎ ΠΈ исходный Ρ‚ΠΎΠΏΠΈΠΊ. Π’ нашСм ΠΏΡ€ΠΈΠΌΠ΅Ρ€Π΅ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ΡΡ Π΅Ρ‰Π΅ ΠΎΠ΄ΠΈΠ½ Ρ‚ΠΈΠΏ ΠΎΠΊΠ½Π° для Π°Π³Π³Ρ€Π΅Π³Π°Ρ†ΠΈΠΈ Π΄Π°Π½Π½Ρ‹Ρ… - SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1)) Π­Ρ‚ΠΎ ΠΎΠ·Π½Π°Ρ‡Π°Π΅Ρ‚, Ρ‡Ρ‚ΠΎ Π΄Π°Π½Π½Ρ‹Π΅ Π±ΡƒΠ΄ΡƒΡ‚ ΠΏΠΎΠΏΠ°Π΄Π°Ρ‚ΡŒ Π² Π°Π³Π³Ρ€Π΅Π³Π°Ρ†ΠΈΡŽ Π² Ρ‚Π΅Ρ‡Π΅Π½ΠΈΠ΅ сСссии активности ΠΏΠΎΠ»ΡŒΠ·ΠΎΠ²Π°Ρ‚Π΅Π»Ρ. БСссия закрываСтся, ΠΊΠΎΠ³Π΄Π° Π² Ρ‚ΠΎΠΏΠΈΠΊ Π½Π΅ ΠΏΠΎΡΡ‚ΡƒΠΏΠ°ΡŽΡ‚ Π΄Π°Π½Π½Ρ‹Π΅ Π² Ρ‚Π΅Ρ‡Π΅Π½ΠΈΠ΅ ΡƒΠΊΠ°Π·Π°Π½Π½ΠΎΠ³ΠΎ ΠΏΠ΅Ρ€ΠΈΠΎΠ΄Π° Π²Ρ€Π΅ΠΌΠ΅Π½ΠΈ. Π‘ΠΎΠ»Π΅Π΅ ΠΏΠΎΠ΄Ρ€ΠΎΠ±Π½ΠΎ Π² ΡΡ‚Π°Ρ‚ΡŒΠ΅ Create session windows.

На Π²Ρ‚ΠΎΡ€ΠΎΠΌ этапС ΠΌΡ‹ избавляСмся ΠΎΡ‚ Π΄Π°Π½Π½Ρ‹Ρ… ΠΎΠ± "ΠΎΠΊΠ½Π°Ρ…" ΠΈ Π΄Π΅Π»Π°Π΅ΠΌ re-keying записСй ΠΏΠΎ orderId:

 val ordersWithTotals: KStream<String, OrderValue> = aggregate .toStream { windowedKey, orderValue -> windowedKey.key() } //When elements are evicted from a session window they create delete events. Filter these out. .filter { k, v -> v != null } .selectKey { id, orderValue -> orderValue.order.id }

На Ρ‚Ρ€Π΅Ρ‚ΡŒΠ΅ΠΌ этапС ΠΌΡ‹ Ρ„ΠΎΡ€ΠΌΠΈΡ€ΡƒΠ΅ΠΌ Π΄Π²Π΅ "Π²Π΅Ρ‚ΠΊΠΈ" стримов:

  • ΠΎΠ΄Π½Π° содСрТит значСния большС FRAUD_LIMIT
  • вторая - мСньшС FRAUD_LIMIT
 //Now branch the stream into two, for pass and fail, based on whether the windowed // total is over Fraud Limit val forks: Map<String, KStream<String, OrderValue>> = ordersWithTotals .peek { key, value -> log.info("Processing OrderValue: {} in FraudService BEFORE branching.", value) } .split(Named.`as`("limit-")) .branch( { id, orderValue -> orderValue.value >= FRAUD_LIMIT }, Branched.`as`("above") ) .branch( { id, orderValue -> orderValue.value < FRAUD_LIMIT }, Branched.`as`("below") ) .noDefaultBranch()

На ΡΠ»Π΅Π΄ΡƒΡŽΡ‰Π΅ΠΌ этапС значСния ΠΈΠ· ΠΊΠ°ΠΆΠ΄ΠΎΠΉ "Π²Π΅Ρ‚ΠΊΠΈ" Π±ΡƒΠ΄ΡƒΡ‚ ΠΎΡ‚ΠΏΡ€Π°Π²Π»Π΅Π½Ρ‹ Π² Ρ‚ΠΎΠΏΠΈΠΊ order-validations.v1, с ΡΠΎΠΎΡ‚Π²Π΅Ρ‚ΡΡ‚Π²ΡƒΡŽΡ‰ΠΈΠΌ Ρ€Π΅Π·ΡƒΠ»ΡŒΡ‚Π°Ρ‚ΠΎΠΌ: FAIL ΠΈΠ»ΠΈ PASS.

val keySerde = schemas.ORDER_VALIDATIONS.keySerde val valueSerde = schemas.ORDER_VALIDATIONS.valueSerde val config = hashMapOf<String, Any>( AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS to false, AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to true, ) keySerde.configure(config, true) valueSerde.configure(config, false) forks["limit-above"]?.mapValues { orderValue -> OrderValidation( orderValue.order.id, OrderValidationType.FRAUD_CHECK, OrderValidationResult.FAIL ) }?.peek { key, value -> log.info("Sending OrderValidation for failed check in Fraud Service to Kafka. Order: {}", value) }?.to( schemas.ORDER_VALIDATIONS.name, Produced.with( keySerde, valueSerde ) ) forks["limit-below"]?.mapValues { orderValue -> OrderValidation( orderValue.order.id, OrderValidationType.FRAUD_CHECK, OrderValidationResult.PASS ) }?.peek { key, value -> log.info("Sending OrderValidation for passed check in Fraud Service to Kafka. Order: {}", value) }?.to( schemas.ORDER_VALIDATIONS.name, Produced.with( schemas.ORDER_VALIDATIONS.keySerde, schemas.ORDER_VALIDATIONS.valueSerde ) )

ΠŸΡ€ΠΈΠΌΠ΅Ρ‡Π°Π½ΠΈΠ΅.

Π’ процСссС тСстирования Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ Ρƒ мСня Π²ΠΎΠ·Π½ΠΈΠΊΠ»Π° ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠ° с Ρ‚Π΅ΠΌ, Ρ‡Ρ‚ΠΎ Π² Π½Π΅ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Ρ… случаях Kafka Streams ΠΏΡ‹Ρ‚Π°Π»Π°ΡΡŒ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΈΡ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ Π² Ρ‚ΠΎΠΏΠΈΠΊ, ΠΏΡ€Π΅Π΄Π²Π°Ρ€ΠΈΡ‚Π΅Π»ΡŒΠ½ΠΎ автоматичСски зарСгистрировав Π½ΠΎΠ²ΡƒΡŽ схСму Π΄Π°Π½Π½Ρ‹Ρ… Π² Schema Registry, ΠΏΡ€ΠΈ этом новая схСма Π±Ρ‹Π»Π° нСсовмСстима со старой, Ρ‡Ρ‚ΠΎ останавливало Kafka Streams. ΠŸΠΎΠΏΡ‹Ρ‚ΠΊΠΈ ΠΎΡ‚ΠΊΠ»ΡŽΡ‡ΠΈΡ‚ΡŒ Π°Π²Ρ‚ΠΎΠΌΠ°Ρ‚ΠΈΡ‡Π΅ΡΠΊΡƒΡŽ Ρ€Π΅Π³ΠΈΡΡ‚Ρ€Π°Ρ†ΠΈΡŽ схСм для сСриализаторов / дСсСриализаторов Kafka Streams Ρ‚Π°ΠΊΠΆΠ΅ ΠΏΡ€ΠΈΠ²ΠΎΠ΄ΠΈΠ»ΠΈ ΠΊ остановкС прилоТСния ΠΈΠ·-Π·Π° Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎ Kafka Streams рСгулярно ΠΏΠΎΠ΄ ΠΊΠ°ΠΏΠΎΡ‚ΠΎΠΌ создаСт слуТСбныС Ρ‚ΠΎΠΏΠΈΠΊΠΈ ΠΈ Π΄ΠΎΠ»ΠΆΠ½Π° ΠΈΠΌΠ΅Ρ‚ΡŒ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ Ρ€Π΅Π³ΠΈΡΡ‚Ρ€ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ ΠΈΡ… схСмы. ΠŸΡ€ΠΈ этом стоит Π·Π°ΠΌΠ΅Ρ‚ΠΈΡ‚ΡŒ, Ρ‡Ρ‚ΠΎ остановка прилоТСния Kafka Streams Π½Π΅ ΠΏΡ€ΠΈΠ²ΠΎΠ΄ΠΈΡ‚ ΠΊ падСнию инстанса - ΠΎΠ½-Ρ‚ΠΎ ΠΊΠ°ΠΊ Ρ€Π°Π· ΠΈ ΠΏΡ€ΠΎΠ΄ΠΎΠ»ΠΆΠ°Π΅Ρ‚ Ρ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ, поэтому Π² ΠΏΡ€ΠΎΠ΄Π΅ придСтся Π½Π°ΡΡ‚Ρ€Π°ΠΈΠ²Π°Ρ‚ΡŒ ΠΎΡ‚Π΄Π΅Π»ΡŒΠ½Ρ‹ΠΉ ΠΌΠΎΠ½ΠΈΡ‚ΠΎΡ€ΠΈΠ½Π³ ТизнСспособности ΠΏΡ€ΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ Kafka Streams, Ρ‡Ρ‚ΠΎΠ±Ρ‹ Π² случаС остановки стримов ΠΏΠ΅Ρ€Π΅Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ инстанс.

ΠŸΡƒΡ‚Π΅ΠΌ Π΄ΠΎΠ»Π³ΠΈΡ… мытарств Ρ€Π΅ΡˆΠ΅Π½ΠΈΠ΅ (скорСС Π·Π°ΠΏΠ»Π°Ρ‚ΠΊΠ°) Π±Ρ‹Π»ΠΎ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ: Ρ‚Π°ΠΌ, Π³Π΄Π΅ Kafka Streams отправляСт Π°Π³Π³Ρ€Π΅Π³ΠΈΡ€ΠΎΠ²Π°Π½Π½Ρ‹Π΅ ΠΈΠ»ΠΈ ΠΎΠ±ΡŠΠ΅Π΄ΠΈΠ½Π΅Π½Π½Ρ‹Π΅ Π΄Π°Π½Π½Ρ‹Π΅ Π² ΡƒΠΆΠ΅ созданный Ρ‚ΠΎΠΏΠΈΠΊ Kafka с зарСгистрированной Π² Schema Registry схСмой, я ΠΎΡ‚ΠΊΠ»ΡŽΡ‡ΠΈΠ» Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡ‚ΡŒ Serdes автоматичСски Ρ€Π΅Π³ΠΈΡΡ‚Ρ€ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ Π½ΠΎΠ²Ρ‹Π΅ схСмы.

Π’Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°Π΅Ρ‚ Π΄Π°Π½Π½Ρ‹Π΅ ΠΎ Π·Π°ΠΊΠ°Π·Π°Ρ… ΠΈΠ· Ρ‚ΠΎΠΏΠΈΠΊΠ° orders.v1 ΠΈ Π²Π°Π»ΠΈΠ΄ΠΈΡ€ΡƒΠ΅Ρ‚ Π½Π° ΠΏΡ€Π΅Π΄ΠΌΠ΅Ρ‚ наличия Ρ‚Ρ€Π΅Π±ΡƒΠ΅ΠΌΠΎΠ³ΠΎ количСства Ρ‚ΠΎΠ²Π°Ρ€ΠΎΠ² Π² Π·Π°ΠΊΠ°Π·Π΅. Валидация ΡƒΡ‡ΠΈΡ‚Ρ‹Π²Π°Π΅Ρ‚ ΠΊΠ°ΠΊ Ρ‚Π΅ Ρ‚ΠΎΠ²Π°Ρ€Ρ‹, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π΅ΡΡ‚ΡŒ Π½Π° "складС", Ρ‚Π°ΠΊ ΠΈ Ρ‚Π΅, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π·Π°Ρ€Π΅Π·Π΅Ρ€Π²ΠΈΡ€ΠΎΠ²Π°Π½Ρ‹ (Π½Π° Π½ΠΈΡ… Π΅ΡΡ‚ΡŒ Π΄Π΅ΠΉΡΡ‚Π²ΡƒΡŽΡ‰ΠΈΠ΅ Π·Π°ΠΊΠ°Π·Ρ‹). На Ρ‚Π΅ΠΊΡƒΡ‰ΠΈΠΉ ΠΌΠΎΠΌΠ΅Π½Ρ‚, Π½Π΅ Ρ€Π΅Π°Π»ΠΈΠ·ΠΎΠ²Π°Π½Π° Π»ΠΎΠ³ΠΈΠΊΠ° сниТСния количСства Ρ‚ΠΎΠ²Π°Ρ€ΠΎΠ² Π½Π° "складС" - это Π΄ΠΎΠ»ΠΆΠ½ΠΎ ΠΏΡ€ΠΎΠΈΡΡ…ΠΎΠ΄ΠΈΡ‚ΡŒ, Π½Π°ΠΏΡ€ΠΈΠΌΠ΅Ρ€, ΠΏΡ€ΠΈ ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠ΅ Π·Π°ΠΊΠ°Π·Π° ΠΏΠΎΠΊΡƒΠΏΠ°Ρ‚Π΅Π»ΡŽ Ρ‡Π΅Ρ€Π΅Π· слуТбу доставки. "Π‘ΠΊΠ»Π°Π΄" - прСдставляСт ΠΈΠ· сСбя Ρ‚ΠΎΠΏΠΈΠΊ Kafka warehouse-inventory.v1, Π³Π΄Π΅ ΠΊΠ»ΡŽΡ‡Π΅ΠΌ записи являСтся Π½Π°Π·Π²Π°Π½ΠΈΠ΅ Ρ‚ΠΎΠ²Π°Ρ€Π°, Π° Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ΠΌ - количСство.

Основной сСрвис InventoryKafkaService создаСт KStream<String, Order> Π½Π° основС Ρ‚ΠΎΠΏΠΈΠΊΠ° orders.v1:

val orders: KStream<String, Order> = builder .stream( schemas.ORDERS.name, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) ) .peek { key, value -> log.info("Orders streams record. Key: {}, Order: {}", key, value) }

KTable<Product, Int> Π½Π° основС Ρ‚ΠΎΠΏΠΈΠΊΠ° warehouse-inventory.v1:

val warehouseInventory: KTable<Product, Int> = builder .table( schemas.WAREHOUSE_INVENTORY.name, Consumed.with( schemas.WAREHOUSE_INVENTORY.keySerde, schemas.WAREHOUSE_INVENTORY.valueSerde ) )

Π° Ρ‚Π°ΠΊΠΆΠ΅ ΠΌΠ°Ρ‚Π΅Ρ€ΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Π½Π½Ρ‹ΠΉ KeyValueStore<Product, Long>, хранящий ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡŽ ΠΎ Π·Π°Ρ€Π΅Π·Π΅Ρ€Π²ΠΈΡ€ΠΎΠ²Π°Π½Π½Ρ‹Ρ… Ρ‚ΠΎΠ²Π°Ρ€Π°Ρ…:

val reservedStock: StoreBuilder<KeyValueStore<Product, Long>> = Stores .keyValueStoreBuilder( Stores.persistentKeyValueStore(inventoryProps.reservedStockStoreName), schemas.WAREHOUSE_INVENTORY.keySerde, Serdes.Long() ) .withLoggingEnabled(hashMapOf()) builder.addStateStore(reservedStock)

ПослС этого ΠΌΡ‹ Π΄Π΅Π»Π°Π΅ΠΌ re-keying стрима KStream<String, Order> ΠΏΠΎ ΠΊΠ»ΡŽΡ‡Ρƒ product. Π’ΡƒΡ‚ стоит Π½Π°ΠΏΠΎΠΌΠ½ΠΈΡ‚ΡŒ, Ρ‡Ρ‚ΠΎ ΠΏΠΎΠ΄ ΠΊΠ°ΠΏΠΎΡ‚ΠΎΠΌ Kafka Streams ΠΏΠΎΠΌΠ΅Ρ‚ΠΈΡ‚ этот KStream ΠΊΠ°ΠΊ ΠΏΠΎΠ΄Π»Π΅ΠΆΠ°Ρ‰ΠΈΠΉ Ρ€Π΅ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΠΎΠ½ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΡŽ, ΠΈ Π² случаС нСобходимости (Π½ΠΈΠΆΠ΅ Π±ΡƒΠ΄Π΅Ρ‚ join ΠΈΠ»ΠΈ аггрСгация), ΠΏΡ€ΠΎΠΈΠ·ΠΎΠΉΠ΄Π΅Ρ‚ Ρ€Π΅ΠΏΠ°Ρ€Ρ‚ΠΈΡ†ΠΈΠΎΠ½ΠΈΡ€ΠΎΠ²Π°Π½ΠΈΠ΅. Π”Π°Π»Π΅Π΅ Π² стрим ΠΏΠΎΠΏΠ°Π΄Π°ΡŽΡ‚ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Π·Π°ΠΊΠ°Π·Ρ‹ Π² состоянии CREATED, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΎΠ±ΡŠΠ΅Π΄ΠΈΠ½ΡΡŽΡ‚ΡΡ с Ρ‚Π°Π±Π»ΠΈΡ†Π΅ΠΉ Π·Π°Ρ€Π΅Π·Π΅Ρ€Π²ΠΈΡ€ΠΎΠ²Π°Π½Π½Ρ‹Ρ… Ρ‚ΠΎΠ²Π°Ρ€ΠΎΠ². ПослС объСдинСния ΠΏΠΎΠ»ΡƒΡ‡Π°Π΅ΠΌ Π½ΠΎΠ²Ρ‹ΠΉ KStream, каТдая запись ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠ³ΠΎ ΠΏΡ€ΠΎΡ…ΠΎΠ΄ΠΈΡ‚ Π²Π°Π»ΠΈΠ΄Π°Ρ†ΠΈΡŽ ΠΈ отправляСтся Π² Ρ‚ΠΎΠΏΠΈΠΊ order-validations.v1.

 orders.selectKey { id, order -> order.product } .peek { key, value -> log.info("Orders stream record after SELECT KEY. New key: {}. \nNew value: {}", key, value) } // Limit to newly created orders .filter { id, order -> OrderState.CREATED == order.state } //Join Orders to Inventory so we can compare each order to its corresponding stock value .join( warehouseInventory, ::KeyValue, Joined.with( schemas.WAREHOUSE_INVENTORY.keySerde, schemas.ORDERS.valueSerde, Serdes.Integer() ) ) //Validate the order based on how much stock we have both in the warehouse // and locally 'reserved' stock .transform( TransformerSupplier { InventoryValidator(inventoryProps) }, inventoryProps.reservedStockStoreName ) .peek { key, value -> log.info( "Pushing the result of validation Order record to topic: {} with key: {}. \nResult: {}", schemas.ORDER_VALIDATIONS.name, key, value ) } //Push the result into the Order Validations topic .to( schemas.ORDER_VALIDATIONS.name, Produced.with( orderValidationsKeySerde.apply { configureSerde(this, true) }, orderValidationsValueSerde.apply { configureSerde(this, false) } ) )

Полная рСализация класса:

private val log = LoggerFactory.getLogger(InventoryKafkaService::class.java) @Service class InventoryKafkaService( private val schemas: Schemas, private val inventoryProps: InventoryProps, private val topicProps: TopicsProps ) { @Autowired fun processStreams(builder: StreamsBuilder) { // Latch onto instances of the orders and inventory topics val orders: KStream<String, Order> = builder .stream( schemas.ORDERS.name, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) ) .peek { key, value -> log.info("Orders streams record. Key: {}, Order: {}", key, value) } val warehouseInventory: KTable<Product, Int> = builder .table( schemas.WAREHOUSE_INVENTORY.name, Consumed.with( schemas.WAREHOUSE_INVENTORY.keySerde, schemas.WAREHOUSE_INVENTORY.valueSerde ) ) // Create a store to reserve inventory whilst the order is processed. // This will be prepopulated from Kafka before the service starts processing val reservedStock: StoreBuilder<KeyValueStore<Product, Long>> = Stores .keyValueStoreBuilder( Stores.persistentKeyValueStore(inventoryProps.reservedStockStoreName), schemas.WAREHOUSE_INVENTORY.keySerde, Serdes.Long() ) .withLoggingEnabled(hashMapOf()) builder.addStateStore(reservedStock) val orderValidationsKeySerde = Serdes.String() val orderValidationsValueSerde = SpecificAvroSerde<OrderValidation>() //First change orders stream to be keyed by Product (so we can join with warehouse inventory) orders.selectKey { id, order -> order.product } .peek { key, value -> log.info("Orders stream record after SELECT KEY. New key: {}. \nNew value: {}", key, value) } // Limit to newly created orders .filter { id, order -> OrderState.CREATED == order.state } //Join Orders to Inventory so we can compare each order to its corresponding stock value .join( warehouseInventory, ::KeyValue, Joined.with( schemas.WAREHOUSE_INVENTORY.keySerde, schemas.ORDERS.valueSerde, Serdes.Integer() ) ) //Validate the order based on how much stock we have both in the warehouse // and locally 'reserved' stock .transform( TransformerSupplier { InventoryValidator(inventoryProps) }, inventoryProps.reservedStockStoreName ) .peek { key, value -> log.info( "Pushing the result of validation Order record to topic: {} with key: {}. \nResult: {}", schemas.ORDER_VALIDATIONS.name, key, value ) } //Push the result into the Order Validations topic .to( schemas.ORDER_VALIDATIONS.name, Produced.with( orderValidationsKeySerde.apply { configureSerde(this, true) }, orderValidationsValueSerde.apply { configureSerde(this, false) } ) ) } private fun configureSerde(serde: Serde<*>, isKey: Boolean) { val serdesConfig = hashMapOf<String, Any>( AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS to false, AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to true, ) schemas.configureSerde(serde, serdesConfig, isKey) } } 

Order Enrichment Service

БСрвис ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅Ρ‚ Java ΠΊΠ»ΠΈΠ΅Π½Ρ‚ ksqlDB, для Ρ‚ΠΎΠ³ΠΎ, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΡΠΎΠ·Π΄Π°Ρ‚ΡŒ Π² ksqlDB stream, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹ΠΉ содСрТит ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡŽ ΠΊΠ°ΠΊ ΠΎ ΠΏΠΎΠΊΡƒΠΏΠ°Ρ‚Π΅Π»Π΅, Ρ‚Π°ΠΊ ΠΈ ΠΎ Π·Π°ΠΊΠ°Π·Π΅. Π”Π°Π½Π½Ρ‹Π΅ ΠΈΠ· этого стрима ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ, нСпосрСдствСнно ΠΎΠ±Ρ€Π°Ρ‚ΠΈΠ²ΡˆΠΈΡΡŒ ΠΊ ksqlDB ΠΊΠ°ΠΊ Ρ‡Π΅Ρ€Π΅Π· Java ΠΊΠ»ΠΈΠ΅Π½Ρ‚, Ρ‚Π°ΠΊ ΠΈ Π΄Ρ€ΡƒΠ³ΠΈΠΌΠΈ способами.

 private val CREATE_ORDERS_STREAM = "CREATE SOURCE STREAM IF NOT EXISTS ${ksqlDBProps.ordersStream} (" + "ID STRING KEY, " + "CUSTOMERID BIGINT, " + "STATE STRING, " + "PRODUCT STRING, " + "QUANTITY INT, " + "PRICE DOUBLE) WITH (" + "kafka_topic='${topicProps.ordersTopic}'," + " value_format='AVRO'" + ");" private val CREATE_CUSTOMERS_TABLE = """  CREATE SOURCE TABLE IF NOT EXISTS ${ksqlDBProps.customersTable} (  CUSTOMERID BIGINT PRIMARY KEY,   FIRSTNAME STRING,   LASTNAME STRING,   EMAIL STRING,   ADDRESS STRING,   LEVEL STRING  ) WITH (  KAFKA_TOPIC='${topicProps.customersTopic}',  VALUE_FORMAT='AVRO'  );  """ private val CREATE_ORDERS_ENRICHED_STREAM = """  CREATE STREAM IF NOT EXISTS ${ksqlDBProps.ordersEnrichedStream}   AS SELECT ${ksqlDBProps.customersTable}.CUSTOMERID AS customerId,   ${ksqlDBProps.customersTable}.FIRSTNAME,   ${ksqlDBProps.customersTable}.LASTNAME,   ${ksqlDBProps.customersTable}.LEVEL,   ${ksqlDBProps.ordersStream}.PRODUCT,   ${ksqlDBProps.ordersStream}.QUANTITY,   ${ksqlDBProps.ordersStream}.PRICE   FROM ${ksqlDBProps.ordersStream}   LEFT JOIN ${ksqlDBProps.customersTable}   ON ${ksqlDBProps.ordersStream}.CUSTOMERID = ${ksqlDBProps.customersTable}.CUSTOMERID;  """

Помимо этого Ρ‚Π°ΠΊΠΆΠ΅ создаСтся Ρ‚Π°Π±Π»ΠΈΡ†Π°, содСрТащая ΠΈΠ½Ρ„ΠΎΡ€ΠΌΠ°Ρ†ΠΈΡŽ ΠΎ ΠΏΠΎΡ‚Π΅Π½Ρ†ΠΈΠ°Π»ΡŒΠ½ΠΎ ΠΌΠΎΡˆΠ΅Π½Π½ΠΈΡ‡Π΅ΡΠΊΠΈΡ… дСйствиях ΠΊΠ»ΠΈΠ΅Π½Ρ‚ΠΎΠ²:

 private val CREATE_FRAUD_ORDER_TABLE = """  CREATE TABLE IF NOT EXISTS ${ksqlDBProps.fraudOrderTable}   WITH (KEY_FORMAT='json')   AS SELECT CUSTOMERID,   LASTNAME,   FIRSTNAME,   COUNT(*) AS COUNTS   FROM ${ksqlDBProps.ordersEnrichedStream}   WINDOW TUMBLING (SIZE 30 SECONDS)   GROUP BY CUSTOMERID, LASTNAME, FIRSTNAME   HAVING COUNT(*)>2;   """

БСрвис выставляСт Π½Π°Ρ€ΡƒΠΆΡƒ REST endpoint, ΠΏΠΎ ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌΡƒ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡƒΡ‡ΠΈΡ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ ΠΎ ΠΏΠΎΡ‚Π΅Π½Ρ†ΠΈΠ°Π»ΡŒΠ½Ρ‹Ρ… ΠΌΠΎΡˆΠ΅Π½Π½ΠΈΠΊΠ°Ρ…:

data class FraudDto( val customerId: Long, val lastName: String, val firstName: String, val counts: Long ) @RestController @RequestMapping("/fraud") class FraudController( private val fraudService: FraudService ) { @GetMapping fun getFraudulentOrders( @RequestParam(name = "limit", defaultValue = "10") limit: Int ): List<FraudDto> { log.info("Received request to GET $limit number of fraudulent orders.") return fraudService.getFraudulentOrders(limit) } } @Service class FraudServiceImpl( private val client: Client, private val ksqlDBProps: KsqlDBProps ) : FraudService { override fun getFraudulentOrders(limit: Int): List<FraudDto> { log.info("Executing getFraudulentOrders stream query.") val frauds = client.executeQuery("SELECT * FROM ${ksqlDBProps.fraudOrderTable} LIMIT $limit;").get() return frauds.map(::mapRowToResponse) } private fun mapRowToResponse(row: Row): FraudDto { log.info("Mapping ksqlDB Query row: {}", row) val cusId = row.getLong("CUSTOMERID") val lastName = row.getString("LASTNAME") val firstName = row.getString("FIRSTNAME") val count = row.getLong("COUNTS") return FraudDto(cusId, lastName, firstName, count) } }

ΠŸΡ€ΠΎΡΡ‚ΠΎΠΉ сСрвис, ΠΎΡ‚Π²Π΅Ρ‡Π°ΡŽΡ‰ΠΈΠΉ Π·Π° Π²Ρ‹Ρ‡ΠΈΡ‚Ρ‹Π²Π°Π½ΠΈΠ΅ Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· Ρ‚ΠΎΠΏΠΈΠΊΠ° order-validations.v1, ΠΊΡƒΠ΄Π° ΠΎΡ‚ΠΏΡ€Π°Π²Π»ΡΡŽΡ‚ сообщСния сСрвисы ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΊΠΈ Π·Π°ΠΊΠ°Π·ΠΎΠ²: Inventory Service, Fraud Service ΠΈ Order Details Service. ПослС Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΎΡ‚ ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ Π²Π°Π»ΠΈΠ΄Π°Ρ‚ΠΎΡ€Π° Π±ΡƒΠ΄Π΅Ρ‚ ΠΏΠΎΠ»ΡƒΡ‡Π΅Π½ ΠΎΡ‚Π²Π΅Ρ‚ ΠΏΠΎ ΠΊΠΎΠ½ΠΊΡ€Π΅Ρ‚Π½ΠΎΠΌΡƒ Π·Π°ΠΊΠ°Π·Ρƒ, Π² Ρ‚ΠΎΠΏΠΈΠΊ orders.v1 отправляСтся сообщСниС с ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Π½Ρ‹ΠΌ статусом Π·Π°ΠΊΠ°Π·Π°.

Настройки Kafka Streams Π°Π½Π°Π»ΠΎΠ³ΠΈΡ‡Π½Ρ‹ Orders Service.

РСализация класса ValidationAggregatorService:

@Service class ValidationAggregatorService( private val schemas: Schemas, private val topicProps: TopicsProps ) { private val serdes1: Consumed<String, OrderValidation> = Consumed.with( schemas.ORDER_VALIDATIONS.keySerde, schemas.ORDER_VALIDATIONS.valueSerde ) private val serdes2: Consumed<String, Order> = Consumed .with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) private val serdes3: Grouped<String, OrderValidation> = Grouped .with(schemas.ORDER_VALIDATIONS.keySerde, schemas.ORDER_VALIDATIONS.valueSerde) private val serdes4: StreamJoined<String, Long, Order> = StreamJoined .with(schemas.ORDERS.keySerde, Serdes.Long(), schemas.ORDERS.valueSerde) private val serdes5: Grouped<String, Order> = Grouped .with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) private val serdes6: StreamJoined<String, OrderValidation, Order> = StreamJoined .with(schemas.ORDERS.keySerde, schemas.ORDER_VALIDATIONS.valueSerde, schemas.ORDERS.valueSerde) // disables auto registering schemas for specified Serde<*> private fun configureSerdes(serde: Serde<*>, isKey: Boolean) { val config = hashMapOf<String, Any>( AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS to false, AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to true, ) schemas.configureSerde(serde, config, isKey) } @Autowired fun aggregateOrderValidations(builder: StreamsBuilder) { // 1. ΠšΠΎΠ»ΠΈΡ‡Π΅ΡΡ‚Π²ΠΎ ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΎΠΊ, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΏΡ€ΠΎΡ…ΠΎΠ΄ΠΈΡ‚ ΠΊΠ°ΠΆΠ΄Ρ‹ΠΉ Π·Π°ΠΊΠ°Π·. Для простоты Ρ€Π΅Π°Π»ΠΈΠ·Π°Ρ†ΠΈΠΈ Π·Π°Ρ…Π°Ρ€Π΄ΠΊΠΎΠΆΠ΅Π½ΠΎ. val numberOfRules = 3 // 2. Настройка Serdes, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π±ΡƒΠ΄ΡƒΡ‚ ΡƒΡ‡Π°ΡΡ‚Π²ΠΎΠ²Π°Ρ‚ΡŒ Π² ΠΎΡ‚ΠΏΡ€Π°Π²ΠΊΠ΅ сообщСний Π² Ρ‚ΠΎΠΏΠΈΠΊ orders.v1.  // Π—Π°ΠΏΡ€Π΅Ρ‰Π°Π΅ΠΌ Π°Π²Ρ‚ΠΎΠΌΠ°Ρ‚ΠΈΡ‡Π΅ΡΠΊΡƒΡŽ Ρ€Π΅Π³ΠΈΡΡ‚Ρ€Π°Ρ†ΠΈΡŽ схСм Π² Schema Registry, Ρ‡Ρ‚ΠΎΠ±Ρ‹ ΠΈΠ·Π±Π΅ΠΆΠ°Ρ‚ΡŒ ΠΊΠΎΠ½Ρ„Π»ΠΈΠΊΡ‚ΠΎΠ². val ordersKeySerde = Serdes.String() val ordersValueSerde = SpecificAvroSerde<Order>() configureSerdes(ordersKeySerde, true) configureSerdes(ordersValueSerde, false) // 3. стрим ΠΈΠ· Ρ‚ΠΎΠΏΠΈΠΊΠ° order-validations.v1 val validations: KStream<String, OrderValidation> = builder .stream(schemas.ORDER_VALIDATIONS.name, serdes1) // 4. стрим ΠΈΠ· Ρ‚ΠΎΠΏΠΈΠΊΠ° orders.v1, Π² ΠΊΠΎΡ‚ΠΎΡ€ΠΎΠΌ ΠΏΡ€ΠΈΡΡƒΡ‚ΡΡ‚Π²ΡƒΡŽΡ‚ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Π·Π°ΠΊΠ°Π·Ρ‹ Π² статусС CREATED val orders = builder .stream(schemas.ORDERS.name, serdes2) .filter { id, order -> OrderState.CREATED == order.state } validations .peek { key, value -> log.info("Starting validation for key: {}, value: {}", key, value) } // 5. Π³Ρ€ΡƒΠΏΠΏΠΈΡ€ΡƒΠ΅ΠΌ сообщСния ΠΎ Π²Π°Π»ΠΈΠ΄Π°Ρ†ΠΈΠΈ ΠΏΠΎ orderId .groupByKey(serdes3) // 6. БообщСния Ρ‚Π°ΠΊΠΆΠ΅ Ρ€Π°Π·Π±ΠΈΠ²Π°ΡŽΡ‚ΡΡ ΠΏΠΎ ΠΎΠΊΠ½Π°ΠΌ сСссионной активности с ΠΏΠ΅Ρ€ΠΈΠΎΠ΄ΠΈΠΌ нСактивности  // 5 ΠΌΠΈΠ½ΡƒΡ‚ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))) // 7. АггрСгируСм Π΄Π°Π½Π½Ρ‹Π΅ ΠΎ ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΊΠ΅ Π·Π°ΠΊΠ°Π·ΠΎΠ², Ссли Π·Π°ΠΊΠ°Π· ΠΏΡ€ΠΎΡˆΠ΅Π» ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΊΡƒ (PASS), // Ρ‚ΠΎΠ³Π΄Π° ΡƒΠ²Π΅Π»ΠΈΡ‡ΠΈΠ²Π°Π΅ΠΌ счСтчик ΠΏΡ€ΠΎΠ²Π΅Ρ€Π΅Π½Π½Ρ‹Ρ… Π·Π°ΠΊΠ°Π·ΠΎΠ²  .aggregate( { 0L }, { id, result, total -> if (PASS == result.validationResult) total + 1 else total }, // Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ ΠΌΡ‹ ΠΈΡΠΏΠΎΠ»ΡŒΠ·ΡƒΠ΅ΠΌ SessionWindow, Π½Π°ΠΌ Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π΄ΠΎΠ±Π°Π²ΠΈΡ‚ΡŒ merger для сСссий // ΠΈΠ·-Π·Π° особСнностСй Ρ€Π°Π±ΠΎΡ‚Ρ‹ этих ΠΎΠΊΠΎΠ½. По сути, ΠΊΠΎΠ³Π΄Π° Π² стрим ΠΏΡ€ΠΈΡ…ΠΎΠ΄ΠΈΡ‚ ΠΏΠ΅Ρ€Π²ΠΎΠ΅ сообщСниС // сразу создаСтся сСссия, послС этого, Ссли ΠΏΡ€ΠΈΡ…ΠΎΠ΄ΠΈΡ‚ Π΅Ρ‰Π΅ ΠΎΠ±Π½ΠΎ сообщСниС, ΠΏΠΎΠΏΠ°Π΄Π°ΡŽΡ‰Π΅Π΅ Π² // inactivity gap ΠΏΠ΅Ρ€Π²ΠΎΠ³ΠΎ сообщСния, Ρ‚ΠΎ для Π½Π΅Π³ΠΎ Ρ‚ΠΎΠΆΠ΅ создаСтся сСссия, которая мСрТится // с сСссиСй ΠΏΠ΅Ρ€Π²ΠΎΠ³ΠΎ сообщСния. ΠŸΠΎΠ΄Ρ€ΠΎΠ±Π½Π΅Π΅ Ρ‚ΡƒΡ‚: // https://kafka.apache.org/36/documentation/streams/developer-guide/dsl-api.html#windowing-session { k, a, b -> b ?: a }, Materialized.with(null, Serdes.Long()) ) // 8. избавляСмся ΠΎΡ‚ "ΠΎΠΊΠΎΠ½" ΠΈ Ρ„ΠΎΡ€ΠΌΠΈΡ€ΡƒΠ΅ΠΌ Π½ΠΎΠ²Ρ‹ΠΉ стрим, Π³Π΄Π΅: key = orderId, value = количСство ΠΏΡ€ΠΎΠΉΠ΄Π΅Π½Π½Ρ‹Ρ… ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΎΠΊ .toStream { windowedKey, total -> windowedKey.key() } // Когда сообщСния ΡƒΠ΄Π°Π»ΡΡŽΡ‚ΡΡ ΠΈΠ· Session Window, публикуСтся событиС delete (null-value), избавляСмся ΠΎΡ‚ Π½ΠΈΡ… .filter { _, v -> v != null } // 9. Π’Π°ΠΊΠΆΠ΅ Π±Π΅Ρ€Π΅ΠΌ Ρ‚ΠΎΠ»ΡŒΠΊΠΎ Ρ‚Π΅ записи, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ ΠΏΡ€ΠΎΡˆΠ»ΠΈ всС ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΊΠΈ .filter { _, total -> total >= numberOfRules } // 9. // 10. объСдиняСм со стримом orders ΠΈ обновляСм статус Π·Π°ΠΊΠ°Π·Π° Π½Π° VALIDATED .join( orders, { id, order -> Order.newBuilder(order).setState(OrderState.VALIDATED).build() }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)), serdes4 ) .peek { _, value -> log.info("Order {} has been validated and is being sent to Kafka Topic: {}", value, topicProps.ordersTopic) } // 11. ΠžΡ‚ΠΏΡ€Π°Π²Π»ΡΠ΅ΠΌ Π·Π°ΠΊΠ°Π· с ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Π½Ρ‹ΠΌ статусом Π² Ρ‚ΠΎΠΏΠΈΠΊ orders.v1 .to(schemas.ORDERS.name, Produced.with(ordersKeySerde, ordersValueSerde)) // 12. Ρ‚Π°ΠΊΠΆΠ΅ ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Π΅ΠΌ случай, ΠΊΠΎΠ³Π΄Π° Π·Π°ΠΊΠ°Π· ΠΏΡ€ΠΎΠ²Π°Π»ΠΈΠ» хотябы ΠΎΠ΄Π½Ρƒ ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΊΡƒ validations.filter { id, rule -> FAIL == rule.validationResult } // 13. объСдиняСм со стримом orders ΠΈ обновляСм статус Π·Π°ΠΊΠ°Π·Π° Π½Π° FAILED .join( orders, { id, order -> Order.newBuilder(order).setState(OrderState.FAILED).build() }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)), serdes6 ) .peek { key, value -> log.info("Order {} failed validation. BEFORE groupByKey.", value) } // 14. Π“Ρ€ΡƒΠΏΠΏΠΈΡ€ΡƒΠ΅ΠΌ ΠΏΠΎ ΠΊΠ»ΡŽΡ‡Ρƒ - orderId .groupByKey(serdes5) // 15. Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ Π·Π°ΠΊΠ°Π· ΠΌΠΎΠ³ Π½Π΅ ΠΏΡ€ΠΎΠΉΡ‚ΠΈ нСсколько ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΎΠΊ, Π² нашСм стримС ΠΌΠΎΠ³ΡƒΡ‚ ΠΏΡ€ΠΈΡΡƒΡ‚ΡΡ‚Π²ΠΎΠ²Π°Ρ‚ΡŒ // нСсколько записСй со статусом FAIL, Ρ‚ΡƒΡ‚ ΠΌΡ‹ избавляСмся ΠΎΡ‚ Π΄ΡƒΠ±Π»ΠΈΠΊΠ°Ρ‚ΠΎΠ², Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ Π½Π°ΠΌ // Π½Π΅ Π²Π°ΠΆΠ½ΠΎ сколько ΠΏΡ€ΠΎΠ²Π΅Ρ€ΠΎΠΊ Π½Π΅ ΠΏΡ€ΠΎΡˆΠ΅Π» Π·Π°ΠΊΠ°Π· .reduce { order, v1 -> order } .toStream() .peek { key, value -> log.info("Order {} failed validation. AFTER groupByKey.", value) } .to( // 16. ΠžΡ‚ΠΏΡ€Π°Π²Π»ΡΠ΅ΠΌ Π·Π°ΠΊΠ°Π· с ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Π½Ρ‹ΠΌ статусом Π² Ρ‚ΠΎΠΏΠΈΠΊ orders.v1 schemas.ORDERS.name, Produced.with(ordersKeySerde, ordersValueSerde) ) } }

ВрСбования для запуска:

  1. JDK 17
  2. Docker + docker-compose
  3. ΠžΠ—Π£ для Docker Π½Π΅ ΠΌΠ΅Π½Π΅Π΅ 8 Π“Π±
  4. ΠžΠ—Π£ для микросСрвисов Π½Π΅ ΠΌΠ΅Π½Π΅Π΅ 8 Π“Π±. (Π’ΠΎΡ‡Π½ΠΎ запускаСтся Π½Π° Ubuntu 22.04 с 64 Π“Π± ΠžΠ—Π£ ΠΈ 8 ядрами процСссора)

Как Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ микросСрвисы

Π—Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ скрипт ΠΈΠ· ΠΊΠΎΡ€Π½Π΅Π²ΠΎΠΉ ΠΏΠ°ΠΏΠΊΠΈ:

  • Если установлСн Docker V2, Ρ‚ΠΎ:
./prepare_script_docker_new.sh
  • Если Π±ΠΎΠ»Π΅Π΅ стара вСрсия, Ρ‚ΠΎ:
./prepare_script_docker_old.sh

Π‘ΠΊΡ€ΠΈΠΏΡ‚ осущСствляСт ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠ΅ дСйствия:

  1. ΠŸΠΎΠ΄Π½ΠΈΠΌΠ°Π΅Ρ‚ docker ΠΊΠΎΠ½Ρ‚Π΅ΠΉΠ½Π΅Ρ€Ρ‹:

    • Zookeeper
    • Kafka
    • Schema Registry (Confluent)
    • Connect (Kafka Connect)
    • ksqlDB
    • Elasticsearch
    • SQLite
    • Kibana
  2. РСгистрируСт Avro схСму сущностСй, ΠΊΠΎΡ‚ΠΎΡ€Ρ‹Π΅ Π±ΡƒΠ΄ΡƒΡ‚ ΠΎΠ±Ρ€Π°Π±Π°Ρ‚Ρ‹Π²Π°Ρ‚ΡŒΡΡ Π² Kafka. РСгистрация осущСствляСтся с ΠΏΠΎΠΌΠΎΡ‰ΡŒΡŽ Gradle Plugin:
    id 'com.github.imflog.kafka-schema-registry-gradle-plugin' version "1.12.0"

  3. ΠŸΡ€ΠΎΠ²ΠΎΠ΄ΠΈΡ‚ Ρ‡ΠΈΡΡ‚ΡƒΡŽ сборку микросСрвисов (ΠΏΠΎΠΊΠ° Π±Π΅Π· тСстов, Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ ΠΈΡ… Π΅Ρ‰Π΅ Π½Π΅ написал).

  4. Π‘ΠΎΠ·Π΄Π°Π΅Ρ‚ индСкс orders.v1 Π² ElasticSearch

  5. ΠšΠΎΠ½Ρ„ΠΈΠ³ΡƒΡ€ΠΈΡ€ΡƒΠ΅Ρ‚ Kafka Connect для:

    • получСния Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· SQLite (Ρ‚Π°Π±Π»ΠΈΡ†Π° customers) ΠΈ записи Π»ΠΎΠ³Π° ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΠΉ Π² Kafka (ΠΏΠ°Ρ‚Ρ‚Π΅Ρ€Π½ Change Data Capture)
    • записи Π΄Π°Π½Π½Ρ‹Ρ… ΠΈΠ· Ρ‚ΠΎΠΏΠΈΠΊΠ° Kafka orders.v1 Π² индСкс orders.v1 ElasticSearch
  6. НастраиваСт Kibana для просмотра записСй ΠΈΠ· ElasticSearch.

Π’ΠΎΠ·ΠΌΠΎΠΆΠ½Ρ‹Π΅ ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΡ‹

Π£ мСня Π½Π° Π²ΠΈΡ€Ρ‚ΡƒΠ°Π»ΡŒΠ½ΠΎΠΉ машинС Ubuntu 22.04 ΠΈΠ·Π½Π°Ρ‡Π°Π»ΡŒΠ½ΠΎ Π½Π΅ запустился ΠΊΠΎΠ½Ρ‚Π΅ΠΉΠ½Π΅Ρ€ elasticsearch, Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ Ρ‚Ρ€Π΅Π±ΠΎΠ²Π°Π»ΠΎΡΡŒ ΡƒΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ Π·Π½Π°Ρ‡Π΅Π½ΠΈΠ΅ vm.max_map_count Π½Π΅ Π½ΠΈΠΆΠ΅ 262144. Если Π²ΠΎΠ·Π½ΠΈΠΊΠ°Π΅Ρ‚ такая ΠΏΡ€ΠΎΠ±Π»Π΅ΠΌΠ°, Ρ‚ΠΎ ΠΌΠΎΠΆΠ½ΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΡƒΡŽ ΠΊΠΎΠΌΠ°Π½Π΄Ρƒ:

sudo sysctl -w vm.max_map_count=262144

ПослС Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ скрипт ΠΎΡ‚Ρ€Π°Π±ΠΎΡ‚Π°Π΅Ρ‚, ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡƒΡΠΊΠ°Ρ‚ΡŒ микросСрвисы. Π‘Π°ΠΌΡ‹ΠΉ простой Π²Π°Ρ€ΠΈΠ°Π½Ρ‚ - Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ Π² IntellijIdea всС микросСрвисы, ΠΊΡ€ΠΎΠΌΠ΅ populator. Populator слСдуСт Π·Π°ΠΏΡƒΡΡ‚ΠΈΡ‚ΡŒ Π² послСднюю ΠΎΡ‡Π΅Ρ€Π΅Π΄ΡŒ, Ρ‚Π°ΠΊ ΠΊΠ°ΠΊ ΠΎΠ½ Π±ΡƒΠ΄Π΅Ρ‚ постоянно Π³Π΅Π½Π΅Ρ€ΠΈΡ€ΠΎΠ²Π°Ρ‚ΡŒ Π΄Π°Π½Π½Ρ‹Π΅ ΠΈ ΠΎΡ‚ΠΏΡ€Π°Π²Π»ΡΡ‚ΡŒ POST запросы Π² orders-service Π½Π° созданиС Π·Π°ΠΊΠ°Π·Π° (1 Π·Π°ΠΊΠ°Π· Π² сСкунду), Π° Ρ‚Π°ΠΊΠΆΠ΅ ΠΏΠΎΡΡ‹Π»Π°Ρ‚ΡŒ сообщСния Π² Kafka Ρ‚ΠΎΠΏΠΈΠΊΠΈ: warehouse-inventory.v1 ΠΈ payments.v1.

МоТно Π΄Π°Ρ‚ΡŒ сСрвису populator ΠΏΠΎΡ€Π°Π±ΠΎΡ‚Π°Ρ‚ΡŒ нСсколько сСкунд (15-30) ΠΈ послС этого ΠΎΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ. Π§Ρ‚ΠΎΠ±Ρ‹ ΠΏΡ€ΠΎΡ‡ΠΈΡ‚Π°Ρ‚ΡŒ содСрТимоС Ρ‚ΠΎΠΏΠΈΠΊΠΎΠ² Kafka (послСдниС 5 сообщСний Π² Ρ‚ΠΎΠΏΠΈΠΊΠ΅) Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ Π²Ρ‹ΠΏΠΎΠ»Π½ΠΈΡ‚ΡŒ ΡΠ»Π΅Π΄ΡƒΡŽΡ‰ΠΈΠΉ скрипт (ΠΎΠΏΡΡ‚ΡŒ ΠΆΠ΅ Π² зависимости ΠΎΡ‚ Ρ‚ΠΎΠ³ΠΎ, ΠΊΠ°ΠΊΠΎΠΉ docker установлСн):

./read-topics-docker-new.sh

ΠΈΠ»ΠΈ

./read-dopics-docker-old.sh

Π’ ΠΊΠΎΠ½Ρ†Π΅ Π½Π΅ΠΎΠ±Ρ…ΠΎΠ΄ΠΈΠΌΠΎ ΠΎΡΡ‚Π°Π½ΠΎΠ²ΠΈΡ‚ΡŒ docker ΠΊΠΎΠ½Ρ‚Π΅ΠΉΠ½Π΅Ρ€Ρ‹:

docker compose down

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published