ΠΠ΅ΠΌΠΎ ΠΏΡΠΎΠ΅ΠΊΡ ΠΏΠΎ ΠΌΠΈΠΊΡΠΎΡΠ΅ΡΠ²ΠΈΡΠ½ΠΎΠΉ Π°ΡΡ ΠΈΡΠ΅ΠΊΡΡΡΠ΅ Π½Π° ΠΎΡΠ½ΠΎΠ²Π΅ ΡΠΎΠ±ΡΡΠΈΠΉ. ΠΡΠ½ΠΎΠ²Π°Π½ Π½Π° ΡΡΠ°ΡΡΡΡ Confluent:
- Kotlin
- Spring Boot
- Apache Kafka
- Kafka Streams
- ksqlDB
- Gradle
- Docker / docker-compose
- Avro Serialization
- Confluent Schema Registry
- Kafka Connect
- ElasticSearch
- SQLite
Π¦Π΅Π½ΡΡΠΎΠΌ ΡΠΈΡΡΠ΅ΠΌΡ ΠΌΠΈΠΊΡΠΎΡΠ΅ΡΠ²ΠΈΡΠΎΠ² ΡΠ²Π»ΡΠ΅ΡΡΡ Orders Service, ΠΊΠΎΡΠΎΡΡΠΉ ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»ΡΠ΅Ρ ΠΊΠ»ΠΈΠ΅Π½ΡΠ°ΠΌ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΠΏΠΎΡΡΠ»Π°ΡΡ REST Π·Π°ΠΏΡΠΎΡΡ Π½Π°:
- ΡΠΎΠ΄Π½Π°Π½ΠΈΠ΅ Π·Π°ΠΊΠ°Π·ΠΎΠ² (POST http://localhost:8000/v1/orders)
- ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΠ΅ Π·Π°ΠΊΠ°Π·Π° ΠΏΠΎ Π΅Π³ΠΎ ID (GET http://localhost:8000/v1/orders/{id})
- ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΠ΅ Π²Π°Π»ΠΈΠ΄ΠΈΡΠΎΠ²Π°Π½Π½ΠΎΠ³ΠΎ Π·Π°ΠΊΠ°Π·Π° ΠΏΠΎ Π΅Π³ΠΎ ID (GET http://localhost:8000/v1/orders/{id}/validated)
ΠΡΠΈ ΡΠΎΠ·Π΄Π°Π½ΠΈΠΈ Π·Π°ΠΊΠ°Π·Π° Π² Kafka ΡΠΎΠΏΠΈΠΊ orders.v1
ΠΏΠΎΡΡΠ»Π°Π΅ΡΡΡ ΡΠΎΠΎΡΠ²Π΅ΡΡΡΠ²ΡΡΡΠ΅Π΅ ΡΠΎΠ±ΡΡΠΈΠ΅, ΠΎΠ½ΠΎ Π²ΡΡΠΈΡΡΠ²Π°Π΅ΡΡΡ ΡΠ°Π·Π»ΠΈΡΠ½ΡΠΌΠΈ ΡΠ΅ΡΠ²ΠΈΡΠ°ΠΌΠΈ, ΠΎΡΠ²Π΅ΡΠ°ΡΡΠΈΠΌΠΈ Π·Π° Π²Π°Π»ΠΈΠ΄Π°ΡΠΈΡ Π·Π°ΠΊΠ°Π·Π°: Inventory Service, Fraud Service ΠΈ Order Details Service. ΠΡΠΈ ΡΠ΅ΡΠ²ΠΈΡΡ ΠΏΠ°ΡΠ°Π»Π»Π΅Π»ΡΠ½ΠΎ ΠΈ Π½Π΅Π·Π°Π²ΠΈΡΠΈΠΌΠΎ Π΄ΡΡΠ³ ΠΎΡ Π΄ΡΡΠ³Π° ΠΎΡΡΡΠ΅ΡΡΠ²Π»ΡΡΡ Π²Π°Π»ΠΈΠ΄Π°ΡΠΈΡ Π·Π°ΠΊΠ°Π·Π°, Π° ΡΠ΅Π·ΡΠ»ΡΡΠ°Ρ PASS ΠΈΠ»ΠΈ FAIL Π·Π°ΠΏΠΈΡΡΠ²Π°ΡΡ Π² ΡΠΎΠΏΠΈΠΊ Kafka order-validations.v1
. Validation Aggregator Service Π²ΡΡΠΈΡΡΠ²Π°Π΅Ρ ΡΡΠΎΡ ΡΠΎΠΏΠΈΠΊ, Π°Π³Π³ΡΠ΅Π³ΠΈΡΡΠ΅Ρ ΡΠ΅Π·ΡΠ»ΡΡΠ°ΡΡ ΠΏΠΎ ΠΊΠ°ΠΆΠ΄ΠΎΠΌΡ Π·Π°ΠΊΠ°Π·Ρ ΠΈ ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅Ρ ΠΊΠΎΠ½Π΅ΡΠ½ΡΠΉ ΡΠ΅Π·ΡΠ»ΡΡΠ°Ρ Π² ΡΠΎΠΏΠΈΠΊ orders.v1
.
ΠΠ»Ρ ΡΠ΅Π°Π»ΠΈΠ·Π°ΡΠΈΠΈ GET Π·Π°ΠΏΡΠΎΡΠΎΠ² Orders Service ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅Ρ ΠΌΠ°ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Π½Π½ΠΎΠ΅ ΠΏΡΠ΅Π΄ΡΡΠ°Π²Π»Π΅Π½ΠΈΠ΅ (materialized view), ΠΊΠΎΡΠΎΡΠΎΠ΅ Π²ΡΡΡΠΎΠ΅Π½ΠΎ (embedded) Π² ΠΊΠ°ΠΆΠ΄ΡΠΉ ΠΈΠ½ΡΡΠ°Π½Ρ Orders Service. ΠΡΠΎ ΠΌΠ°ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Π½Π½ΠΎΠ΅ ΠΏΡΠ΅Π΄ΡΡΠ°Π²Π»Π΅Π½ΠΈΠ΅ ΡΠ΅Π°Π»ΠΈΠ·ΠΎΠ²Π°Π½ΠΎ Π½Π° ΠΎΡΠ½ΠΎΠ²Π΅ Kafka Streams queryable state store. Π’Π°ΠΊ ΠΊΠ°ΠΊ state store ΠΎΡΠ½ΠΎΠ²Π°Π½ Π½Π° ΡΠΎΠΏΠΈΠΊΠ°Ρ Kafka, ΠΈ Π²ΡΡΡΠΎΠ΅Π½ Π² ΠΊΠ°ΠΆΠ΄ΡΠΉ ΠΈΠ½ΡΡΠ°Π½Ρ, ΠΎΠ½ Ρ ΡΠ°Π½ΠΈΡ Π΄Π°Π½Π½ΡΠ΅ ΡΠΎΠ»ΡΠΊΠΎ ΡΠ΅Ρ ΠΏΠ°ΡΡΠΈΡΠΈΠΉ ΡΠΎΠΏΠΈΠΊΠ°, ΠΊΠΎΡΠΎΡΡΠ΅ Π²ΡΡΠΈΡΡΠ²Π°ΡΡΡΡ Π΄Π°Π½Π½ΡΠΌ ΠΈΠ½ΡΡΠ°Π½ΡΠΎΠΌ. ΠΠ΄Π½Π°ΠΊΠΎ Kafka Streams ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»ΡΠ΅Ρ API (Interactive Queries) Π΄Π»Ρ ΡΠΎΠ³ΠΎ, ΡΡΠΎΠ±Ρ "Π½Π°ΠΉΡΠΈ" ΠΈΠ½ΡΡΠ°Π½Ρ, Π½Π° ΠΊΠΎΡΠΎΡΠΎΠΌ Ρ ΡΠ°Π½ΡΡΡΡ Π΄Π°Π½Π½ΡΠ΅, Π΅ΡΠ»ΠΈ ΠΈΡ Π½Π΅Ρ Π»ΠΎΠΊΠ°Π»ΡΠ½ΠΎ. ΠΡΠΎ ΠΏΠΎΠ·Π²ΠΎΠ»ΡΠ΅Ρ ΡΠ΅Π°Π»ΠΈΠ·ΠΎΠ²Π°ΡΡ Π³Π°ΡΠ°Π½ΡΠΈΡ Π΄Π»Ρ ΠΊΠ»ΠΈΠ΅Π½ΡΠΎΠ² read-your-own-writes (ΡΡΠ΅Π½ΠΈΠ΅ ΡΠ²ΠΎΠΈΡ Π·Π°ΠΏΠΈΡΠ΅ΠΉ).
Π’Π°ΠΊΠΆΠ΅ Π² ΡΠΈΡΡΠ΅ΠΌΠ΅ Π΅ΡΡΡ ΠΏΡΠΎΡΡΠΎΠΉ ΡΠ΅ΡΠ²ΠΈΡ Π΄Π»Ρ ΠΎΡΠΏΡΠ°Π²ΠΊΠΈ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ Π½Π° ΠΏΠΎΡΡΡ Email Service (Π² ΡΠ΅ΠΊΡΡΠ΅ΠΉ ΡΠ΅Π°Π»ΠΈΠ·Π°ΡΠΈΠΈ ΠΏΡΠΎΡΡΠΎ ΠΎΡΡΡΠ΅ΡΡΠ²Π»ΡΠ΅ΡΡΡ Π»ΠΎΠ³ΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌΡΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ).
Π‘Π΅ΡΠ²ΠΈΡ Order Enrichment ΠΎΡΠ²Π΅ΡΠ°Π΅Ρ Π·Π° ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ orders_enriched_stream
Π² ksqlDB
, ΠΈΠ· ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡΡΠΈΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΎ ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»Π΅ ΠΈ Π΅Π³ΠΎ Π·Π°ΠΊΠ°Π·Π΅. Π’Π°ΠΊΠΆΠ΅ ΡΡΠΎΡ ΡΠ΅ΡΠ²ΠΈΡ ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»ΡΠ΅Ρ REST ΡΡΡΠΊΡ Π΄Π»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΠΈ ΠΎ ΡΠΎΠ·ΠΌΠΎΠΆΠ½ΡΡ
ΠΌΠΎΡΠ΅Π½Π½ΠΈΡΠ΅ΡΠΊΠΈΡ
Π΄Π΅ΠΉΡΡΠ²ΠΈΡΡ
(Π΅ΡΠ»ΠΈ ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»Ρ ΠΎΡΡΡΠ΅ΡΡΠ²ΠΈΠ» Π±ΠΎΠ»Π΅Π΅ 2 Π·Π°ΠΊΠ°Π·ΠΎΠ² Π·Π° 30 ΡΠ΅ΠΊΡΠ½Π΄):
ΠΠ°Π½Π½ΡΠ΅ ΠΎ ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»ΡΡ
Π²ΡΡΠΈΡΡΠ²Π°ΡΡΡΡ ΠΈΠ· ΠΠ SQLite Ρ ΠΏΠΎΠΌΠΎΡΡΡ Kafka Connect io.confluent.connect.jdbc.JdbcSourceConnector
Π² ΡΠΎΠΏΠΈΠΊ Kafka customers. ΠΠΎΠ½ΡΠΈΠ³ΡΡΠ°ΡΠΈΡ ΠΊΠΎΠ½Π½Π΅ΠΊΡΠΎΡΠ°: connector_jdbc_customers_template.config
ΠΠ°Π½Π½ΡΠ΅ ΠΎ Π·Π°ΠΊΠ°Π·Π°Ρ
Π·Π°ΠΏΠΈΡΡΠ²Π°ΡΡΡΡ ΠΈΠ· ΡΠΎΠΏΠΈΠΊΠ° Kafka orders.v1
Π² ΠΈΠ½Π΄Π΅ΠΊΡ orders.v1
ElasticSearch Ρ ΠΏΠΎΠΌΠΎΡΡΡ Kafka Connect io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
. ΠΠΎΠ½ΡΠΈΠ³ΡΡΠ°ΡΠΈΡ ΠΊΠΎΠ½Π½Π΅ΠΊΡΠΎΡΠ°: connector_elasticsearch_template.config
ΠΡΠ΅ ΠΌΠΈΠΊΡΠΎΡΠ΅ΡΠ²ΠΈΡΡ Π½Π°ΠΏΠΈΡΠ°Π½Ρ Π½Π° Kotlin Ρ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΠ΅ΠΌ Kafka Streams ΠΈ Spring for Apache Kafka.
ΠΠ°Π½Π½ΡΠΉ ΠΌΠΎΠ΄ΡΠ»Ρ ΡΠΎΠ±ΡΠ°Π½ ΠΊΠ°ΠΊ Π±ΠΈΠ±Π»ΠΈΠΎΡΠ΅ΠΊΠ°, ΠΊΠΎΡΠΎΡΡΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡΡ ΠΌΠΈΠΊΡΠΎΡΠ΅ΡΠ²ΠΈΡΡ. Π‘ΠΎΠ΄Π΅ΡΠΆΠΈΡ ΡΠ³Π΅Π½Π΅ΡΠΈΡΠΎΠ²Π°Π½Π½ΡΠ΅ AVRO ΠΊΠ»Π°ΡΡΡ Π° ΡΠ°ΠΊΠΆΠ΅ ΡΡΠΈΠ»Π»ΠΈΡΠ½ΡΠ΅ ΠΊΠ»Π°ΡΡΡ, ΠΎΠ±Π»Π΅Π³ΡΠ°ΡΡΠΈΠ΅ ΡΠ°Π±ΠΎΡΡ Ρ ΡΠΎΠΏΠΈΠΊΠ°ΠΌΠΈ, ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΎΡΠ°ΠΌΠΈ ΠΈ Π΄Π΅ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΎΡΠ°ΠΌΠΈ.
ΠΠ΅Π½Π΅ΡΠ°ΡΠΈΡ ΡΡΡΠ½ΠΎΡΡΠ΅ΠΉ, Ρ ΠΊΠΎΡΠΎΡΡΠΌΠΈ ΡΠ°Π±ΠΎΡΠ°Π΅Ρ Kafka ΠΎΡΡΡΠ΅ΡΡΠ²Π»ΡΠ΅ΡΡΡ Π½Π° ΠΎΡΠ½ΠΎΠ²Π΅ ΡΡ
Π΅ΠΌ Avro Ρ ΠΏΠΎΠΌΠΎΡΡΡ Gradle plugin id 'com.github.davidmc24.gradle.plugin.avro' version "1.9.1"
, Π½Π° ΡΡΠ°ΠΏΠ΅ ΡΠ±ΠΎΡΠΊΠΈ Π±ΠΈΠ±Π»ΠΈΠΎΡΠ΅ΠΊΠΈ common
. ΠΠΈΠΆΠ΅ ΠΏΡΠΈΠ²Π΅Π΄Π΅Π½ΠΎ ΠΎΠΏΠΈΡΠ°Π½ΠΈΠ΅ ΡΡΡΠ½ΠΎΡΡΠ΅ΠΉ Π±Π΅Π· Π΄ΠΎΠΏΠΎΠ»Π½ΠΈΡΠ΅Π»ΡΠ½ΠΎΠ³ΠΎ ΡΠ³Π΅Π½Π΅ΡΠΈΡΠΎΠ²Π°Π½Π½ΠΎΠ³ΠΎ ΠΊΠΎΠ΄Π°.
class Customer( val id: Long, val firstName: String, val lastName: String, val email: String, val address: String, val level: String = "bronze" // possible values: platinum, gold, silver, bronze ) enum class OrderState { CREATED, VALIDATED, FAILED, SHIPPED } enum class Product { JUMPERS, UNDERPANTS, STOCKINGS } class Order( val id: String, val customerId: Long, val state: OrderState, val product: Product, val quantity: Int, val price: Double ) class OrderValue( val order: Order, val value: Double ) class OrderEnriched( val id: String, val customerId: Long, val customerLevel: String ) enum class OrderValidationType { INVENTORY_CHECK, FRAUD_CHECK, ORDER_DETAILS_CHECK } enum class OrderValidationResult { PASS, FAIL, ERROR } class OrderValidation( val orderId: String, val checkType: OrderValidationType, val validationResult: OrderValidationResult ) class Payment( val id: String, val orderId: String, val ccy: String, val amount: Double )
Π Π΅Π³ΠΈΡΡΡΠ°ΡΠΈΡ ΡΡ
Π΅ΠΌ Avro Π² Confluent Schema Registry ΠΎΡΡΡΠ΅ΡΡΠ²Π»ΡΠ΅ΡΡΡ Ρ ΠΏΠΎΠΌΠΎΡΡΡ Gradle Plugin: id 'com.github.imflog.kafka-schema-registry-gradle-plugin' version "1.12.0"
ΠΏΠΎΡΠ»Π΅ ΡΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΏΠΎΠ΄Π½ΡΡ docker ΠΊΠΎΠ½ΡΠ΅ΠΉΠ½Π΅Ρ schema-registry. Π§ΡΠΎΠ±Ρ Π·Π°ΡΠ΅Π³ΠΈΡΡΡΠΈΡΠΎΠ²Π°ΡΡ ΡΡ
Π΅ΠΌΡ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ Π½Π°ΡΡΡΠΎΠΈΡΡ ΠΏΠ»Π°Π³ΠΈΠ½. ΠΠΈΠ½ΠΈΠΌΠ°Π»ΡΠ½Π°Ρ Π½Π°ΡΡΡΠΎΠΉΠΊΠ°:
schemaRegistry { url = 'http://localhost:8081' quiet = true register { subject('customers-value', 'common/src/main/avro/customer.avsc', 'AVRO') subject('orders.v1-value', 'common/src/main/avro/order.avsc', 'AVRO') subject('orders-enriched.v1-value', 'common/src/main/avro/orderenriched.avsc', 'AVRO') subject('order-validations.v1-value', 'common/src/main/avro/ordervalidation.avsc', 'AVRO') subject('payments.v1-value', 'common/src/main/avro/payment.avsc', 'AVRO') } }
ΠΡΠΈ ΡΠ΅Π³ΠΈΡΡΡΠ°ΡΠΈΠΈ ΡΠΊΠ°Π·ΡΠ²Π°ΡΡΡΡ:
- ΠΠ°Π·Π²Π°Π½ΠΈΠ΅ ΡΡ Π΅ΠΌΡ. ΠΠ΄Π½ΠΈΠΌ ΠΈΠ· ΠΏΠ°ΡΡΠ΅ΡΠ½ΠΎΠ² Π½Π°ΠΌΠ΅Π½ΠΎΠ²Π°Π½ΠΈΡ ΡΠ²Π»ΡΠ΅ΡΡΡ TopicName-value / TopicName-key
- ΠΡΡΡ Π΄ΠΎ ΡΠ°ΠΉΠ»Π° ΡΠΎ ΡΡ Π΅ΠΌΠΎΠΉ
- Π€ΠΎΡΠΌΠ°Ρ ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΠΈ, Π² Π½Π°ΡΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ - AVRO.
ΠΠ°Π»Π΅Π΅ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎ Π·Π°ΠΏΡΡΡΠΈΡΡ ΡΠ°ΡΠΊΡ Gradle ΠΈΠ· ΠΊΠΎΡΠ½Π΅Π²ΠΎΠΉ Π΄ΠΈΡΠ΅ΠΊΡΠΎΡΠΈΠΈ:
./gradlew registerSchemaTask
ΠΠ»Ρ ΡΠ°Π±ΠΎΡΡ Ρ Π·Π°ΠΏΠΈΡΡΠΌΠΈ Π² ΡΠΎΠΏΠΈΠΊΠ°Ρ
ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ Kafka Streams Π΄ΠΎΠ»ΠΆΠ½ΠΎ Π·Π½Π°ΡΡ ΠΎ ΡΠΎΡΠΌΠ°ΡΠ΅ ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΠΈ Π΄Π°Π½Π½ΡΡ
. ΠΠ»Ρ ΡΡΠΎΠ³ΠΎ Π½Π°ΡΡΡΠ°ΠΈΠ²Π°ΡΡΡΡ ΡΠΏΠ΅ΡΠΈΠ°Π»ΡΠ½ΡΠ΅ org.apache.kafka.common.serialization.Serde
ΠΊΠ»Π°ΡΡΡ, ΠΊΠΎΡΠΎΡΡΠ΅ Π·Π½Π°ΡΡ ΠΊΠ°ΠΊ ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°ΡΡ/Π΄Π΅ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°ΡΡ ΠΊΠ»ΡΡ/Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅. ΠΠ»Ρ ΡΠ΄ΠΎΠ±ΡΡΠ²Π° Π²ΡΠ΅ Π½Π°ΡΡΡΠΎΠΉΠΊΠΈ ΠΏΠΎΠΌΠ΅ΡΠ΅Π½Ρ Π² ΠΎΡΠ΄Π΅Π»ΡΠ½ΡΠΉ ΠΊΠ»Π°ΡΡ Schemas. ΠΠ»Ρ ΡΠ³Π΅Π½Π΅ΡΠΈΡΠΎΠ²Π°Π½Π½ΡΡ
AVRO ΠΊΠ»Π°ΡΡΠΎΠ² ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
, ΠΏΡΠΈ Π½Π°ΡΡΡΠΎΠΉΠΊΠ΅ ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ ΡΠΊΠ°Π·ΡΠ²Π°Π΅ΡΡΡ Π°Π΄ΡΠ΅Ρ Schema Registry.
fun configureSerdes() { val config = hashMapOf<String, Any>( AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicsProps.schemaRegistryUrl ) for (topic in ALL.values) { configureSerde(topic.keySerde, config, true) configureSerde(topic.valueSerde, config, false) } configureSerde(ORDER_VALUE_SERDE, config, false) } fun configureSerde(serde: Serde<*>, config: Map<String, Any>, isKey: Boolean) { if (serde is SpecificAvroSerde) { serde.configure(config, isKey) } }
ΠΠ°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ:
dependencies { implementation project(':common') implementation 'org.apache.kafka:kafka-streams' implementation 'org.jetbrains.kotlin:kotlin-reflect' implementation 'org.springframework.kafka:spring-kafka' implementation("io.confluent:kafka-avro-serializer:7.5.2") implementation("io.confluent:kafka-streams-avro-serde:7.5.2") implementation("org.apache.avro:avro:1.11.0") annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation 'org.springframework.boot:spring-boot-testcontainers' testImplementation 'org.testcontainers:junit-jupiter' testImplementation 'org.testcontainers:kafka' }
ΠΠ΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ ΠΎΠ±ΡΠ°ΡΠ°ΡΡ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° ΡΠΎΠΎΡΠ²Π΅ΡΡΡΠ²ΠΈΠ΅ Π²Π΅ΡΡΠΈΠΉ io.confluent:kafka-avro-serializer:7.5.2
ΠΈ io.confluent:kafka-streams-avro-serde:7.5.2
.
Π Spring Boot Π½Π΅Ρ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎΡΡΠΈ Π²ΡΡΡΠ½ΡΡ ΡΠΏΡΠ°Π²Π»ΡΡΡ ΠΆΠΈΠ·Π½Π΅Π½Π½ΡΠΌ ΡΠΈΠΊΠ»ΠΎΠΌ ΡΡΡΠ½ΠΎΡΡΠΈ KafkaStreams. ΠΠ»Ρ ΡΡΠΎΠ³ΠΎ Π΅ΡΡΡ Π°Π±ΡΡΡΠ°ΠΊΡΠΈΡ org.springframework.kafka.config.StreamsBuilderFactoryBean
, ΠΊΠΎΡΠΎΡΠ°Ρ ΡΠΎΠ·Π΄Π°Π΅Ρ ΠΈ ΡΠΏΡΠ°Π²Π»ΡΠ΅Ρ org.apache.kafka.streams.KafkaStreams
ΠΈ org.apache.kafka.streams.StreamsBuilder
. Π’Π΅ΠΌ Π½Π΅ ΠΌΠ΅Π½Π΅Π΅, Π½Π°ΠΌ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ ΡΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΠΎΠ²Π°ΡΡ Π±ΠΈΠ½ org.springframework.kafka.config.KafkaStreamsConfiguration
:
@EnableKafkaStreams @Configuration class KafkaConfig( private val kafkaProps: KafkaProps, private val serviceUtils: ServiceUtils, private val topicProps: TopicsProps ) { @Bean fun outstandingRequests(): MutableMap<String, FilteredResponse<String, Order, OrderDto>> { return ConcurrentHashMap() } @Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME]) fun kStreamsConfig(): KafkaStreamsConfiguration { val props = hashMapOf<String, Any>( StreamsConfig.APPLICATION_ID_CONFIG to kafkaProps.appId, // must be specified to enable InteractiveQueries and checking metadata of Kafka Cluster StreamsConfig.APPLICATION_SERVER_CONFIG to serviceUtils.getServerAddress(), StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProps.bootstrapServers, StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name, // instances MUST have different stateDir StreamsConfig.STATE_DIR_CONFIG to kafkaProps.stateDir, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProps.autoOffsetReset, StreamsConfig.PROCESSING_GUARANTEE_CONFIG to kafkaProps.processingGuarantee, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to kafkaProps.commitInterval, AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) to kafkaProps.sessionTimeout ) return KafkaStreamsConfiguration(props) } }
ΠΠ»Ρ ΡΠΎΠ³ΠΎ, ΡΡΠΎΠ±Ρ ΡΠ°Π±ΠΎΡΠ°Π»ΠΈ Interactive Queries (Π΅ΡΠ»ΠΈ ΠΌΡ Ρ
ΠΎΡΠΈΠΌ Π·Π°ΠΏΡΠ°ΡΠΈΠ²Π°ΡΡ Π΄Π°Π½Π½ΡΠ΅ Π½Π΅ ΡΠΎΠ»ΡΠΊΠΎ ΠΈΠ· Π»ΠΎΠΊΠ°Π»ΡΠ½ΠΎΠ³ΠΎ state store, Π½ΠΎ ΠΈ Ρ Π΄ΡΡΠ³ΠΈΡ
ΠΈΠ½ΡΡΠ°Π½ΡΠΎΠ²), Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ Π² ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠ°ΡΠΈΡΡ
ΡΠΊΠ°Π·Π°ΡΡ StreamsConfig.APPLICATION_SERVER_CONFIG
- Π°Π΄ΡΠ΅Ρ ΡΠ΅ΠΊΡΡΠ΅Π³ΠΎ ΠΈΠ½ΡΡΠ°Π½ΡΠ° Π² ΡΠΎΡΠΌΠ°ΡΠ΅ host:port
. Π’Π°ΠΊΠΆΠ΅ ΠΌΡ ΡΠΊΠ°Π·ΡΠ²Π°Π΅ΠΌ Π°Π΄ΡΠ΅Ρ Schema Registry, ΠΏΡΡΡ Π΄ΠΎ ΠΊΠ°ΡΠ°Π»ΠΎΠ³Π°, Π³Π΄Π΅ Π±ΡΠ΄Π΅Ρ Ρ
ΡΠ°Π½ΠΈΡΡΡ state store ΠΈ Ρ.Π΄. ΠΠΎ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ Kafka Streams ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅Ρ Π²ΡΡΡΠΎΠ΅Π½Π½ΠΎΠ΅ key-value Ρ
ΡΠ°Π½ΠΈΠ»ΠΈΡΠ΅ RocksDB Π² ΠΊΠ°ΡΠ΅ΡΡΠ²Π΅ state store.
ΠΡΠΈ ΡΡΠ°ΡΡΠ΅ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ Orders Service Π½Π° ΠΎΡΠ½ΠΎΠ²Π΅ ΡΠΎΠΏΠΈΠΊΠ° Kafka orders.v1
ΡΠΎΠ·Π΄Π°Π΅ΡΡΡ KTable - Π°Π±ΡΡΡΠ°ΠΊΡΠΈΡ Kafka Streams, ΠΊΠΎΡΠΎΡΠ°Ρ ΠΏΡΠ΅Π΄ΡΡΠ°Π²Π»ΡΠ΅Ρ ΠΈΠ· ΡΠ΅Π±Ρ snapshot ΡΠΎΡΡΠΎΡΠ½ΠΈΡ (ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡΠΎΠ²Π΅ΡΡΠΈ Π°Π½Π°Π»ΠΎΠ³ΠΈΡ Ρ compacted topic) - Ρ
ΡΠ°Π½ΡΡΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΠ΅ Π·Π°ΠΏΠΈΡΠΈ ΠΏΠΎ ΠΊΠ»ΡΡΡ Π² ΡΠΎΠΏΠΈΠΊΠ΅. ΠΡΠΈ ΠΏΠΎΡΡΡΠΏΠ»Π΅Π½ΠΈΠΈ Π½ΠΎΠ²ΡΡ
Π·Π°ΠΏΠΈΡΠ΅ΠΉ Π΄Π°Π½Π½ΡΠ΅ Π² KTable ΠΎΠ±Π½ΠΎΠ²Π»ΡΡΡΡΡ, ΡΠ°ΠΊΡΠΈΡΠ΅ΡΠΊΠΈ ΠΏΡΠΎΠΈΠ·Π²ΠΎΠ΄ΠΈΡΡΡ ΠΎΠΏΠ΅ΡΠ°ΡΠΈΡ UPSERT
- Π΅ΡΠ»ΠΈ ΠΊΠ»ΡΡ Π΅ΡΡΡ Π² ΡΠ°Π±Π»ΠΈΡΠ΅, Π΄Π°Π½Π½ΡΠ΅ ΠΎΠ±Π½ΠΎΠ²Π»ΡΡΡΡΡ, Π΅ΡΠ»ΠΈ Π½Π΅Ρ - ΡΠΎΠ·Π΄Π°ΡΡΡΡ, Π΅ΡΠ»ΠΈ ΠΏΡΠΈΡ
ΠΎΠ΄ΠΈΡ ΠΊΠ»ΡΡ - null
- ΡΠΎ ΡΡΠΎ ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ ΡΠ΄Π°Π»Π΅Π½ΠΈΠ΅ Π΄Π°Π½Π½ΡΡ
, Π±ΠΎΠ»Π΅Π΅ ΠΎΠ½ΠΈ Π½Π΅ Π±ΡΠ΄ΡΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡΡΡ ΠΏΡΠΈ Π²ΡΠ±ΠΎΡΠΊΠ΅ ΠΏΠΎ ΠΊΠ»ΡΡΡ ΠΈΠ»ΠΈ ΡΠΎΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠΈ ΡΠ°Π±Π»ΠΈΡΡ Ρ Π΄ΡΡΠ³ΠΈΠΌΠΈ ΡΡΡΠ½ΠΎΡΡΡΠΌΠΈ KafkaStreams (KStream, GlobalKTable). ΠΡΠΈ ΡΡΠΎΠΌ Kafka Streams Π³Π°ΡΠ°Π½ΡΠΈΡΡΠ΅Ρ, ΡΡΠΎ Π² ΡΠ»ΡΡΠ°Π΅ ΠΏΠ°Π΄Π΅Π½ΠΈΡ ΠΈΠ½ΡΡΠ°Π½ΡΠ°, Π΄Π°Π½Π½ΡΠ΅ Π½Π΅ Π±ΡΠ΄ΡΡ ΠΏΠΎΡΠ΅ΡΡΠ½Ρ, ΡΠ°ΠΊ ΠΊΠ°ΠΊ ΠΎΠ½ΠΈ ΠΏΠΎΠΌΠΈΠΌΠΎ Π»ΠΎΠΊΠ°Π»ΡΠ½ΠΎΠ³ΠΎ state store, Ρ
ΡΠ°Π½ΡΡΡΡ Π² change log ΡΠΎΠΏΠΈΠΊΠ΅ Kafka -Π΄Π»Ρ ΡΠΎΠΊΡΠ°ΡΠ΅Π½ΠΈΡ ΠΏΠΎΡΡΠ΅Π±Π»Π΅Π½ΠΈΡ ΡΠ΅ΡΡΡΡΠΎΠ² ΠΏΠΎ ΠΏΠ°ΠΌΡΡΠΈ ΡΡΠΎΡ ΡΠΎΠΏΠΈΠΊ - compacted.
ΠΠ»Ρ ΡΠΎΠ³ΠΎ, ΡΡΠΎΠ±Ρ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅ ΠΈΠΌΠ΅Π»ΠΎ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΠΏΠΎΠ»ΡΡΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· state store, Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎ Π΅Π³ΠΎ ΠΌΠ°ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°ΡΡ, ΡΠΊΠ°Π·Π°Π² Π½Π°Π·Π²Π°Π½ΠΈΠ΅, ΠΏΠΎ ΠΊΠΎΡΠΎΡΠΎΠΌΡ Π² Π΄Π°Π»ΡΠ½Π΅ΠΉΡΠ΅ΠΌ ΠΊ Π½Π΅ΠΌΡ ΠΌΠΎΠΆΠ½ΠΎ ΠΎΠ±ΡΠ°ΡΠ°ΡΡΡΡ.
@Component class OrdersServiceTopology( private val orderProps: OrdersProps, private val mapper: OrderMapper, private val schemas: Schemas, private val outstandingRequests: MutableMap<String, FilteredResponse<String, Order, OrderDto>> ) { @Autowired fun ordersTableTopology(builder: StreamsBuilder) { log.info("Calling OrdersServiceTopology.ordersTableTopology()") builder.table( orderProps.topic, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde), Materialized.`as`(orderProps.storeName) ).toStream().foreach(::maybeCompleteLongPollGet) } private fun maybeCompleteLongPollGet(id: String, order: Order) { val callback = outstandingRequests[id] if (callback?.asyncResponse?.isSetOrExpired == true) { outstandingRequests.remove(id) } else if (callback != null && callback.predicate(id, order)) { callback.asyncResponse.setResult(mapper.toDto(order)) outstandingRequests.remove(id) } } }
ΠΠ΅ΡΠΎΠ΄ OrdersServiceTopology.maybeCompleteLongPollGet(String, Order)
ΠΏΠΎΠ»ΡΡΠ°Π΅Ρ ΠΏΡΠΈΡ
ΠΎΠ΄ΡΡΠΈΠ΅ Π² KTable Π΄Π°Π½Π½ΡΠ΅ ΠΎ Π·Π°ΠΊΠ°Π·Π°Ρ
, ΠΈ Π΅ΡΠ»ΠΈ Π½ΠΎΠ²ΡΠΉ Π·Π°ΠΊΠ°Π· Ρ
ΡΠ°Π½ΠΈΡΡΡ Π² ConcurrentHashMap, ΠΈ ΠΎΠ½ ΡΠ΄ΠΎΠ²Π»Π΅ΡΠ²ΠΎΡΡΠ΅Ρ ΡΡΠ»ΠΎΠ²ΠΈΡ FilteredResponse.predicate, ΡΠΎ ΡΠΎΠ³Π΄Π° ΠΌΡ ΠΊΠΎΠΌΠΏΠ»ΠΈΡΠΈΠΌ DeferredResult ΠΈ ΡΠ΄Π°Π»ΡΠ΅ΠΌ Π΅Π³ΠΎ ΠΈΠ· ConcurrentHashMap.
ΠΠ»Π°ΡΡ org.springframework.web.context.request.async.DeferredResult
ΠΏΠΎΠ·Π²ΠΎΠ»ΡΠ΅Ρ Π°ΡΠΈΠ½Ρ
ΡΠΎΠ½Π½ΠΎ ΠΎΠ±ΡΠ°Π±ΠΎΡΠ°ΡΡ REST Π·Π°ΠΏΡΠΎΡ. Π Orders Service ΠΎΠ½ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ ΡΠ»Π΅Π΄ΡΡΡΠΈΠΌ ΠΎΠ±ΡΠ°Π·ΠΎΠΌ: Π² ΠΊΠ°ΡΠ΅ΡΡΠ²Π΅ Π²ΠΎΠ·Π²ΡΠ°ΡΠ°Π΅ΠΌΠΎΠ³ΠΎ Π·Π½Π°ΡΠ΅Π½ΠΈΡ Π² ΠΊΠΎΠ½ΡΡΠΎΠ»Π»Π΅ΡΠ΅ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ DeferredResult. ΠΠΎΡΠΎΠΊ, ΠΏΡΠΈΠ½ΡΠ²ΡΠΈΠΉ Π·Π°ΠΏΡΠΎΡ ΠΌΠΎΠΆΠ½ΠΎ ΠΎΡΠ²ΠΎΠ±ΠΎΠ΄ΠΈΡΡ, Π° ΠΏΡΠΎΡΡΠ°Π²ΠΈΡΡ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ Π² DeferredResult ΠΌΠΎΠΆΠ½ΠΎ ΠΈΠ· Π΄ΡΡΠ³ΠΎΠ³ΠΎ ΠΏΠΎΡΠΎΠΊΠ°. ΠΡΠΈ ΡΡΠΎΠΌ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΊΠ°Π·Π°ΡΡ ΡΠ°ΠΉΠΌΠ°ΡΡ, ΠΏΠΎΡΠ»Π΅ ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ DeferredResult Π²Π΅ΡΠ½Π΅Ρ ΠΎΡΠΈΠ±ΠΊΡ. ΠΡΠΈ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΠΈ Π·Π°ΠΊΠ°Π·Π° - getOrder(id)
- Π΅ΡΠ»ΠΈ Π½ΠΈ Π² Π»ΠΎΠΊΠ°Π»ΡΠ½ΠΎΠΌ state store, Π½ΠΈ Π² state store Π½Π° Π΄ΡΡΠ³ΠΈΡ
ΠΈΠ½ΡΡΠ°Π½ΡΠ°Ρ
Π½Π΅Ρ Π·Π°ΠΊΠ°Π·Π° Ρ Π½ΡΠΆΠ½ΡΠΌ ID, ΠΌΡ ΠΏΠΎΠΌΠ΅ΡΠ°Π΅ΠΌ DeferredResult Π² ConcurrentHashMap.
ΠΡΠΈΠΌΠ΅Ρ ΠΌΠ΅ΡΠΎΠ΄Π° Π² ΠΊΠΎΠ½ΡΡΠΎΠ»Π»Π΅ΡΠ΅:
@GetMapping("/{id}") fun getOrder( @PathVariable("id") id: String, @RequestParam("timeout", defaultValue = "2000") timeout: Long ): DeferredResult<OrderDto> { log.info("Received request to GET order with id = {}", id) val deferredResult = DeferredResult<OrderDto>(timeout) ordersService.getOrderDto(id, deferredResult, timeout) return deferredResult }
ΠΡΠΈΠΌΠ΅Ρ ΡΠ°Π±ΠΎΡΡ Ρ DeferredResult:
fun getOrderDto(id: String, asyncResponse: DeferredResult<OrderDto>, timeout: Long) { initOrderStore() CompletableFuture.runAsync { val hostForKey = getKeyLocationOrBlock(id, asyncResponse) ?: return@runAsync //request timed out so return if (thisHost(hostForKey)) { fetchLocal(id, asyncResponse) { _, _ -> true } } else { val path = Paths(hostForKey.host, hostForKey.port).urlGet(id) fetchFromOtherHost(path, asyncResponse, timeout) } } } private fun fetchLocal( id: String, asyncResponse: DeferredResult<OrderDto>, predicate: (String, Order) -> Boolean ) { log.info("running GET on this node") try { val order = orderStore.get(id) if (order == null || !predicate(id, order)) { log.info("Delaying get as order not present for id $id") outstandingRequests[id] = FilteredResponse( asyncResponse = asyncResponse, predicate = predicate ) } else { asyncResponse.setResult(mapper.toDto(order)) } } catch (e: InvalidStateStoreException) { log.error("Exception while querying local state store. {}", e.message) outstandingRequests[id] = FilteredResponse(asyncResponse, predicate) } } private fun fetchFromOtherHost( path: String, asyncResponse: DeferredResult<OrderDto>, timeout: Long ) { log.info("Chaining GET to a different instance: {}", path) try { val order = webClientBuilder.build() .get() .uri("$path?timeout=$timeout") .retrieve() .bodyToMono<OrderDto>() .block() ?: throw NotFoundException("FetchFromOtherHost returned null for request: $path") asyncResponse.setResult(order) } catch (swallowed: Exception) { log.warn("FetchFromOtherHost failed.", swallowed) } }
ΠΠ»Ρ ΡΠΎΠ³ΠΎ, ΡΡΠΎΠ±Ρ ΠΎΠ±ΡΠ°ΡΠΈΡΡΡΡ ΠΊ state store, Π½Π°ΠΌ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎ Π·Π°ΠΏΡΠΎΡΠΈΡΡ Π΅Π³ΠΎ ΠΈΠ· KafkaStreams. Π’ΡΡ ΡΡΠΎΠΈΡ ΠΎΠ±ΡΠ°ΡΠΈΡΡ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° ΡΠΎ, ΡΡΠΎ Π½Π°ΠΌ ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»ΡΠ΅ΡΡΡ ΡΠΎΠ»ΡΠΊΠΎ read-only Ρ ΡΠ°Π½ΠΈΠ»ΠΈΡΠ΅.
private lateinit var orderStore: ReadOnlyKeyValueStore<String, Order> fun initOrderStore() { if (!::orderStore.isInitialized) { val streams = factoryBean.kafkaStreams ?: throw RuntimeException("Cannot obtain KafkaStreams instance.") orderStore = streams.store( StoreQueryParameters.fromNameAndType( ordersProps.storeName, QueryableStoreTypes.keyValueStore() ) ) } }
ΠΠ»Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΠΈ ΠΎ ΡΠΎΠΌ, Π½Π° ΠΊΠ°ΠΊΠΎΠΌ ΠΈΠ½ΡΡΠ°Π½ΡΠ΅ Ρ ΡΠ°Π½ΡΡΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΎ Π·Π°ΠΊΠ°Π·Π΅ Ρ ΠΊΠΎΠ½ΠΊΡΠ΅ΡΠ½ΡΠΌ ID, Kafka Streams ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»ΡΠ΅Ρ API:
/** * Find the metadata for the instance of this Kafka Streams Application that has the given * store and would have the given key if it exists. * @param store Store to find * @param key The key to find * @return {@link HostStoreInfo} */ fun <K> streamsMetadataForStoreAndKey( store: String, key: K, serializer: Serializer<K> ): HostStoreInfo { // Get metadata for the instances of this Kafka Streams application hosting the store and // potentially the value for key val metadata = factoryBean.kafkaStreams?.queryMetadataForKey(store, key, serializer) ?: throw NotFoundException("Metadata for store $store not found!") return HostStoreInfo( host = metadata.activeHost().host(), port = metadata.activeHost().port(), storeNames = hashSetOf(store) ) }
ΠΡΠΈ ΡΠΎΠ·Π΄Π°Π½ΠΈΠΈ Π·Π°ΠΊΠ°Π·Π°, Orders Service ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅Ρ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ Π² ΡΠΎΠΏΠΈΠΊ Kafka orders.v1
Ρ ΠΏΠΎΠΌΠΎΡΡΡ KafkaTemplate<String, Order>
. ΠΠ°ΡΡΡΠΎΠΉΠΊΠΈ KafkaProducer Π² Π΄Π°Π½Π½ΠΎΠΌ ΡΠ»ΡΡΠ°Π΅ ΡΠΊΠ°Π·ΡΠ²Π°ΡΡΡΡ Π² application.yml ΡΠ°ΠΉΠ»Π΅:
spring: application: name: order-service kafka: bootstrap-servers: ${kafkaprops.bootstrapServers} producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer bootstrap-servers: ${kafkaprops.bootstrapServers} acks: ${kafkaprops.acks} client-id: ${kafkaprops.clientId} properties: enable.idempotence: ${kafkaprops.enableIdempotence} schema.registry.url: ${kafkaprops.schemaRegistryUrl} auto.register.schemas: false use.latest.version: true
ΠΡΠΎΠ±ΠΎΠ΅ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ ΡΡΠΎΠΈΡ ΠΎΠ±ΡΠ°ΡΠΈΡΡ Π½Π° Π½Π°ΡΡΡΠΎΠΉΠΊΠΈ auto.register.schema: false
ΠΈ use.latest.version: true
. ΠΠΎ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ ΠΎΠ½ΠΈ Π²ΡΡΡΠ°Π²Π»Π΅Π½Ρ Π½Π°ΠΎΠ±ΠΎΡΠΎΡ. Π Π΅ΡΠ»ΠΈ ΠΎΡΡΠ°Π²ΠΈΡΡ ΠΈΡ
Π±Π΅Π· ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΡ, ΡΠΎ ΠΌΠΎΠ³ΡΡ Π²ΠΎΠ·Π½ΠΈΠΊΠ½ΡΡΡ ΠΎΡΠΈΠ±ΠΊΠΈ ΠΏΡΠΈ ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΠΈ / Π΄Π΅ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΠΈ Π΄Π°Π½Π½ΡΡ
ΠΈΠ·-Π·Π° Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΠΉ Π½Π΅Π²ΠΎΡΠΌΠ΅ΡΡΠΈΠΌΠΎΡΡΠΈ ΡΡ
Π΅ΠΌ AVRO. Confluent ΡΠ΅ΠΊΠΎΠΌΠ΅Π½Π΄ΡΠ΅Ρ Π² ΠΏΡΠΎΠ΄Π΅ Π½Π΅ ΠΏΡΠ΅Π΄ΠΎΡΡΠ°Π²Π»ΡΡΡ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΠΏΡΠΎΠ΄ΡΡΡΠ΅ΡΠ°ΠΌ Π°Π²ΡΠΎΠΌΠ°ΡΠΈΡΡΠΊΠΈ ΡΠ΅Π³ΠΈΡΡΡΠΈΡΠΎΠ²Π°ΡΡ ΡΡ
Π΅ΠΌΡ.
Π‘Π΅ΡΠ²ΠΈΡ ΠΎΡΠ²Π΅ΡΠ°Π΅Ρ Π·Π° ΠΏΡΠΎΠ²Π΅ΡΠΊΡ Π΄Π΅ΡΠ°Π»Π΅ΠΉ Π·Π°ΠΊΠ°Π·Π°:
- ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ ΡΠΎΠ²Π°ΡΠΎΠ² Π½Π΅ ΠΌΠ΅Π½ΡΡΠ΅ 0
- ΡΡΠΎΠΈΠΌΠΎΡΡΡ Π·Π°ΠΊΠ°Π·Π° Π½Π΅ ΠΌΠ΅Π½ΡΡΠ΅ 0
- ΡΠΎΠ²Π°Ρ ΠΏΡΠΈΡΡΡΡΡΠ²ΡΠ΅Ρ Π² Π·Π°ΠΊΠ°Π·Π΅
Π‘ΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ Π²ΡΡΠΈΡΡΠ²Π°ΡΡΡΡ ΠΈΠ· ΡΠΎΠΏΠΈΠΊΠ° Kafka orders.v1
, Π·Π°ΠΊΠ°Π· ΠΏΡΠΎΠ²Π΅ΡΡΠ΅ΡΡΡ, Π° ΡΠ΅Π·ΡΠ»ΡΡΠ°Ρ ΠΏΡΠΎΠ²Π΅ΡΠΊΠΈ ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅ΡΡΡ Π² ΡΠΎΠΏΠΈΠΊ Kafka order-validations.v1
. Π ΡΡΠΎΠΌ ΡΠ΅ΡΠ²ΠΈΡΠ΅ Π½Π΅ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ Kafka Streams, Π° ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡΡΡΡ ΡΡΠ°Π½Π΄Π°ΡΡΠ½ΡΠ΅ Kafka Producer / Consumer.
ΠΠΎΠ½ΡΠΈΠ³ΡΡΠ°ΡΠΈΡ Kafka Consumer:
@Bean fun kafkaListenerContainerFactory( consumerFactory: ConsumerFactory<String, Order> ): ConcurrentKafkaListenerContainerFactory<String, Order> { val factory = ConcurrentKafkaListenerContainerFactory<String, Order>() factory.consumerFactory = consumerFactory return factory } @Bean fun consumerFactory(): ConsumerFactory<String, Order> { return DefaultKafkaConsumerFactory(consumerProps()) } private fun consumerProps(): Map<String, Any> { val props = hashMapOf<String, Any>( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProps.bootstrapServers, ConsumerConfig.GROUP_ID_CONFIG to kafkaProps.appId, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProps.autoOffsetReset, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to !kafkaProps.enableExactlyOnce, ConsumerConfig.CLIENT_ID_CONFIG to kafkaProps.appId, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to schemas.ORDERS.keySerde.deserializer()::class.java, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to schemas.ORDERS.valueSerde.deserializer()::class.java, AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to kafkaProps.schemaRegistryUrl, "specific.avro.reader" to true ) return props }
ΠΠ΄Π΅ΡΡ Π²Π°ΠΆΠ½ΠΎ ΠΎΠ±ΡΠ°ΡΠΈΡΡ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° ΠΊΠΎΠ½ΡΠΈΠ³ΡΡΠ°ΡΠΈΡ specific.avro.reader: true
. ΠΠ½Π° ΡΠΎΠΎΠ±ΡΠ°Π΅Ρ ΠΊΠΎΠ½ΡΡΡΠΌΠ΅ΡΡ, ΡΡΠΎ ΠΎΠ½ Π²ΡΡΠΈΡΡΠ²Π°Π΅Ρ Π½Π΅ ΠΎΠ±ΠΎΠ±ΡΠ΅Π½Π½ΡΠΉ AVRO ΡΠΈΠΏ Π·Π°ΠΏΠΈΡΠΈ (GenericAvroRecord), Π° ΠΊΠΎΠ½ΠΊΡΠ΅ΡΠ½ΡΠΉ ΡΠΈΠΏ SpecificAvroRecord, ΠΈ ΠΈΠΌΠ΅Π΅Ρ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΠΏΠΎΠ»ΡΡΠ°ΡΡ Π΄ΠΎΡΡΡΠΏ ΠΊ ΠΏΠΎΠ»ΡΠΌ ΠΈ ΠΌΠ΅ΡΠΎΠ΄Π°ΠΌ Π±Π΅Π· Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎΡΡΠΈ ΠΊΠ°ΡΡΠΈΡΡ ΠΊ ΠΊΠΎΠ½ΠΊΡΠ΅ΡΠ½ΠΎΠΌΡ ΡΠΈΠΏΡ.
ΠΠΎΠ½ΡΠΈΠ³ΡΡΠ°ΡΠΈΡ Kafka Producer:
@Bean fun producerFactory(): ProducerFactory<String, OrderValidation> { return DefaultKafkaProducerFactory(senderProps()) } private fun senderProps(): Map<String, Any> { val props = hashMapOf<String, Any>( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProps.bootstrapServers, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to kafkaProps.enableIdempotence, ProducerConfig.RETRIES_CONFIG to Int.MAX_VALUE.toString(), ProducerConfig.ACKS_CONFIG to kafkaProps.acks, ProducerConfig.CLIENT_ID_CONFIG to kafkaProps.appId, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to schemas.ORDER_VALIDATIONS.valueSerde.serializer().javaClass.name, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to schemas.ORDER_VALIDATIONS.keySerde.serializer().javaClass.name, AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to kafkaProps.schemaRegistryUrl, AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS to false, AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to true, ) if (kafkaProps.enableExactlyOnce) { // attempt to provide a unique transactional ID which is a recommended practice props[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = "${kafkaProps.appId}-${applicationProps.port}" } return props } @Bean fun kafkaTemplate( producerFactory: ProducerFactory<String, OrderValidation> ): KafkaTemplate<String, OrderValidation> { return KafkaTemplate(producerFactory) }
Π’ΡΡ ΡΡΠΎΠΈΡ ΠΎΠ±ΡΠ°ΡΠΈΡΡ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° Π΄Π²Π΅ Π²Π΅ΡΠΈ. ΠΠ΅ΡΠ²Π°Ρ, ΠΊΠ°ΠΊ ΡΠΆΠ΅ Π±ΡΠ»ΠΎ ΡΠΊΠ°Π·Π°Π½ΠΎ Π²ΡΡΠ΅ - ΠΌΡ Π·Π°ΠΏΡΠ΅ΡΠ°Π΅ΠΌ ΠΏΡΠΎΠ΄ΡΡΡΠ΅ΡΡ Π°Π²ΡΠΎΠΌΠ°ΡΠΈΡΠ΅ΡΠΊΠΈ ΡΠ΅Π³ΠΈΡΡΡΠΈΡΠΎΠ²Π°ΡΡ ΡΡ
Π΅ΠΌΡ Π² Schema Registry, Π° ΡΠ°ΠΊΠΆΠ΅ ΡΠΊΠ°Π·ΡΠ²Π°Π΅ΠΌ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡ ΠΏΠΎΡΠ»Π΅Π΄Π½ΡΡ Π²Π΅ΡΡΠΈΡ ΡΡ
Π΅ΠΌΡ ΠΏΡΠΈ ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΠΈ Π΄Π°Π½Π½ΡΡ
. ΠΡΠΎΡΠΎΠΉ ΠΌΠΎΠΌΠ΅Π½Ρ ΠΊΠ°ΡΠ°Π΅ΡΡΡ Π½Π°ΡΡΡΠΎΠΉΠΊΠΈ ProducerConfig.TRANSACTIONAL_ID_CONFIG
, ΡΡΠ° Π½Π°ΡΡΡΠΎΠΉΠΊΠ° Π΄ΠΎΠ»ΠΆΠ½Π° Π±ΡΡΡ ΡΠ½ΠΈΠΊΠ°Π»ΡΠ½ΠΎΠΉ Π΄Π»Ρ ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ ΠΈΠ½ΡΡΠ°Π½ΡΠ° ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ, ΡΡΠΎΠ±Ρ Kafka ΠΌΠΎΠ³Π»Π° Π³Π°ΡΠ°Π½ΡΠΈΡΠΎΠ²Π°ΡΡ ΠΊΠΎΡΡΠ΅ΠΊΡΠ½ΡΡ ΡΠ°Π±ΠΎΡΡ Ρ ΡΡΠ°Π½Π·Π°ΠΊΡΠΈΡΠΌΠΈ Π² ΡΠ»ΡΡΠ°Π΅ ΠΏΠ°Π΄Π΅Π½ΠΈΡ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ. ΠΡΠΈ ΡΡΠΎΠΌ Π²Π°ΠΆΠ½ΠΎ ΠΏΠΎΠ½ΠΈΠΌΠ°ΡΡ, ΡΡΠΎ ΡΠ½ΠΈΠΊΠ°Π»ΡΠ½ΡΠΉ - Π½Π΅ Π·Π½Π°ΡΠΈΡ ΡΠ°Π½Π΄ΠΎΠΌΠ½ΡΠΉ. Π’ΠΎ Π΅ΡΡΡ, Π·Π° ΠΈΠ½ΡΡΠ°Π½ΡΠΎΠΌ Π΄ΠΎΠ»ΠΆΠ΅Π½ Π±ΡΡΡ Π·Π°ΠΊΡΠ΅ΠΏΠ»Π΅Π½ ΡΠ²ΠΎΠΉ TRANSACTIONAL_ID
. Π Π΄Π°Π½Π½ΠΎΠΌ ΡΠ»ΡΡΠ°Π΅ Π΄Π»Ρ ΠΏΡΠΎΡΡΠΎΡΡ ΡΠ΅Π°Π»ΠΈΠ·Π°ΡΠΈΠΈ Ρ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡ ΡΡΡΡΠΈΠΊΡ Π² Π²ΠΈΠ΄Π΅ ΠΏΠΎΡΡΠ°, Π½Π° ΠΊΠΎΡΠΎΡΠΎΠΌ ΠΏΠΎΠ΄Π½ΡΡΠΎ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠ΅. ΠΠ΄Π½Π°ΠΊΠΎ, ΡΡΠ° Π»ΠΈΡΡ Π΄Π΅ΠΌΠΎ ΡΠ΅Π°Π»ΠΈΠ·Π°ΡΠΈΡ.
Π‘Π΅ΡΠ²ΠΈΡ Π²ΡΡΠΈΡΡΠ²Π°Π΅Ρ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΈΡ
ΡΠΎΠΏΠΈΠΊΠΎΠ² Kafka: customers
, payments.v1
, orders.v1
, ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½ΡΠ΅Ρ ΠΈΡ
Ρ ΠΏΠΎΠΌΠΎΡΡΡ Kafka Streams, ΠΈ ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅Ρ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ ΡΠ΅ΡΠ΅Π· ΠΈΠ½ΡΠ΅ΡΡΠ΅ΠΉΡ Emailer. ΠΡΠΎΠΌΠ΅ ΡΠΎΠ³ΠΎ, Π΄Π°Π½Π½ΡΠ΅ ΠΎ Π·Π°ΠΊΠ°Π·Π΅ ΠΈ ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»Π΅ ΠΎΡΠΏΡΠ°Π²Π»ΡΡΡΡΡ Π² ΡΠΎΠΏΠΈΠΊ Kafka, ΡΠΎΠ²ΠΏΠ°Π΄Π°ΡΡΠΈΠΉ Ρ ΡΠ΅ΠΉΡΠΈΠ½Π³ΠΎΠΌ ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»Ρ: platinum
, gold
, silver
, bronze
.
ΠΠ°ΡΡΡΠΎΠΉΠΊΠΈ Kafka Streams ΡΡΡ Π°Π½Π°Π»ΠΎΠ³ΠΈΡΠ½Ρ Orders Service.
ΠΠ»Π°ΡΡ EmailService ΠΎΡΠ²Π΅ΡΠ°Π΅Ρ Π·Π° ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ ΠΈ ΠΎΠ±ΡΠ°Π±ΠΎΡΠΊΡ KStream
ΠΈ KTable
. ΠΠ»Ρ ΡΠΎΠ³ΠΎ, ΡΡΠΎΠ±Ρ ΠΏΠΎΡΠ»Π΅ ΡΡΠ°ΡΡΠ° ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ ΡΠΎΠ·Π΄Π°Π»Π°ΡΡ ΠΈ Π·Π°ΡΠ°Π±ΠΎΡΠ°Π»Π° ΡΠΎΠΏΠΎΠ»ΠΎΠ³ΠΈΡ ΡΡΡΠΈΠΌΠΎΠ² Kafka Streams, Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ Π΅Π΅ Π½Π°ΡΡΡΠΎΠΈΡΡ Ρ ΠΏΠΎΠΌΠΎΡΡΡ StreamsBuilder
. ΠΠ»Ρ ΡΡΠΎΠ³ΠΎ Π΄ΠΎΡΡΠ°ΡΠΎΡΠ½ΠΎ Π² ΠΊΠ»Π°ΡΡ, ΠΏΠΎΠΌΠ΅ΡΠ΅Π½Π½ΡΠΉ Π°Π½Π½ΠΎΡΠ°ΡΠΈΠ΅ΠΉ @Component
ΠΈΠ»ΠΈ @Service
Π² ΠΎΠ΄ΠΈΠ½ ΠΈΠ· ΠΌΠ΅ΡΠΎΠ΄ΠΎΠ² Π·Π°ΠΈΠ½ΠΆΠ΅ΠΊΡΠΈΡΡ StreamsBuilder ΠΈ Π² ΡΡΠΎΠΌ ΠΌΠ΅ΡΠΎΠ΄Π΅ Π²ΡΡΡΡΠΎΠΈΡΡ Π½ΡΠΆΠ½ΡΡ ΡΠΎΠΏΠΎΠ»ΠΎΠ³ΠΈΡ.
@Service class EmailService( private val emailer: Emailer, private val schemas: Schemas ) { @Autowired fun processStreams(builder: StreamsBuilder) { val orders: KStream<String, Order> = builder.stream( schemas.ORDERS.name, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) ) //Create the streams/tables for the join val payments: KStream<String, Payment> = builder.stream( schemas.PAYMENTS.name, Consumed.with(schemas.PAYMENTS.keySerde, schemas.PAYMENTS.valueSerde) ) //Rekey payments to be by OrderId for the windowed join .selectKey { s, payment -> payment.orderId } val customers: GlobalKTable<Long, Customer> = builder.globalTable( schemas.CUSTOMERS.name, Consumed.with(schemas.CUSTOMERS.keySerde, schemas.CUSTOMERS.valueSerde) ) val serdes: StreamJoined<String, Order, Payment> = StreamJoined .with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde, schemas.PAYMENTS.valueSerde) //Join the two streams and the table then send an email for each orders.join( payments, ::EmailTuple, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1)), serdes ) //Next join to the GKTable of Customers .join( customers, { _, value -> value.order.customerId }, // note how, because we use a GKtable, we can join on any attribute of the Customer. EmailTuple::setCustomer ) //Now for each tuple send an email. .peek { _, emailTuple -> emailer.sendEmail(emailTuple) } //Send the order to a topic whose name is the value of customer level orders.join( customers, { orderId, order -> order.customerId }, { order, customer -> OrderEnriched(order.id, order.customerId, customer.level) } ) //TopicNameExtractor to get the topic name (i.e., customerLevel) from the enriched order record being sent .to( TopicNameExtractor { orderId, orderEnriched, record -> orderEnriched.customerLevel }, Produced.with(schemas.ORDERS_ENRICHED.keySerde, schemas.ORDERS_ENRICHED.valueSerde) ) } }
Π Π΄Π°Π½Π½ΠΎΠΌ ΠΏΡΠΈΠΌΠ΅ΡΠ΅ ΡΠ»Π΅Π΄ΡΠ΅Ρ ΠΎΠ±ΡΠ°ΡΠΈΡΡ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΠΌΠΎΠΌΠ΅Π½ΡΠΎΠ².
KStream<String, Payment>
ΡΠΎΠ·Π΄Π°Π΅ΡΡΡ Π½Π° ΠΎΡΠ½ΠΎΠ²Π΅ ΡΠΎΠΏΠΈΠΊΠ°payments.v1,
Π² ΠΊΠΎΡΠΎΡΠΎΠΌ ΠΊΠ»ΡΡΠΎΠΌ ΡΠ²Π»ΡΠ΅ΡΡΡ ID ΠΏΠ»Π°ΡΠ΅ΠΆΠ°, ΠΏΡΠΈ ΡΡΠΎΠΌ Π² ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΠΈ ΠΎ ΠΏΠ»Π°ΡΠ΅ΠΆΠ΅ ΠΏΡΠΈΡΡΡΡΡΠ²ΡΠ΅Ρ ΠΏΠΎΠ»Π΅orderId
, ΠΊΠΎΡΠΎΡΠΎΠ΅ ΡΠ°ΠΊΡΠΈΡΠ΅ΡΠΊΠΈ ΡΡΡΠ»Π°Π΅ΡΡΡ Π½Π° ΡΡΡΠ½ΠΎΡΡΡOrder
. Kafka Streams ΠΏΠΎΠ΄Π΄Π΅ΡΠΆΠΈΠ²Π°Π΅Ρ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠ΅KStream
ΠΏΠΎ ΠΊΠ»ΡΡΡ ΡΠΎΠΏΠΈΠΊΠ°, ΠΈΠ· ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ Π²ΡΡΠΈΡΡΠ²Π°ΡΡΡΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ. ΠΡΠΎ Π½Π°Π·ΡΠ²Π°Π΅ΡΡΡ equi join. ΠΠΎ ΡΠ°ΠΊ ΠΊΠ°ΠΊ Π² ΠΈΡΡ ΠΎΠ΄Π½ΡΡ ΡΠΎΠΏΠΈΠΊΠ°Ρ ΡΠ°Π·Π½ΡΠ΅ ΠΊΠ»ΡΡΠΈ, ΠΏΡΠΈΡ ΠΎΠ΄ΠΈΡΡΡ ΠΏΡΠΎΠ²Π΅ΡΡΠΈ re-keying - Π²ΡΠ±ΠΎΡ Π½ΠΎΠ²ΠΎΠ³ΠΎ ΠΊΠ»ΡΡΠ° Π² "ΠΏΡΠ°Π²ΠΎΠΌ" ΡΡΡΠΈΠΌΠ΅ Ρ ΠΏΠΎΠΌΠΎΡΡΡ ΠΌΠ΅ΡΠΎΠ΄Π°KStream.selectKey()
, Π² Π½Π°ΡΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ Π²KStream<String, Payment>
. ΠΠΎΠ΄ ΠΊΠ°ΠΏΠΎΡΠΎΠΌ Kafka Streams ΡΠΎΠ·Π΄Π°ΡΡ Π΅ΡΠ΅ ΠΎΠ΄ΠΈΠ½ ΡΠΎΠΏΠΈΠΊ Ρ ΡΠ°ΠΊΠΈΠΌ ΠΆΠ΅ ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎΠΌ ΠΏΠ°ΡΡΠΈΡΠΈΠΉ, ΠΊΠ°ΠΊ ΠΈ Π² ΠΈΡΡ ΠΎΠ΄Π½ΠΎΠΌ ΡΠΎΠΏΠΈΠΊΠ΅ ΠΈ Π±ΡΠ΄Π΅Ρ ΠΎΡΠΏΡΠ°Π²Π»ΡΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· ΠΏΠ΅ΡΠ²ΠΎΠ½Π°ΡΠ°Π»ΡΠ½ΠΎΠ³ΠΎ ΡΠΎΠΏΠΈΠΊΠ° (payments.v1
) Π² Π½ΠΎΠ²ΡΠΉ ΡΠΎΠΏΠΈΠΊ. Π’Π΅ΠΏΠ΅ΡΡ ΡΡΠΎΡKStream
, ΠΎΡΠ½ΠΎΠ²Π°Π½Π½ΡΠΉ Π½Π° Π²Π½ΡΡΡΠ΅Π½Π½Π΅ΠΌ ΡΠΎΠΏΠΈΠΊΠ΅ ΠΌΠΎΠΆΠ½ΠΎ ΡΠΏΠΎΠΊΠΎΠΉΠ½ΠΎ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½ΡΡΡ ΡΠΎ ΡΡΡΠΈΠΌΠΎΠΌKStream<String, Order>
, ΡΠ°ΠΊ ΠΊΠ°ΠΊ Ρ Π½ΠΈΡ ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²ΡΠ΅ ΠΊΠ»ΡΡΠΈ Π·Π°ΠΏΠΈΡΠ΅ΠΉ.
NB! ΠΠΠΠΠ Π’ΡΡ Π΅ΡΡΡ ΠΎΠ΄ΠΈΠ½ ΠΎΡΠ΅Π½Ρ Π²Π°ΠΆΠ½ΡΠΉ Π½ΡΡΠ°Π½Ρ: Π΄Π»Ρ ΡΠΎΠ³ΠΎ, ΡΡΠΎΠ±Ρ Kafka Streams ΠΌΠΎΠ³Π»Π° ΠΏΡΠΎΠ²Π΅ΡΡΠΈ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠ΅ Π΄Π°Π½Π½ΡΡ ΠΈΡΡ ΠΎΠ΄Π½ΡΠ΅ ΡΠΎΠΏΠΈΠΊΠΈ Π΄ΠΎΠ»ΠΆΠ½Ρ Π±ΡΡΡ ΠΊΠΎ-ΠΏΠ°ΡΡΠΈΡΠΈΠΎΠ½ΠΈΡΠΎΠ²Π°Π½Ρ:
- Π΄ΠΎΠ»ΠΆΠ½Ρ Π±ΡΡΡ ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²ΡΠ΅ ΠΊΠ»ΡΡΠΈ
- Π΄ΠΎΠ»ΠΆΠ½Ρ ΠΈΠΌΠ΅ΡΡ ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²ΠΎΠ΅ ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ ΠΏΠ°ΡΡΠΈΡΠΈΠΉ
- Π΄ΠΎΠ»ΠΆΠ½Ρ ΠΈΠΌΠ΅ΡΡ ΠΎΠ΄ΠΈΠ½Π°ΠΊΠΎΠ²ΡΡ ΡΡΡΠ°ΡΠ΅Π³ΠΈΡ ΠΏΠ°ΡΡΠΈΡΠΈΠΎΠ½ΠΈΡΠΎΠ²Π°Π½ΠΈΡ
ΠΠΎΠ΄ΡΠΎΠ±Π½Π΅Π΅ ΠΎΠ± ΡΡΠΎΠΌ Π² ΡΡΠ°ΡΡΠ΅ Co-Partitioning with Apache Kafka.
-
ΠΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ ΠΎ ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»ΡΡ ΡΠΎΠ±ΠΈΡΠ°Π΅ΡΡΡ Π²
GlobalKTable<Long, Customer>
. ΠΡΠΎ ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ, ΡΡΠΎ ΠΊΠ°ΠΆΠ΄ΡΠΉ ΠΈΠ½ΡΡΠ°Π½Ρ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ Email Service Π±ΡΠ΄Π΅Ρ ΠΈΠΌΠ΅ΡΡ Π² ΡΠ²ΠΎΠ΅ΠΌ ΡΠ°ΡΠΏΠΎΡΡΠΆΠ΅Π½ΠΈΠΈ Π΄Π°Π½Π½ΡΠ΅ ΠΎΠ±ΠΎ Π²ΡΠ΅Ρ ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»ΡΡ , Π² ΠΎΡΠ»ΠΈΡΠΈΠ΅ ΠΎΡ ΠΎΠ±ΡΡΠ½ΠΎΠΉ KTable, ΠΊΠΎΡΠΎΡΠ°Ρ Ρ ΡΠ°Π½ΠΈΡ Π½Π° ΠΈΠ½ΡΡΠ°Π½ΡΠ΅ ΡΠΎΠ»ΡΠΊΠΎ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· Π²ΡΡΠΈΡΡΠ²Π°Π΅ΠΌΡΡ ΡΡΠΈΠΌ ΠΈΠ½ΡΡΠ°Π½ΡΠΎΠΌ ΠΏΠ°ΡΡΠΈΡΠΈΠΉ. Π’Π°ΠΊΠΈΠΌ ΠΎΠ±ΡΠ°Π·ΠΎΠΌ ΠΌΡ ΠΌΠΎΠΆΠ΅ΠΌ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½ΡΡΡ KStream Ρ GlobalKTable ΠΏΠΎ Π»ΡΠ±ΠΎΠΌΡ ΠΏΠΎΠ»Ρ, ΠΏΠ»ΡΡ Π½Π΅Ρ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎΡΡΠΈ ΡΠΎΠ±Π»ΡΠ΄Π°ΡΡ ΠΏΡΠ°Π²ΠΈΠ»ΠΎ ΠΎ ΠΊΠΎ-ΠΏΠ°ΡΡΠΈΡΠΈΠΎΠ½ΠΈΡΠΎΠ²Π°Π½ΠΈΠΈ ΡΠΎΠΏΠΈΠΊΠΎΠ². ΠΠ΄Π½Π°ΠΊΠΎ ΡΠ΅ΠΏΠ΅ΡΡ Π½Π°Ρ ΠΈΠ½ΡΡΠ°Π½Ρ ΠΌΠΎΠΆΠ΅Ρ Π±ΡΡΡ ΠΏΠ΅ΡΠ΅Π³ΡΡΠΆΠ΅Π½ ΠΊΠ°ΠΊ ΠΏΠΎ ΠΏΠ°ΠΌΡΡΠΈ, ΡΠ°ΠΊ ΠΈ ΠΏΠΎ ΡΠ΅ΡΠ΅Π²ΠΎΠΌΡ ΡΡΠ°ΡΠΈΠΊΡ, Π΅ΡΠ»ΠΈ ΡΠΎΠΏΠΈΠΊcustomers
Π±ΡΠ΄Π΅Ρ Ρ ΡΠ°Π½ΠΈΡΡ ΠΎΡΠ΅Π½Ρ ΠΌΠ½ΠΎΠ³ΠΎ Π΄Π°Π½Π½ΡΡ ΠΈ ΠΏΠΎΡΡΠΎΡΠ½Π½ΠΎ ΠΏΠΎΠΏΠΎΠ»Π½ΡΡΡΡΡ. Π’Π°ΠΊΠΆΠ΅ ΡΡΠΎΠΈΡ ΠΎΡΠΌΠ΅ΡΠΈΡΡ, ΡΡΠΎ GlobalKTable ΠΈΠ½ΠΈΡΠΈΠ°Π»ΠΈΠ·ΠΈΡΡΠ΅ΡΡΡ Π½Π° ΡΡΠ°ΡΡΠ΅ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ, ΡΠΎ Π΅ΡΡΡ Π²ΡΡΠΈΡΡΠ²Π°ΡΡΡΡ Π²ΡΠ΅ Π΄Π°Π½Π½ΡΠ΅ ΠΈΠ· ΡΠΎΠΎΡΠ²Π΅ΡΡΡΠ²ΡΡΡΠ΅Π³ΠΎ ΡΠΎΠΏΠΈΠΊΠ°. ΠΠΎ ΡΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΏΡΠΎΠΈΠ·ΠΎΠΉΠ΄Π΅Ρ ΠΈΠ½ΠΈΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΈΡ, ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠ΅ KStream Ρ GlobalKTable Π½Π΅ Π½Π°ΡΠ½Π΅ΡΡΡ. -
ΠΡΠΈ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠΈ Π΄Π²ΡΡ ΡΡΡΠΈΠΌΠΎΠ² Π²Π°ΠΆΠ½ΠΎ ΠΏΠΎΠ½ΠΈΠΌΠ°ΡΡ, ΡΡΠΎ Π΄Π°Π½Π½ΡΠ΅ Π² ΡΠΎΠΏΠΈΠΊΠΈ, Π½Π° ΠΎΡΠ½ΠΎΠ²Π΅ ΠΊΠΎΡΠΎΡΡΡ ΡΠΎΠ·Π΄Π°ΡΡΡΡ ΡΡΡΠΈΠΌΡ ΠΌΠΎΠ³ΡΡ ΠΏΠΎΠΏΠ°Π΄Π°ΡΡ Π½Π΅ ΠΎΠ΄Π½ΠΎΠ²ΡΠ΅ΠΌΠ΅Π½Π½ΠΎ. ΠΠΎΡΡΠΎΠΌΡ ΠΏΡΠΈ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠΈ ΡΡΡΠΈΠΌΠΎΠ² ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΡΡΡΡ "ΠΎΠΊΠ½Π°" ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ - JoinWindow. Π Π½Π°ΡΠ΅ΠΌ ΡΠ»ΡΡΠ°Π΅ ΠΏΡΠΈ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠΈ ΡΡΡΠΈΠΌΠΎΠ² orders ΠΈ payments ΠΌΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΠΌ ΠΎΠΊΠ½ΠΎ Π² 1 ΠΌΠΈΠ½ΡΡΡ, Π±Π΅Π· ΠΊΠ°ΠΊΠΎΠ³ΠΎ-Π»ΠΈΠ±ΠΎ Π³ΡΠ΅ΠΉΡ ΠΏΠ΅ΡΠΈΠΎΠ΄Π°.
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(1))
ΠΡΠΎ ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ, ΡΡΠΎ Π·Π°ΠΏΠΈΡΠΈ, timestamp ΠΊΠΎΡΠΎΡΡΡ Π² ΠΏΡΠ΅Π΄Π΅Π»Π°Ρ 1 ΠΌΠΈΠ½ΡΡΡ Π΄ΡΡΠ³ ΠΎΡ Π΄ΡΡΠ³Π°, ΠΌΠΎΠ³ΡΡ Π±ΡΡΡ ΠΎΠ±ΡΠ΅Π΄Π΅Π½ΠΈΠ½Ρ, ΡΠΎ Π΅ΡΡΡ ΠΌΡ ΠΏΠΎΠ΄ΡΠ°Π·ΡΠΌΠ΅Π²Π°Π΅ΠΌ, ΡΡΠΎ ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ Π·Π°ΠΊΠ°Π·Π° ΠΈ Π΅Π³ΠΎ ΠΎΠΏΠ»Π°ΡΠ° ΠΏΠΎ Π²ΡΠ΅ΠΌΠ΅Π½ΠΈ Π½Π΅ ΡΠ°Π·Π»ΠΈΡΠ°ΡΡΡΡ Π½Π° +-1 ΠΌΠΈΠ½ΡΡΡ. ΠΠ°ΠΏΠΈΡΠΈ, ΠΊΠΎΡΠΎΡΡΠ΅ Π½Π΅ ΠΏΠΎΠΏΠ°Π΄Π°ΡΡ Π² ΡΡΠΎ ΠΎΠΊΠ½ΠΎ, Π½Π΅ Π±ΡΠ΄ΡΡ ΡΡΠΈΡΡΠ²Π°ΡΡΡΡ ΠΏΡΠΈ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠΈ ΡΡΡΠΈΠΌΠΎΠ². ΠΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΠ΅ "ΠΎΠΊΠΎΠ½" ΡΠ°ΠΊΠΆΠ΅ ΠΎΠ±ΡΡΠ»ΠΎΠ²Π»Π΅Π½ΠΎ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎΡΡΡΡ ΠΏΠΎΠ΄Π΄Π΅ΡΠΆΠΈΠ²Π°ΡΡ Π±ΡΡΠ΅Ρ Π΄Π»Ρ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ . ΠΡΠ»ΠΈ Π±Ρ Π½Π΅ Π±ΡΠ»ΠΎ "ΠΎΠΊΠΎΠ½" ΡΡΠΎΡ Π±ΡΡΠ΅Ρ ΠΌΠΎΠ³ Π±Ρ ΠΏΠΎΠ³Π»ΠΎΡΠΈΡΡ Π²ΡΡ ΠΏΠ°ΠΌΡΡΡ. "ΠΠΊΠ½Π°" Ρ ΡΠ°Π½ΡΡ ΡΠ²ΠΎΠΈ Π΄Π°Π½Π½ΡΠ΅ Π² windowing state store, ΡΡΠ°ΡΡΠ΅ Π·Π°ΠΏΠΈΡΠΈ Π² ΠΊΠΎΡΠΎΡΠΎΠΌ ΡΠ΄Π°Π»ΡΡΡΡΡ ΠΏΠΎΡΠ»Π΅ Π½Π°ΡΡΡΠ°ΠΈΠ²Π°Π΅ΠΌΠΎΠ³ΠΎ window retention period, ΠΊΠΎΡΠΎΡΡΠΉ ΠΏΠΎ ΡΠΌΠΎΠ»ΡΠ°Π½ΠΈΡ ΡΠ°Π²Π΅Π½ 1 Π΄Π½Ρ. ΠΠ·ΠΌΠ΅Π½ΠΈΡΡ ΡΡΠΎ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ½ΠΎ Ρ ΠΏΠΎΠΌΠΎΡΡΡ:Materialized#withRetention()
.
/** * Specifies that records of the same key are joinable if their timestamps are within {@code timeDifference}, * i.e., the timestamp of a record from the secondary stream is max {@code timeDifference} before or after * the timestamp of the record from the primary stream. * <p> * CAUTION: Using this method implicitly sets the grace period to zero, which means that any out-of-order * records arriving after the window ends are considered late and will be dropped. * * @param timeDifference join window interval * @return a new JoinWindows object with the window definition and no grace period. Note that this means out-of-order records arriving after the window end will be dropped * @throws IllegalArgumentException if {@code timeDifference} is negative or can't be represented as {@code long milliseconds} */ public static JoinWindows ofTimeDifferenceWithNoGrace(final Duration timeDifference) { return ofTimeDifferenceAndGrace(timeDifference, Duration.ofMillis(NO_GRACE_PERIOD)); }
ΠΠΎΠ»Π΅Π΅ ΠΏΠΎΠ΄ΡΠΎΠ±Π½ΠΎ ΠΎ ΡΠΎΠΌ, ΠΊΠ°ΠΊΠΈΠ΅ Π±ΡΠ²Π°ΡΡ Π²ΠΈΠ΄Ρ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΠΉ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΡΠΎΡΠΈΡΠ°ΡΡ Π² Π΄ΠΎΠΊΡΠΌΠ΅Π½ΡΠ°ΡΠΈΠΈ.
Π‘Π΅ΡΠ²ΠΈΡ ΠΎΡΠ²Π΅ΡΠ°Π΅Ρ Π·Π° ΠΏΠΎΠΈΡΠΊ ΠΏΠΎΡΠ΅Π½ΡΠΈΠ°Π»ΡΠ½ΠΎ ΠΌΠΎΡΠ΅Π½Π½ΠΈΡΠ΅ΡΠΊΠΈΡ ΡΡΠ°Π½Π·Π°ΠΊΡΠΈΠΉ: Π΅ΡΠ»ΠΈ ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΠ΅Π»Ρ ΡΠ΄Π΅Π»Π°Π» Π·Π°ΠΊΠ°Π·Ρ Π½Π° Π±ΠΎΠ»Π΅Π΅ ΡΠ΅ΠΌ 2 ΡΡΡ Π² ΡΠ΅ΡΠ΅Π½ΠΈΠ΅ ΡΠ΅ΡΡΠΈΠΈ Π²Π·Π°ΠΈΠΌΠΎΠ΄Π΅ΠΉΡΡΠ²ΠΈΡ Ρ ΡΠ΅ΡΠ²Π΅ΡΠΎΠΌ (ΠΏΠΎΠ΄ΡΠΎΠ±Π½Π΅Π΅ Π½ΠΈΠΆΠ΅ ΠΏΠΎ ΡΠ΅ΠΊΡΡΡ), ΡΠΎ Π²ΡΠ΅ ΡΠ°ΠΊΠΈΠ΅ Π·Π°ΠΊΠ°Π·Ρ Π½Π΅ ΠΏΡΠΎΡ ΠΎΠ΄ΡΡ Π²Π°Π»ΠΈΠ΄Π°ΡΠΈΡ.
ΠΠΎΠ½ΡΠΈΠ³ΡΡΠ°ΡΠΈΡ Kafka Streams Π½Π΅ΠΌΠ½ΠΎΠ³ΠΎ ΠΎΡΠ»ΠΈΡΠ°Π΅ΡΡΡ ΠΎΡ ΠΏΡΠ΅Π΄ΡΠ΄ΡΡΠΈΡ :
@Bean(name = [KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME]) fun kStreamsConfig(): KafkaStreamsConfiguration { val props = hashMapOf<String, Any>( StreamsConfig.APPLICATION_ID_CONFIG to kafkaProps.appId, // must be specified to enable InteractiveQueries and checking metadata of Kafka Cluster StreamsConfig.APPLICATION_SERVER_CONFIG to serviceUtils.getServerAddress(), StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProps.bootstrapServers, StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.String().javaClass.name, // instances MUST have different stateDir StreamsConfig.STATE_DIR_CONFIG to kafkaProps.stateDir, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to kafkaProps.autoOffsetReset, StreamsConfig.PROCESSING_GUARANTEE_CONFIG to kafkaProps.processingGuarantee, StreamsConfig.COMMIT_INTERVAL_MS_CONFIG to kafkaProps.commitInterval, AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) to kafkaProps.sessionTimeout, // disable caching to ensure a complete aggregate changelog. // This is a little trick we need to apply // as caching in Kafka Streams will conflate subsequent updates for the same key. // Disabling caching ensures // we get a complete "changelog" from the aggregate(...) // (i.e. every input event will have a corresponding output event. // https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390 StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG to "0" ) return KafkaStreamsConfiguration(props) }
Π‘ΡΠΎΠΈΡ ΠΏΠΎΡΡΠ½ΠΈΡΡ, ΡΡΠΎ Π² ΡΡΠΎΠΌ ΡΠ΅ΡΠ²ΠΈΡΠ΅ Π±ΡΠ΄Π΅Ρ ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΡΡΡ Π°Π³Π³ΡΠ΅Π³Π°ΡΠΈΡ Π΄Π°Π½Π½ΡΡ , Π²ΡΠ»Π΅Π΄ΡΡΠ²ΠΈΠ΅ ΡΠ΅Π³ΠΎ Kafka Streams Π±ΡΠ΄Π΅Ρ Π·Π°Π΄Π΅ΠΉΡΡΠ²ΠΎΠ²Π°ΡΡ state store Π΄Π»Ρ Ρ ΡΠ°Π½Π΅Π½ΠΈΡ ΡΠΎΡΡΠΎΡΠ½ΠΈΡ Π°Π³Π³ΡΠ΅Π³ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ Π΄Π°Π½Π½ΡΡ . Π ΡΡΠΎΠΌ ΡΠ»ΡΡΠ°Π΅ ΠΌΡ ΠΎΡΠΊΠ»ΡΡΠ°Π΅ΠΌ ΠΊΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ Π΄Π°Π½Π½ΡΡ Π½Π° ΠΈΠ½ΡΡΠ°Π½ΡΠ΅, ΡΠ°ΠΊ ΠΊΠ°ΠΊ ΠΊΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅ ΠΌΠΎΠΆΠ΅Ρ ΠΏΡΠΈΠ²Π΅ΡΡΠΈ ΠΊ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ ΠΏΠΎΡΠ»Π΅Π΄ΠΎΠ²Π°ΡΠ΅Π»ΡΠ½ΡΡ ΡΠΎΠ±ΡΡΠΈΠΉ ΠΏΡΠΈ ΡΠ°Π±ΠΎΡΠ΅ ΡΠΎ state store Π²ΠΎ Π²ΡΠ΅ΠΌΡ Π°Π³Π³ΡΠ΅Π³Π°ΡΠΈΠΈ Π΄Π°Π½Π½ΡΡ , Π² ΡΠΎ Π²ΡΠ΅ΠΌΡ ΠΊΠ°ΠΊ ΠΌΡ Ρ ΠΎΡΠΈΠΌ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°ΡΡ ΠΊΠ°ΠΆΠ΄ΡΡ Π·Π°ΠΏΠΈΡΡ Π² ΠΏΡΠΎΡΠ΅ΡΡΠ΅ Π°Π³Π³ΡΠ΅Π³Π°ΡΠΈΠΈ. ΠΠΎΠ»Π΅Π΅ ΠΏΠΎΠ΄ΡΠΎΠ±Π½ΠΎ ΠΎΠ± ΡΡΠΎΠΌ Π² ΡΡΠ°ΡΡΠ΅ Kafka Streams Memory Management.
ΠΠ° ΠΏΠ΅ΡΠ²ΠΎΠΌ ΡΡΠ°ΠΏΠ΅ ΡΠ°Π±ΠΎΡΡ ΡΠ΅ΡΠ²ΠΈΡΠ° ΡΠΎΠ·Π΄Π°Π΅ΡΡΡ ΡΠ·Π΅Π» Π°Π³Π³ΡΠ΅Π³Π°ΡΠΈΠΈ ΡΡΡΠΈΠΌΠ° KStream<String, Order>:
val orders: KStream<String, Order> = builder .stream( schemas.ORDERS.name, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) ) .peek { key, value -> log.info("Processing Order {} in Fraud Service.", value) } .filter { _, order -> OrderState.CREATED == order.state } // Create an aggregate of the total value by customer and hold it with the order. // We use session windows to detect periods of activity. val aggregate: KTable<Windowed<Long>, OrderValue> = orders // creates a repartition internal topic if the value to be grouped by differs from // the key and downstream nodes need the new key .groupBy( { id, order -> order.customerId }, Grouped.with(Serdes.Long(), schemas.ORDERS.valueSerde) ) .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1))) .aggregate( ::OrderValue, { custId, order, total -> OrderValue(order, total.value + order.quantity * order.price) }, { k, a, b -> simpleMerge(a, b) },//include a merger as we're using session windows., Materialized.with(null, schemas.ORDER_VALUE_SERDE) ) private fun simpleMerge(a: OrderValue?, b: OrderValue): OrderValue { return OrderValue(b.order, (a?.value ?: 0.0) + b.value) }
Π‘ΡΠΎΠΈΡ ΠΎΠ±ΡΠ°ΡΠΈΡΡ Π²Π½ΠΈΠΌΠ°Π½ΠΈΠ΅ Π½Π° ΡΠΎ, ΡΡΠΎ ΠΏΡΠΈ Π³ΡΡΠΏΠΏΠΈΡΠΎΠ²ΠΊΠ΅ ΠΏΠΎ ΠΊΠ»ΡΡΡ, ΠΎΡΠ»ΠΈΡΠ°ΡΡΠ΅ΠΌΡΡΡ ΠΎΡ ΠΊΠ»ΡΡΠ° ΠΈΡΡ
ΠΎΠ΄Π½ΠΎΠ³ΠΎ ΡΠΎΠΏΠΈΠΊΠ° Π±ΡΠ΄Π΅Ρ ΡΠΎΠ·Π΄Π°Π½ Π²Π½ΡΡΡΠ΅Π½Π½ΠΈΠΉ ΡΠ»ΡΠΆΠ΅Π±Π½ΡΠΉ ΡΠΎΠΏΠΈΠΊ Kafka, Ρ ΡΠ°ΠΊΠΈΠΌ ΠΆΠ΅ ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎΠΌ ΠΏΠ°ΡΡΠΈΡΠΈΠΉ, ΡΡΠΎ ΠΈ ΠΈΡΡ
ΠΎΠ΄Π½ΡΠΉ ΡΠΎΠΏΠΈΠΊ. Π Π½Π°ΡΠ΅ΠΌ ΠΏΡΠΈΠΌΠ΅ΡΠ΅ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΡΡΡ Π΅ΡΠ΅ ΠΎΠ΄ΠΈΠ½ ΡΠΈΠΏ ΠΎΠΊΠ½Π° Π΄Π»Ρ Π°Π³Π³ΡΠ΅Π³Π°ΡΠΈΠΈ Π΄Π°Π½Π½ΡΡ
- SessionWindows.ofInactivityGapWithNoGrace(Duration.ofHours(1))
ΠΡΠΎ ΠΎΠ·Π½Π°ΡΠ°Π΅Ρ, ΡΡΠΎ Π΄Π°Π½Π½ΡΠ΅ Π±ΡΠ΄ΡΡ ΠΏΠΎΠΏΠ°Π΄Π°ΡΡ Π² Π°Π³Π³ΡΠ΅Π³Π°ΡΠΈΡ Π² ΡΠ΅ΡΠ΅Π½ΠΈΠ΅ ΡΠ΅ΡΡΠΈΠΈ Π°ΠΊΡΠΈΠ²Π½ΠΎΡΡΠΈ ΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°ΡΠ΅Π»Ρ. Π‘Π΅ΡΡΠΈΡ Π·Π°ΠΊΡΡΠ²Π°Π΅ΡΡΡ, ΠΊΠΎΠ³Π΄Π° Π² ΡΠΎΠΏΠΈΠΊ Π½Π΅ ΠΏΠΎΡΡΡΠΏΠ°ΡΡ Π΄Π°Π½Π½ΡΠ΅ Π² ΡΠ΅ΡΠ΅Π½ΠΈΠ΅ ΡΠΊΠ°Π·Π°Π½Π½ΠΎΠ³ΠΎ ΠΏΠ΅ΡΠΈΠΎΠ΄Π° Π²ΡΠ΅ΠΌΠ΅Π½ΠΈ. ΠΠΎΠ»Π΅Π΅ ΠΏΠΎΠ΄ΡΠΎΠ±Π½ΠΎ Π² ΡΡΠ°ΡΡΠ΅ Create session windows.
ΠΠ° Π²ΡΠΎΡΠΎΠΌ ΡΡΠ°ΠΏΠ΅ ΠΌΡ ΠΈΠ·Π±Π°Π²Π»ΡΠ΅ΠΌΡΡ ΠΎΡ Π΄Π°Π½Π½ΡΡ
ΠΎΠ± "ΠΎΠΊΠ½Π°Ρ
" ΠΈ Π΄Π΅Π»Π°Π΅ΠΌ re-keying Π·Π°ΠΏΠΈΡΠ΅ΠΉ ΠΏΠΎ orderId
:
val ordersWithTotals: KStream<String, OrderValue> = aggregate .toStream { windowedKey, orderValue -> windowedKey.key() } //When elements are evicted from a session window they create delete events. Filter these out. .filter { k, v -> v != null } .selectKey { id, orderValue -> orderValue.order.id }
ΠΠ° ΡΡΠ΅ΡΡΠ΅ΠΌ ΡΡΠ°ΠΏΠ΅ ΠΌΡ ΡΠΎΡΠΌΠΈΡΡΠ΅ΠΌ Π΄Π²Π΅ "Π²Π΅ΡΠΊΠΈ" ΡΡΡΠΈΠΌΠΎΠ²:
- ΠΎΠ΄Π½Π° ΡΠΎΠ΄Π΅ΡΠΆΠΈΡ Π·Π½Π°ΡΠ΅Π½ΠΈΡ Π±ΠΎΠ»ΡΡΠ΅ FRAUD_LIMIT
- Π²ΡΠΎΡΠ°Ρ - ΠΌΠ΅Π½ΡΡΠ΅ FRAUD_LIMIT
//Now branch the stream into two, for pass and fail, based on whether the windowed // total is over Fraud Limit val forks: Map<String, KStream<String, OrderValue>> = ordersWithTotals .peek { key, value -> log.info("Processing OrderValue: {} in FraudService BEFORE branching.", value) } .split(Named.`as`("limit-")) .branch( { id, orderValue -> orderValue.value >= FRAUD_LIMIT }, Branched.`as`("above") ) .branch( { id, orderValue -> orderValue.value < FRAUD_LIMIT }, Branched.`as`("below") ) .noDefaultBranch()
ΠΠ° ΡΠ»Π΅Π΄ΡΡΡΠ΅ΠΌ ΡΡΠ°ΠΏΠ΅ Π·Π½Π°ΡΠ΅Π½ΠΈΡ ΠΈΠ· ΠΊΠ°ΠΆΠ΄ΠΎΠΉ "Π²Π΅ΡΠΊΠΈ" Π±ΡΠ΄ΡΡ ΠΎΡΠΏΡΠ°Π²Π»Π΅Π½Ρ Π² ΡΠΎΠΏΠΈΠΊ order-validations.v1
, Ρ ΡΠΎΠΎΡΠ²Π΅ΡΡΡΠ²ΡΡΡΠΈΠΌ ΡΠ΅Π·ΡΠ»ΡΡΠ°ΡΠΎΠΌ: FAIL ΠΈΠ»ΠΈ PASS.
val keySerde = schemas.ORDER_VALIDATIONS.keySerde val valueSerde = schemas.ORDER_VALIDATIONS.valueSerde val config = hashMapOf<String, Any>( AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS to false, AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to true, ) keySerde.configure(config, true) valueSerde.configure(config, false) forks["limit-above"]?.mapValues { orderValue -> OrderValidation( orderValue.order.id, OrderValidationType.FRAUD_CHECK, OrderValidationResult.FAIL ) }?.peek { key, value -> log.info("Sending OrderValidation for failed check in Fraud Service to Kafka. Order: {}", value) }?.to( schemas.ORDER_VALIDATIONS.name, Produced.with( keySerde, valueSerde ) ) forks["limit-below"]?.mapValues { orderValue -> OrderValidation( orderValue.order.id, OrderValidationType.FRAUD_CHECK, OrderValidationResult.PASS ) }?.peek { key, value -> log.info("Sending OrderValidation for passed check in Fraud Service to Kafka. Order: {}", value) }?.to( schemas.ORDER_VALIDATIONS.name, Produced.with( schemas.ORDER_VALIDATIONS.keySerde, schemas.ORDER_VALIDATIONS.valueSerde ) )
Π ΠΏΡΠΎΡΠ΅ΡΡΠ΅ ΡΠ΅ΡΡΠΈΡΠΎΠ²Π°Π½ΠΈΡ ΡΠ΅Π°Π»ΠΈΠ·Π°ΡΠΈΠΈ Ρ ΠΌΠ΅Π½Ρ Π²ΠΎΠ·Π½ΠΈΠΊΠ»Π° ΠΏΡΠΎΠ±Π»Π΅ΠΌΠ° Ρ ΡΠ΅ΠΌ, ΡΡΠΎ Π² Π½Π΅ΠΊΠΎΡΠΎΡΡΡ ΡΠ»ΡΡΠ°ΡΡ Kafka Streams ΠΏΡΡΠ°Π»Π°ΡΡ ΠΎΡΠΏΡΠ°Π²ΠΈΡΡ Π΄Π°Π½Π½ΡΠ΅ Π² ΡΠΎΠΏΠΈΠΊ, ΠΏΡΠ΅Π΄Π²Π°ΡΠΈΡΠ΅Π»ΡΠ½ΠΎ Π°Π²ΡΠΎΠΌΠ°ΡΠΈΡΠ΅ΡΠΊΠΈ Π·Π°ΡΠ΅Π³ΠΈΡΡΡΠΈΡΠΎΠ²Π°Π² Π½ΠΎΠ²ΡΡ ΡΡ Π΅ΠΌΡ Π΄Π°Π½Π½ΡΡ Π² Schema Registry, ΠΏΡΠΈ ΡΡΠΎΠΌ Π½ΠΎΠ²Π°Ρ ΡΡ Π΅ΠΌΠ° Π±ΡΠ»Π° Π½Π΅ΡΠΎΠ²ΠΌΠ΅ΡΡΠΈΠΌΠ° ΡΠΎ ΡΡΠ°ΡΠΎΠΉ, ΡΡΠΎ ΠΎΡΡΠ°Π½Π°Π²Π»ΠΈΠ²Π°Π»ΠΎ Kafka Streams. ΠΠΎΠΏΡΡΠΊΠΈ ΠΎΡΠΊΠ»ΡΡΠΈΡΡ Π°Π²ΡΠΎΠΌΠ°ΡΠΈΡΠ΅ΡΠΊΡΡ ΡΠ΅Π³ΠΈΡΡΡΠ°ΡΠΈΡ ΡΡ Π΅ΠΌ Π΄Π»Ρ ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΎΡΠΎΠ² / Π΄Π΅ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·Π°ΡΠΎΡΠΎΠ² Kafka Streams ΡΠ°ΠΊΠΆΠ΅ ΠΏΡΠΈΠ²ΠΎΠ΄ΠΈΠ»ΠΈ ΠΊ ΠΎΡΡΠ°Π½ΠΎΠ²ΠΊΠ΅ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ ΠΈΠ·-Π·Π° ΡΠΎΠ³ΠΎ, ΡΡΠΎ Kafka Streams ΡΠ΅Π³ΡΠ»ΡΡΠ½ΠΎ ΠΏΠΎΠ΄ ΠΊΠ°ΠΏΠΎΡΠΎΠΌ ΡΠΎΠ·Π΄Π°Π΅Ρ ΡΠ»ΡΠΆΠ΅Π±Π½ΡΠ΅ ΡΠΎΠΏΠΈΠΊΠΈ ΠΈ Π΄ΠΎΠ»ΠΆΠ½Π° ΠΈΠΌΠ΅ΡΡ Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ ΡΠ΅Π³ΠΈΡΡΡΠΈΡΠΎΠ²Π°ΡΡ ΠΈΡ ΡΡ Π΅ΠΌΡ. ΠΡΠΈ ΡΡΠΎΠΌ ΡΡΠΎΠΈΡ Π·Π°ΠΌΠ΅ΡΠΈΡΡ, ΡΡΠΎ ΠΎΡΡΠ°Π½ΠΎΠ²ΠΊΠ° ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΡ Kafka Streams Π½Π΅ ΠΏΡΠΈΠ²ΠΎΠ΄ΠΈΡ ΠΊ ΠΏΠ°Π΄Π΅Π½ΠΈΡ ΠΈΠ½ΡΡΠ°Π½ΡΠ° - ΠΎΠ½-ΡΠΎ ΠΊΠ°ΠΊ ΡΠ°Π· ΠΈ ΠΏΡΠΎΠ΄ΠΎΠ»ΠΆΠ°Π΅Ρ ΡΠ°Π±ΠΎΡΠ°ΡΡ, ΠΏΠΎΡΡΠΎΠΌΡ Π² ΠΏΡΠΎΠ΄Π΅ ΠΏΡΠΈΠ΄Π΅ΡΡΡ Π½Π°ΡΡΡΠ°ΠΈΠ²Π°ΡΡ ΠΎΡΠ΄Π΅Π»ΡΠ½ΡΠΉ ΠΌΠΎΠ½ΠΈΡΠΎΡΠΈΠ½Π³ ΠΆΠΈΠ·Π½Π΅ΡΠΏΠΎΡΠΎΠ±Π½ΠΎΡΡΠΈ ΠΏΡΠΈΠ»ΠΎΠΆΠ΅Π½ΠΈΠΉ Kafka Streams, ΡΡΠΎΠ±Ρ Π² ΡΠ»ΡΡΠ°Π΅ ΠΎΡΡΠ°Π½ΠΎΠ²ΠΊΠΈ ΡΡΡΠΈΠΌΠΎΠ² ΠΏΠ΅ΡΠ΅Π·Π°ΠΏΡΡΡΠΈΡΡ ΠΈΠ½ΡΡΠ°Π½Ρ.
ΠΡΡΠ΅ΠΌ Π΄ΠΎΠ»Π³ΠΈΡ ΠΌΡΡΠ°ΡΡΡΠ² ΡΠ΅ΡΠ΅Π½ΠΈΠ΅ (ΡΠΊΠΎΡΠ΅Π΅ Π·Π°ΠΏΠ»Π°ΡΠΊΠ°) Π±ΡΠ»ΠΎ Π½Π°ΠΉΠ΄Π΅Π½ΠΎ: ΡΠ°ΠΌ, Π³Π΄Π΅ Kafka Streams ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅Ρ Π°Π³Π³ΡΠ΅Π³ΠΈΡΠΎΠ²Π°Π½Π½ΡΠ΅ ΠΈΠ»ΠΈ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½Π½ΡΠ΅ Π΄Π°Π½Π½ΡΠ΅ Π² ΡΠΆΠ΅ ΡΠΎΠ·Π΄Π°Π½Π½ΡΠΉ ΡΠΎΠΏΠΈΠΊ Kafka Ρ Π·Π°ΡΠ΅Π³ΠΈΡΡΡΠΈΡΠΎΠ²Π°Π½Π½ΠΎΠΉ Π² Schema Registry ΡΡ Π΅ΠΌΠΎΠΉ, Ρ ΠΎΡΠΊΠ»ΡΡΠΈΠ» Π²ΠΎΠ·ΠΌΠΎΠΆΠ½ΠΎΡΡΡ Serdes Π°Π²ΡΠΎΠΌΠ°ΡΠΈΡΠ΅ΡΠΊΠΈ ΡΠ΅Π³ΠΈΡΡΡΠΈΡΠΎΠ²Π°ΡΡ Π½ΠΎΠ²ΡΠ΅ ΡΡ Π΅ΠΌΡ.
ΠΡΡΠΈΡΡΠ²Π°Π΅Ρ Π΄Π°Π½Π½ΡΠ΅ ΠΎ Π·Π°ΠΊΠ°Π·Π°Ρ
ΠΈΠ· ΡΠΎΠΏΠΈΠΊΠ° orders.v1
ΠΈ Π²Π°Π»ΠΈΠ΄ΠΈΡΡΠ΅Ρ Π½Π° ΠΏΡΠ΅Π΄ΠΌΠ΅Ρ Π½Π°Π»ΠΈΡΠΈΡ ΡΡΠ΅Π±ΡΠ΅ΠΌΠΎΠ³ΠΎ ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²Π° ΡΠΎΠ²Π°ΡΠΎΠ² Π² Π·Π°ΠΊΠ°Π·Π΅. ΠΠ°Π»ΠΈΠ΄Π°ΡΠΈΡ ΡΡΠΈΡΡΠ²Π°Π΅Ρ ΠΊΠ°ΠΊ ΡΠ΅ ΡΠΎΠ²Π°ΡΡ, ΠΊΠΎΡΠΎΡΡΠ΅ Π΅ΡΡΡ Π½Π° "ΡΠΊΠ»Π°Π΄Π΅", ΡΠ°ΠΊ ΠΈ ΡΠ΅, ΠΊΠΎΡΠΎΡΡΠ΅ Π·Π°ΡΠ΅Π·Π΅ΡΠ²ΠΈΡΠΎΠ²Π°Π½Ρ (Π½Π° Π½ΠΈΡ
Π΅ΡΡΡ Π΄Π΅ΠΉΡΡΠ²ΡΡΡΠΈΠ΅ Π·Π°ΠΊΠ°Π·Ρ). ΠΠ° ΡΠ΅ΠΊΡΡΠΈΠΉ ΠΌΠΎΠΌΠ΅Π½Ρ, Π½Π΅ ΡΠ΅Π°Π»ΠΈΠ·ΠΎΠ²Π°Π½Π° Π»ΠΎΠ³ΠΈΠΊΠ° ΡΠ½ΠΈΠΆΠ΅Π½ΠΈΡ ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²Π° ΡΠΎΠ²Π°ΡΠΎΠ² Π½Π° "ΡΠΊΠ»Π°Π΄Π΅" - ΡΡΠΎ Π΄ΠΎΠ»ΠΆΠ½ΠΎ ΠΏΡΠΎΠΈΡΡ
ΠΎΠ΄ΠΈΡΡ, Π½Π°ΠΏΡΠΈΠΌΠ΅Ρ, ΠΏΡΠΈ ΠΎΡΠΏΡΠ°Π²ΠΊΠ΅ Π·Π°ΠΊΠ°Π·Π° ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»Ρ ΡΠ΅ΡΠ΅Π· ΡΠ»ΡΠΆΠ±Ρ Π΄ΠΎΡΡΠ°Π²ΠΊΠΈ. "Π‘ΠΊΠ»Π°Π΄" - ΠΏΡΠ΅Π΄ΡΡΠ°Π²Π»ΡΠ΅Ρ ΠΈΠ· ΡΠ΅Π±Ρ ΡΠΎΠΏΠΈΠΊ Kafka warehouse-inventory.v1
, Π³Π΄Π΅ ΠΊΠ»ΡΡΠ΅ΠΌ Π·Π°ΠΏΠΈΡΠΈ ΡΠ²Π»ΡΠ΅ΡΡΡ Π½Π°Π·Π²Π°Π½ΠΈΠ΅ ΡΠΎΠ²Π°ΡΠ°, Π° Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ΠΌ - ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ.
ΠΡΠ½ΠΎΠ²Π½ΠΎΠΉ ΡΠ΅ΡΠ²ΠΈΡ InventoryKafkaService
ΡΠΎΠ·Π΄Π°Π΅Ρ KStream<String, Order>
Π½Π° ΠΎΡΠ½ΠΎΠ²Π΅ ΡΠΎΠΏΠΈΠΊΠ° orders.v1
:
val orders: KStream<String, Order> = builder .stream( schemas.ORDERS.name, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) ) .peek { key, value -> log.info("Orders streams record. Key: {}, Order: {}", key, value) }
KTable<Product, Int>
Π½Π° ΠΎΡΠ½ΠΎΠ²Π΅ ΡΠΎΠΏΠΈΠΊΠ° warehouse-inventory.v1
:
val warehouseInventory: KTable<Product, Int> = builder .table( schemas.WAREHOUSE_INVENTORY.name, Consumed.with( schemas.WAREHOUSE_INVENTORY.keySerde, schemas.WAREHOUSE_INVENTORY.valueSerde ) )
Π° ΡΠ°ΠΊΠΆΠ΅ ΠΌΠ°ΡΠ΅ΡΠΈΠ°Π»ΠΈΠ·ΠΎΠ²Π°Π½Π½ΡΠΉ KeyValueStore<Product, Long>
, Ρ
ΡΠ°Π½ΡΡΠΈΠΉ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ ΠΎ Π·Π°ΡΠ΅Π·Π΅ΡΠ²ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ
ΡΠΎΠ²Π°ΡΠ°Ρ
:
val reservedStock: StoreBuilder<KeyValueStore<Product, Long>> = Stores .keyValueStoreBuilder( Stores.persistentKeyValueStore(inventoryProps.reservedStockStoreName), schemas.WAREHOUSE_INVENTORY.keySerde, Serdes.Long() ) .withLoggingEnabled(hashMapOf()) builder.addStateStore(reservedStock)
ΠΠΎΡΠ»Π΅ ΡΡΠΎΠ³ΠΎ ΠΌΡ Π΄Π΅Π»Π°Π΅ΠΌ re-keying ΡΡΡΠΈΠΌΠ° KStream<String, Order>
ΠΏΠΎ ΠΊΠ»ΡΡΡ product. Π’ΡΡ ΡΡΠΎΠΈΡ Π½Π°ΠΏΠΎΠΌΠ½ΠΈΡΡ, ΡΡΠΎ ΠΏΠΎΠ΄ ΠΊΠ°ΠΏΠΎΡΠΎΠΌ Kafka Streams ΠΏΠΎΠΌΠ΅ΡΠΈΡ ΡΡΠΎΡ KStream ΠΊΠ°ΠΊ ΠΏΠΎΠ΄Π»Π΅ΠΆΠ°ΡΠΈΠΉ ΡΠ΅ΠΏΠ°ΡΡΠΈΡΠΈΠΎΠ½ΠΈΡΠΎΠ²Π°Π½ΠΈΡ, ΠΈ Π² ΡΠ»ΡΡΠ°Π΅ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎΡΡΠΈ (Π½ΠΈΠΆΠ΅ Π±ΡΠ΄Π΅Ρ join ΠΈΠ»ΠΈ Π°Π³Π³ΡΠ΅Π³Π°ΡΠΈΡ), ΠΏΡΠΎΠΈΠ·ΠΎΠΉΠ΄Π΅Ρ ΡΠ΅ΠΏΠ°ΡΡΠΈΡΠΈΠΎΠ½ΠΈΡΠΎΠ²Π°Π½ΠΈΠ΅. ΠΠ°Π»Π΅Π΅ Π² ΡΡΡΠΈΠΌ ΠΏΠΎΠΏΠ°Π΄Π°ΡΡ ΡΠΎΠ»ΡΠΊΠΎ Π·Π°ΠΊΠ°Π·Ρ Π² ΡΠΎΡΡΠΎΡΠ½ΠΈΠΈ CREATED, ΠΊΠΎΡΠΎΡΡΠ΅ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½ΡΡΡΡΡ Ρ ΡΠ°Π±Π»ΠΈΡΠ΅ΠΉ Π·Π°ΡΠ΅Π·Π΅ΡΠ²ΠΈΡΠΎΠ²Π°Π½Π½ΡΡ
ΡΠΎΠ²Π°ΡΠΎΠ². ΠΠΎΡΠ»Π΅ ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½Π΅Π½ΠΈΡ ΠΏΠΎΠ»ΡΡΠ°Π΅ΠΌ Π½ΠΎΠ²ΡΠΉ KStream
, ΠΊΠ°ΠΆΠ΄Π°Ρ Π·Π°ΠΏΠΈΡΡ ΠΊΠΎΡΠΎΡΠΎΠ³ΠΎ ΠΏΡΠΎΡ
ΠΎΠ΄ΠΈΡ Π²Π°Π»ΠΈΠ΄Π°ΡΠΈΡ ΠΈ ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅ΡΡΡ Π² ΡΠΎΠΏΠΈΠΊ order-validations.v1
.
orders.selectKey { id, order -> order.product } .peek { key, value -> log.info("Orders stream record after SELECT KEY. New key: {}. \nNew value: {}", key, value) } // Limit to newly created orders .filter { id, order -> OrderState.CREATED == order.state } //Join Orders to Inventory so we can compare each order to its corresponding stock value .join( warehouseInventory, ::KeyValue, Joined.with( schemas.WAREHOUSE_INVENTORY.keySerde, schemas.ORDERS.valueSerde, Serdes.Integer() ) ) //Validate the order based on how much stock we have both in the warehouse // and locally 'reserved' stock .transform( TransformerSupplier { InventoryValidator(inventoryProps) }, inventoryProps.reservedStockStoreName ) .peek { key, value -> log.info( "Pushing the result of validation Order record to topic: {} with key: {}. \nResult: {}", schemas.ORDER_VALIDATIONS.name, key, value ) } //Push the result into the Order Validations topic .to( schemas.ORDER_VALIDATIONS.name, Produced.with( orderValidationsKeySerde.apply { configureSerde(this, true) }, orderValidationsValueSerde.apply { configureSerde(this, false) } ) )
ΠΠΎΠ»Π½Π°Ρ ΡΠ΅Π°Π»ΠΈΠ·Π°ΡΠΈΡ ΠΊΠ»Π°ΡΡΠ°:
private val log = LoggerFactory.getLogger(InventoryKafkaService::class.java) @Service class InventoryKafkaService( private val schemas: Schemas, private val inventoryProps: InventoryProps, private val topicProps: TopicsProps ) { @Autowired fun processStreams(builder: StreamsBuilder) { // Latch onto instances of the orders and inventory topics val orders: KStream<String, Order> = builder .stream( schemas.ORDERS.name, Consumed.with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) ) .peek { key, value -> log.info("Orders streams record. Key: {}, Order: {}", key, value) } val warehouseInventory: KTable<Product, Int> = builder .table( schemas.WAREHOUSE_INVENTORY.name, Consumed.with( schemas.WAREHOUSE_INVENTORY.keySerde, schemas.WAREHOUSE_INVENTORY.valueSerde ) ) // Create a store to reserve inventory whilst the order is processed. // This will be prepopulated from Kafka before the service starts processing val reservedStock: StoreBuilder<KeyValueStore<Product, Long>> = Stores .keyValueStoreBuilder( Stores.persistentKeyValueStore(inventoryProps.reservedStockStoreName), schemas.WAREHOUSE_INVENTORY.keySerde, Serdes.Long() ) .withLoggingEnabled(hashMapOf()) builder.addStateStore(reservedStock) val orderValidationsKeySerde = Serdes.String() val orderValidationsValueSerde = SpecificAvroSerde<OrderValidation>() //First change orders stream to be keyed by Product (so we can join with warehouse inventory) orders.selectKey { id, order -> order.product } .peek { key, value -> log.info("Orders stream record after SELECT KEY. New key: {}. \nNew value: {}", key, value) } // Limit to newly created orders .filter { id, order -> OrderState.CREATED == order.state } //Join Orders to Inventory so we can compare each order to its corresponding stock value .join( warehouseInventory, ::KeyValue, Joined.with( schemas.WAREHOUSE_INVENTORY.keySerde, schemas.ORDERS.valueSerde, Serdes.Integer() ) ) //Validate the order based on how much stock we have both in the warehouse // and locally 'reserved' stock .transform( TransformerSupplier { InventoryValidator(inventoryProps) }, inventoryProps.reservedStockStoreName ) .peek { key, value -> log.info( "Pushing the result of validation Order record to topic: {} with key: {}. \nResult: {}", schemas.ORDER_VALIDATIONS.name, key, value ) } //Push the result into the Order Validations topic .to( schemas.ORDER_VALIDATIONS.name, Produced.with( orderValidationsKeySerde.apply { configureSerde(this, true) }, orderValidationsValueSerde.apply { configureSerde(this, false) } ) ) } private fun configureSerde(serde: Serde<*>, isKey: Boolean) { val serdesConfig = hashMapOf<String, Any>( AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS to false, AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to true, ) schemas.configureSerde(serde, serdesConfig, isKey) } }
Π‘Π΅ΡΠ²ΠΈΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅Ρ Java ΠΊΠ»ΠΈΠ΅Π½Ρ ksqlDB, Π΄Π»Ρ ΡΠΎΠ³ΠΎ, ΡΡΠΎΠ±Ρ ΡΠΎΠ·Π΄Π°ΡΡ Π² ksqlDB stream, ΠΊΠΎΡΠΎΡΡΠΉ ΡΠΎΠ΄Π΅ΡΠΆΠΈΡ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ ΠΊΠ°ΠΊ ΠΎ ΠΏΠΎΠΊΡΠΏΠ°ΡΠ΅Π»Π΅, ΡΠ°ΠΊ ΠΈ ΠΎ Π·Π°ΠΊΠ°Π·Π΅. ΠΠ°Π½Π½ΡΠ΅ ΠΈΠ· ΡΡΠΎΠ³ΠΎ ΡΡΡΠΈΠΌΠ° ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡΡΠΈΡΡ, Π½Π΅ΠΏΠΎΡΡΠ΅Π΄ΡΡΠ²Π΅Π½Π½ΠΎ ΠΎΠ±ΡΠ°ΡΠΈΠ²ΡΠΈΡΡ ΠΊ ksqlDB ΠΊΠ°ΠΊ ΡΠ΅ΡΠ΅Π· Java ΠΊΠ»ΠΈΠ΅Π½Ρ, ΡΠ°ΠΊ ΠΈ Π΄ΡΡΠ³ΠΈΠΌΠΈ ΡΠΏΠΎΡΠΎΠ±Π°ΠΌΠΈ.
private val CREATE_ORDERS_STREAM = "CREATE SOURCE STREAM IF NOT EXISTS ${ksqlDBProps.ordersStream} (" + "ID STRING KEY, " + "CUSTOMERID BIGINT, " + "STATE STRING, " + "PRODUCT STRING, " + "QUANTITY INT, " + "PRICE DOUBLE) WITH (" + "kafka_topic='${topicProps.ordersTopic}'," + " value_format='AVRO'" + ");" private val CREATE_CUSTOMERS_TABLE = """ CREATE SOURCE TABLE IF NOT EXISTS ${ksqlDBProps.customersTable} ( CUSTOMERID BIGINT PRIMARY KEY, FIRSTNAME STRING, LASTNAME STRING, EMAIL STRING, ADDRESS STRING, LEVEL STRING ) WITH ( KAFKA_TOPIC='${topicProps.customersTopic}', VALUE_FORMAT='AVRO' ); """ private val CREATE_ORDERS_ENRICHED_STREAM = """ CREATE STREAM IF NOT EXISTS ${ksqlDBProps.ordersEnrichedStream} AS SELECT ${ksqlDBProps.customersTable}.CUSTOMERID AS customerId, ${ksqlDBProps.customersTable}.FIRSTNAME, ${ksqlDBProps.customersTable}.LASTNAME, ${ksqlDBProps.customersTable}.LEVEL, ${ksqlDBProps.ordersStream}.PRODUCT, ${ksqlDBProps.ordersStream}.QUANTITY, ${ksqlDBProps.ordersStream}.PRICE FROM ${ksqlDBProps.ordersStream} LEFT JOIN ${ksqlDBProps.customersTable} ON ${ksqlDBProps.ordersStream}.CUSTOMERID = ${ksqlDBProps.customersTable}.CUSTOMERID; """
ΠΠΎΠΌΠΈΠΌΠΎ ΡΡΠΎΠ³ΠΎ ΡΠ°ΠΊΠΆΠ΅ ΡΠΎΠ·Π΄Π°Π΅ΡΡΡ ΡΠ°Π±Π»ΠΈΡΠ°, ΡΠΎΠ΄Π΅ΡΠΆΠ°ΡΠ°Ρ ΠΈΠ½ΡΠΎΡΠΌΠ°ΡΠΈΡ ΠΎ ΠΏΠΎΡΠ΅Π½ΡΠΈΠ°Π»ΡΠ½ΠΎ ΠΌΠΎΡΠ΅Π½Π½ΠΈΡΠ΅ΡΠΊΠΈΡ Π΄Π΅ΠΉΡΡΠ²ΠΈΡΡ ΠΊΠ»ΠΈΠ΅Π½ΡΠΎΠ²:
private val CREATE_FRAUD_ORDER_TABLE = """ CREATE TABLE IF NOT EXISTS ${ksqlDBProps.fraudOrderTable} WITH (KEY_FORMAT='json') AS SELECT CUSTOMERID, LASTNAME, FIRSTNAME, COUNT(*) AS COUNTS FROM ${ksqlDBProps.ordersEnrichedStream} WINDOW TUMBLING (SIZE 30 SECONDS) GROUP BY CUSTOMERID, LASTNAME, FIRSTNAME HAVING COUNT(*)>2; """
Π‘Π΅ΡΠ²ΠΈΡ Π²ΡΡΡΠ°Π²Π»ΡΠ΅Ρ Π½Π°ΡΡΠΆΡ REST endpoint, ΠΏΠΎ ΠΊΠΎΡΠΎΡΠΎΠΌΡ ΠΌΠΎΠΆΠ½ΠΎ ΠΏΠΎΠ»ΡΡΠΈΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΎ ΠΏΠΎΡΠ΅Π½ΡΠΈΠ°Π»ΡΠ½ΡΡ ΠΌΠΎΡΠ΅Π½Π½ΠΈΠΊΠ°Ρ :
data class FraudDto( val customerId: Long, val lastName: String, val firstName: String, val counts: Long ) @RestController @RequestMapping("/fraud") class FraudController( private val fraudService: FraudService ) { @GetMapping fun getFraudulentOrders( @RequestParam(name = "limit", defaultValue = "10") limit: Int ): List<FraudDto> { log.info("Received request to GET $limit number of fraudulent orders.") return fraudService.getFraudulentOrders(limit) } } @Service class FraudServiceImpl( private val client: Client, private val ksqlDBProps: KsqlDBProps ) : FraudService { override fun getFraudulentOrders(limit: Int): List<FraudDto> { log.info("Executing getFraudulentOrders stream query.") val frauds = client.executeQuery("SELECT * FROM ${ksqlDBProps.fraudOrderTable} LIMIT $limit;").get() return frauds.map(::mapRowToResponse) } private fun mapRowToResponse(row: Row): FraudDto { log.info("Mapping ksqlDB Query row: {}", row) val cusId = row.getLong("CUSTOMERID") val lastName = row.getString("LASTNAME") val firstName = row.getString("FIRSTNAME") val count = row.getLong("COUNTS") return FraudDto(cusId, lastName, firstName, count) } }
ΠΡΠΎΡΡΠΎΠΉ ΡΠ΅ΡΠ²ΠΈΡ, ΠΎΡΠ²Π΅ΡΠ°ΡΡΠΈΠΉ Π·Π° Π²ΡΡΠΈΡΡΠ²Π°Π½ΠΈΠ΅ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΡΠΎΠΏΠΈΠΊΠ° order-validations.v1
, ΠΊΡΠ΄Π° ΠΎΡΠΏΡΠ°Π²Π»ΡΡΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ ΡΠ΅ΡΠ²ΠΈΡΡ ΠΏΡΠΎΠ²Π΅ΡΠΊΠΈ Π·Π°ΠΊΠ°Π·ΠΎΠ²: Inventory Service, Fraud Service ΠΈ Order Details Service. ΠΠΎΡΠ»Π΅ ΡΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΠΎΡ ΠΊΠ°ΠΆΠ΄ΠΎΠ³ΠΎ Π²Π°Π»ΠΈΠ΄Π°ΡΠΎΡΠ° Π±ΡΠ΄Π΅Ρ ΠΏΠΎΠ»ΡΡΠ΅Π½ ΠΎΡΠ²Π΅Ρ ΠΏΠΎ ΠΊΠΎΠ½ΠΊΡΠ΅ΡΠ½ΠΎΠΌΡ Π·Π°ΠΊΠ°Π·Ρ, Π² ΡΠΎΠΏΠΈΠΊ orders.v1
ΠΎΡΠΏΡΠ°Π²Π»ΡΠ΅ΡΡΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ Ρ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Π½ΡΠΌ ΡΡΠ°ΡΡΡΠΎΠΌ Π·Π°ΠΊΠ°Π·Π°.
ΠΠ°ΡΡΡΠΎΠΉΠΊΠΈ Kafka Streams Π°Π½Π°Π»ΠΎΠ³ΠΈΡΠ½Ρ Orders Service.
Π Π΅Π°Π»ΠΈΠ·Π°ΡΠΈΡ ΠΊΠ»Π°ΡΡΠ° ValidationAggregatorService:
@Service class ValidationAggregatorService( private val schemas: Schemas, private val topicProps: TopicsProps ) { private val serdes1: Consumed<String, OrderValidation> = Consumed.with( schemas.ORDER_VALIDATIONS.keySerde, schemas.ORDER_VALIDATIONS.valueSerde ) private val serdes2: Consumed<String, Order> = Consumed .with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) private val serdes3: Grouped<String, OrderValidation> = Grouped .with(schemas.ORDER_VALIDATIONS.keySerde, schemas.ORDER_VALIDATIONS.valueSerde) private val serdes4: StreamJoined<String, Long, Order> = StreamJoined .with(schemas.ORDERS.keySerde, Serdes.Long(), schemas.ORDERS.valueSerde) private val serdes5: Grouped<String, Order> = Grouped .with(schemas.ORDERS.keySerde, schemas.ORDERS.valueSerde) private val serdes6: StreamJoined<String, OrderValidation, Order> = StreamJoined .with(schemas.ORDERS.keySerde, schemas.ORDER_VALIDATIONS.valueSerde, schemas.ORDERS.valueSerde) // disables auto registering schemas for specified Serde<*> private fun configureSerdes(serde: Serde<*>, isKey: Boolean) { val config = hashMapOf<String, Any>( AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to topicProps.schemaRegistryUrl, AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS to false, AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION to true, ) schemas.configureSerde(serde, config, isKey) } @Autowired fun aggregateOrderValidations(builder: StreamsBuilder) { // 1. ΠΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ ΠΏΡΠΎΠ²Π΅ΡΠΎΠΊ, ΠΊΠΎΡΠΎΡΡΠ΅ ΠΏΡΠΎΡ
ΠΎΠ΄ΠΈΡ ΠΊΠ°ΠΆΠ΄ΡΠΉ Π·Π°ΠΊΠ°Π·. ΠΠ»Ρ ΠΏΡΠΎΡΡΠΎΡΡ ΡΠ΅Π°Π»ΠΈΠ·Π°ΡΠΈΠΈ Π·Π°Ρ
Π°ΡΠ΄ΠΊΠΎΠΆΠ΅Π½ΠΎ. val numberOfRules = 3 // 2. ΠΠ°ΡΡΡΠΎΠΉΠΊΠ° Serdes, ΠΊΠΎΡΠΎΡΡΠ΅ Π±ΡΠ΄ΡΡ ΡΡΠ°ΡΡΠ²ΠΎΠ²Π°ΡΡ Π² ΠΎΡΠΏΡΠ°Π²ΠΊΠ΅ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ Π² ΡΠΎΠΏΠΈΠΊ orders.v1. // ΠΠ°ΠΏΡΠ΅ΡΠ°Π΅ΠΌ Π°Π²ΡΠΎΠΌΠ°ΡΠΈΡΠ΅ΡΠΊΡΡ ΡΠ΅Π³ΠΈΡΡΡΠ°ΡΠΈΡ ΡΡ
Π΅ΠΌ Π² Schema Registry, ΡΡΠΎΠ±Ρ ΠΈΠ·Π±Π΅ΠΆΠ°ΡΡ ΠΊΠΎΠ½ΡΠ»ΠΈΠΊΡΠΎΠ². val ordersKeySerde = Serdes.String() val ordersValueSerde = SpecificAvroSerde<Order>() configureSerdes(ordersKeySerde, true) configureSerdes(ordersValueSerde, false) // 3. ΡΡΡΠΈΠΌ ΠΈΠ· ΡΠΎΠΏΠΈΠΊΠ° order-validations.v1 val validations: KStream<String, OrderValidation> = builder .stream(schemas.ORDER_VALIDATIONS.name, serdes1) // 4. ΡΡΡΠΈΠΌ ΠΈΠ· ΡΠΎΠΏΠΈΠΊΠ° orders.v1, Π² ΠΊΠΎΡΠΎΡΠΎΠΌ ΠΏΡΠΈΡΡΡΡΡΠ²ΡΡΡ ΡΠΎΠ»ΡΠΊΠΎ Π·Π°ΠΊΠ°Π·Ρ Π² ΡΡΠ°ΡΡΡΠ΅ CREATED val orders = builder .stream(schemas.ORDERS.name, serdes2) .filter { id, order -> OrderState.CREATED == order.state } validations .peek { key, value -> log.info("Starting validation for key: {}, value: {}", key, value) } // 5. Π³ΡΡΠΏΠΏΠΈΡΡΠ΅ΠΌ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ ΠΎ Π²Π°Π»ΠΈΠ΄Π°ΡΠΈΠΈ ΠΏΠΎ orderId .groupByKey(serdes3) // 6. Π‘ΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ ΡΠ°ΠΊΠΆΠ΅ ΡΠ°Π·Π±ΠΈΠ²Π°ΡΡΡΡ ΠΏΠΎ ΠΎΠΊΠ½Π°ΠΌ ΡΠ΅ΡΡΠΈΠΎΠ½Π½ΠΎΠΉ Π°ΠΊΡΠΈΠ²Π½ΠΎΡΡΠΈ Ρ ΠΏΠ΅ΡΠΈΠΎΠ΄ΠΈΠΌ Π½Π΅Π°ΠΊΡΠΈΠ²Π½ΠΎΡΡΠΈ // 5 ΠΌΠΈΠ½ΡΡ .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(5))) // 7. ΠΠ³Π³ΡΠ΅Π³ΠΈΡΡΠ΅ΠΌ Π΄Π°Π½Π½ΡΠ΅ ΠΎ ΠΏΡΠΎΠ²Π΅ΡΠΊΠ΅ Π·Π°ΠΊΠ°Π·ΠΎΠ², Π΅ΡΠ»ΠΈ Π·Π°ΠΊΠ°Π· ΠΏΡΠΎΡΠ΅Π» ΠΏΡΠΎΠ²Π΅ΡΠΊΡ (PASS), // ΡΠΎΠ³Π΄Π° ΡΠ²Π΅Π»ΠΈΡΠΈΠ²Π°Π΅ΠΌ ΡΡΠ΅ΡΡΠΈΠΊ ΠΏΡΠΎΠ²Π΅ΡΠ΅Π½Π½ΡΡ
Π·Π°ΠΊΠ°Π·ΠΎΠ² .aggregate( { 0L }, { id, result, total -> if (PASS == result.validationResult) total + 1 else total }, // ΡΠ°ΠΊ ΠΊΠ°ΠΊ ΠΌΡ ΠΈΡΠΏΠΎΠ»ΡΠ·ΡΠ΅ΠΌ SessionWindow, Π½Π°ΠΌ Π½Π΅ΠΎΠ±Ρ
ΠΎΠ΄ΠΈΠΌΠΎ Π΄ΠΎΠ±Π°Π²ΠΈΡΡ merger Π΄Π»Ρ ΡΠ΅ΡΡΠΈΠΉ // ΠΈΠ·-Π·Π° ΠΎΡΠΎΠ±Π΅Π½Π½ΠΎΡΡΠ΅ΠΉ ΡΠ°Π±ΠΎΡΡ ΡΡΠΈΡ
ΠΎΠΊΠΎΠ½. ΠΠΎ ΡΡΡΠΈ, ΠΊΠΎΠ³Π΄Π° Π² ΡΡΡΠΈΠΌ ΠΏΡΠΈΡ
ΠΎΠ΄ΠΈΡ ΠΏΠ΅ΡΠ²ΠΎΠ΅ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅ // ΡΡΠ°Π·Ρ ΡΠΎΠ·Π΄Π°Π΅ΡΡΡ ΡΠ΅ΡΡΠΈΡ, ΠΏΠΎΡΠ»Π΅ ΡΡΠΎΠ³ΠΎ, Π΅ΡΠ»ΠΈ ΠΏΡΠΈΡ
ΠΎΠ΄ΠΈΡ Π΅ΡΠ΅ ΠΎΠ±Π½ΠΎ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠ΅, ΠΏΠΎΠΏΠ°Π΄Π°ΡΡΠ΅Π΅ Π² // inactivity gap ΠΏΠ΅ΡΠ²ΠΎΠ³ΠΎ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ, ΡΠΎ Π΄Π»Ρ Π½Π΅Π³ΠΎ ΡΠΎΠΆΠ΅ ΡΠΎΠ·Π΄Π°Π΅ΡΡΡ ΡΠ΅ΡΡΠΈΡ, ΠΊΠΎΡΠΎΡΠ°Ρ ΠΌΠ΅ΡΠΆΠΈΡΡΡ // Ρ ΡΠ΅ΡΡΠΈΠ΅ΠΉ ΠΏΠ΅ΡΠ²ΠΎΠ³ΠΎ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ. ΠΠΎΠ΄ΡΠΎΠ±Π½Π΅Π΅ ΡΡΡ: // https://kafka.apache.org/36/documentation/streams/developer-guide/dsl-api.html#windowing-session { k, a, b -> b ?: a }, Materialized.with(null, Serdes.Long()) ) // 8. ΠΈΠ·Π±Π°Π²Π»ΡΠ΅ΠΌΡΡ ΠΎΡ "ΠΎΠΊΠΎΠ½" ΠΈ ΡΠΎΡΠΌΠΈΡΡΠ΅ΠΌ Π½ΠΎΠ²ΡΠΉ ΡΡΡΠΈΠΌ, Π³Π΄Π΅: key = orderId, value = ΠΊΠΎΠ»ΠΈΡΠ΅ΡΡΠ²ΠΎ ΠΏΡΠΎΠΉΠ΄Π΅Π½Π½ΡΡ
ΠΏΡΠΎΠ²Π΅ΡΠΎΠΊ .toStream { windowedKey, total -> windowedKey.key() } // ΠΠΎΠ³Π΄Π° ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ ΡΠ΄Π°Π»ΡΡΡΡΡ ΠΈΠ· Session Window, ΠΏΡΠ±Π»ΠΈΠΊΡΠ΅ΡΡΡ ΡΠΎΠ±ΡΡΠΈΠ΅ delete (null-value), ΠΈΠ·Π±Π°Π²Π»ΡΠ΅ΠΌΡΡ ΠΎΡ Π½ΠΈΡ
.filter { _, v -> v != null } // 9. Π’Π°ΠΊΠΆΠ΅ Π±Π΅ΡΠ΅ΠΌ ΡΠΎΠ»ΡΠΊΠΎ ΡΠ΅ Π·Π°ΠΏΠΈΡΠΈ, ΠΊΠΎΡΠΎΡΡΠ΅ ΠΏΡΠΎΡΠ»ΠΈ Π²ΡΠ΅ ΠΏΡΠΎΠ²Π΅ΡΠΊΠΈ .filter { _, total -> total >= numberOfRules } // 9. // 10. ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½ΡΠ΅ΠΌ ΡΠΎ ΡΡΡΠΈΠΌΠΎΠΌ orders ΠΈ ΠΎΠ±Π½ΠΎΠ²Π»ΡΠ΅ΠΌ ΡΡΠ°ΡΡΡ Π·Π°ΠΊΠ°Π·Π° Π½Π° VALIDATED .join( orders, { id, order -> Order.newBuilder(order).setState(OrderState.VALIDATED).build() }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)), serdes4 ) .peek { _, value -> log.info("Order {} has been validated and is being sent to Kafka Topic: {}", value, topicProps.ordersTopic) } // 11. ΠΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌ Π·Π°ΠΊΠ°Π· Ρ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Π½ΡΠΌ ΡΡΠ°ΡΡΡΠΎΠΌ Π² ΡΠΎΠΏΠΈΠΊ orders.v1 .to(schemas.ORDERS.name, Produced.with(ordersKeySerde, ordersValueSerde)) // 12. ΡΠ°ΠΊΠΆΠ΅ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°Π΅ΠΌ ΡΠ»ΡΡΠ°ΠΉ, ΠΊΠΎΠ³Π΄Π° Π·Π°ΠΊΠ°Π· ΠΏΡΠΎΠ²Π°Π»ΠΈΠ» Ρ
ΠΎΡΡΠ±Ρ ΠΎΠ΄Π½Ρ ΠΏΡΠΎΠ²Π΅ΡΠΊΡ validations.filter { id, rule -> FAIL == rule.validationResult } // 13. ΠΎΠ±ΡΠ΅Π΄ΠΈΠ½ΡΠ΅ΠΌ ΡΠΎ ΡΡΡΠΈΠΌΠΎΠΌ orders ΠΈ ΠΎΠ±Π½ΠΎΠ²Π»ΡΠ΅ΠΌ ΡΡΠ°ΡΡΡ Π·Π°ΠΊΠ°Π·Π° Π½Π° FAILED .join( orders, { id, order -> Order.newBuilder(order).setState(OrderState.FAILED).build() }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)), serdes6 ) .peek { key, value -> log.info("Order {} failed validation. BEFORE groupByKey.", value) } // 14. ΠΡΡΠΏΠΏΠΈΡΡΠ΅ΠΌ ΠΏΠΎ ΠΊΠ»ΡΡΡ - orderId .groupByKey(serdes5) // 15. ΡΠ°ΠΊ ΠΊΠ°ΠΊ Π·Π°ΠΊΠ°Π· ΠΌΠΎΠ³ Π½Π΅ ΠΏΡΠΎΠΉΡΠΈ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΠΏΡΠΎΠ²Π΅ΡΠΎΠΊ, Π² Π½Π°ΡΠ΅ΠΌ ΡΡΡΠΈΠΌΠ΅ ΠΌΠΎΠ³ΡΡ ΠΏΡΠΈΡΡΡΡΡΠ²ΠΎΠ²Π°ΡΡ // Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ Π·Π°ΠΏΠΈΡΠ΅ΠΉ ΡΠΎ ΡΡΠ°ΡΡΡΠΎΠΌ FAIL, ΡΡΡ ΠΌΡ ΠΈΠ·Π±Π°Π²Π»ΡΠ΅ΠΌΡΡ ΠΎΡ Π΄ΡΠ±Π»ΠΈΠΊΠ°ΡΠΎΠ², ΡΠ°ΠΊ ΠΊΠ°ΠΊ Π½Π°ΠΌ // Π½Π΅ Π²Π°ΠΆΠ½ΠΎ ΡΠΊΠΎΠ»ΡΠΊΠΎ ΠΏΡΠΎΠ²Π΅ΡΠΎΠΊ Π½Π΅ ΠΏΡΠΎΡΠ΅Π» Π·Π°ΠΊΠ°Π· .reduce { order, v1 -> order } .toStream() .peek { key, value -> log.info("Order {} failed validation. AFTER groupByKey.", value) } .to( // 16. ΠΡΠΏΡΠ°Π²Π»ΡΠ΅ΠΌ Π·Π°ΠΊΠ°Π· Ρ ΠΎΠ±Π½ΠΎΠ²Π»Π΅Π½Π½ΡΠΌ ΡΡΠ°ΡΡΡΠΎΠΌ Π² ΡΠΎΠΏΠΈΠΊ orders.v1 schemas.ORDERS.name, Produced.with(ordersKeySerde, ordersValueSerde) ) } }
- JDK 17
- Docker + docker-compose
- ΠΠΠ£ Π΄Π»Ρ Docker Π½Π΅ ΠΌΠ΅Π½Π΅Π΅ 8 ΠΠ±
- ΠΠΠ£ Π΄Π»Ρ ΠΌΠΈΠΊΡΠΎΡΠ΅ΡΠ²ΠΈΡΠΎΠ² Π½Π΅ ΠΌΠ΅Π½Π΅Π΅ 8 ΠΠ±. (Π’ΠΎΡΠ½ΠΎ Π·Π°ΠΏΡΡΠΊΠ°Π΅ΡΡΡ Π½Π° Ubuntu 22.04 Ρ 64 ΠΠ± ΠΠΠ£ ΠΈ 8 ΡΠ΄ΡΠ°ΠΌΠΈ ΠΏΡΠΎΡΠ΅ΡΡΠΎΡΠ°)
ΠΠ°ΠΏΡΡΡΠΈΡΡ ΡΠΊΡΠΈΠΏΡ ΠΈΠ· ΠΊΠΎΡΠ½Π΅Π²ΠΎΠΉ ΠΏΠ°ΠΏΠΊΠΈ:
- ΠΡΠ»ΠΈ ΡΡΡΠ°Π½ΠΎΠ²Π»Π΅Π½ Docker V2, ΡΠΎ:
./prepare_script_docker_new.sh
- ΠΡΠ»ΠΈ Π±ΠΎΠ»Π΅Π΅ ΡΡΠ°ΡΠ° Π²Π΅ΡΡΠΈΡ, ΡΠΎ:
./prepare_script_docker_old.sh
Π‘ΠΊΡΠΈΠΏΡ ΠΎΡΡΡΠ΅ΡΡΠ²Π»ΡΠ΅Ρ ΡΠ»Π΅Π΄ΡΡΡΠΈΠ΅ Π΄Π΅ΠΉΡΡΠ²ΠΈΡ:
-
ΠΠΎΠ΄Π½ΠΈΠΌΠ°Π΅Ρ docker ΠΊΠΎΠ½ΡΠ΅ΠΉΠ½Π΅ΡΡ:
- Zookeeper
- Kafka
- Schema Registry (Confluent)
- Connect (Kafka Connect)
- ksqlDB
- Elasticsearch
- SQLite
- Kibana
-
Π Π΅Π³ΠΈΡΡΡΠΈΡΡΠ΅Ρ Avro ΡΡ Π΅ΠΌΡ ΡΡΡΠ½ΠΎΡΡΠ΅ΠΉ, ΠΊΠΎΡΠΎΡΡΠ΅ Π±ΡΠ΄ΡΡ ΠΎΠ±ΡΠ°Π±Π°ΡΡΠ²Π°ΡΡΡΡ Π² Kafka. Π Π΅Π³ΠΈΡΡΡΠ°ΡΠΈΡ ΠΎΡΡΡΠ΅ΡΡΠ²Π»ΡΠ΅ΡΡΡ Ρ ΠΏΠΎΠΌΠΎΡΡΡ Gradle Plugin:
id 'com.github.imflog.kafka-schema-registry-gradle-plugin' version "1.12.0"
-
ΠΡΠΎΠ²ΠΎΠ΄ΠΈΡ ΡΠΈΡΡΡΡ ΡΠ±ΠΎΡΠΊΡ ΠΌΠΈΠΊΡΠΎΡΠ΅ΡΠ²ΠΈΡΠΎΠ² (ΠΏΠΎΠΊΠ° Π±Π΅Π· ΡΠ΅ΡΡΠΎΠ², ΡΠ°ΠΊ ΠΊΠ°ΠΊ ΠΈΡ Π΅ΡΠ΅ Π½Π΅ Π½Π°ΠΏΠΈΡΠ°Π»).
-
Π‘ΠΎΠ·Π΄Π°Π΅Ρ ΠΈΠ½Π΄Π΅ΠΊΡ
orders.v1
Π² ElasticSearch -
ΠΠΎΠ½ΡΠΈΠ³ΡΡΠΈΡΡΠ΅Ρ Kafka Connect Π΄Π»Ρ:
- ΠΏΠΎΠ»ΡΡΠ΅Π½ΠΈΡ Π΄Π°Π½Π½ΡΡ ΠΈΠ· SQLite (ΡΠ°Π±Π»ΠΈΡΠ° customers) ΠΈ Π·Π°ΠΏΠΈΡΠΈ Π»ΠΎΠ³Π° ΠΈΠ·ΠΌΠ΅Π½Π΅Π½ΠΈΠΉ Π² Kafka (ΠΏΠ°ΡΡΠ΅ΡΠ½ Change Data Capture)
- Π·Π°ΠΏΠΈΡΠΈ Π΄Π°Π½Π½ΡΡ
ΠΈΠ· ΡΠΎΠΏΠΈΠΊΠ° Kafka
orders.v1
Π² ΠΈΠ½Π΄Π΅ΠΊΡorders.v1
ElasticSearch
-
ΠΠ°ΡΡΡΠ°ΠΈΠ²Π°Π΅Ρ Kibana Π΄Π»Ρ ΠΏΡΠΎΡΠΌΠΎΡΡΠ° Π·Π°ΠΏΠΈΡΠ΅ΠΉ ΠΈΠ· ElasticSearch.
Π£ ΠΌΠ΅Π½Ρ Π½Π° Π²ΠΈΡΡΡΠ°Π»ΡΠ½ΠΎΠΉ ΠΌΠ°ΡΠΈΠ½Π΅ Ubuntu 22.04 ΠΈΠ·Π½Π°ΡΠ°Π»ΡΠ½ΠΎ Π½Π΅ Π·Π°ΠΏΡΡΡΠΈΠ»ΡΡ ΠΊΠΎΠ½ΡΠ΅ΠΉΠ½Π΅Ρ elasticsearch, ΡΠ°ΠΊ ΠΊΠ°ΠΊ ΡΡΠ΅Π±ΠΎΠ²Π°Π»ΠΎΡΡ ΡΡΡΠ°Π½ΠΎΠ²ΠΈΡΡ Π·Π½Π°ΡΠ΅Π½ΠΈΠ΅ vm.max_map_count Π½Π΅ Π½ΠΈΠΆΠ΅ 262144. ΠΡΠ»ΠΈ Π²ΠΎΠ·Π½ΠΈΠΊΠ°Π΅Ρ ΡΠ°ΠΊΠ°Ρ ΠΏΡΠΎΠ±Π»Π΅ΠΌΠ°, ΡΠΎ ΠΌΠΎΠΆΠ½ΠΎ Π²ΡΠΏΠΎΠ»Π½ΠΈΡΡ ΡΠ»Π΅Π΄ΡΡΡΡΡ ΠΊΠΎΠΌΠ°Π½Π΄Ρ:
sudo sysctl -w vm.max_map_count=262144
ΠΠΎΡΠ»Π΅ ΡΠΎΠ³ΠΎ, ΠΊΠ°ΠΊ ΡΠΊΡΠΈΠΏΡ ΠΎΡΡΠ°Π±ΠΎΡΠ°Π΅Ρ, ΠΌΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡΡΠΊΠ°ΡΡ ΠΌΠΈΠΊΡΠΎΡΠ΅ΡΠ²ΠΈΡΡ. Π‘Π°ΠΌΡΠΉ ΠΏΡΠΎΡΡΠΎΠΉ Π²Π°ΡΠΈΠ°Π½Ρ - Π·Π°ΠΏΡΡΡΠΈΡΡ Π² IntellijIdea Π²ΡΠ΅ ΠΌΠΈΠΊΡΠΎΡΠ΅ΡΠ²ΠΈΡΡ, ΠΊΡΠΎΠΌΠ΅ populator. Populator ΡΠ»Π΅Π΄ΡΠ΅Ρ Π·Π°ΠΏΡΡΡΠΈΡΡ Π² ΠΏΠΎΡΠ»Π΅Π΄Π½ΡΡ ΠΎΡΠ΅ΡΠ΅Π΄Ρ, ΡΠ°ΠΊ ΠΊΠ°ΠΊ ΠΎΠ½ Π±ΡΠ΄Π΅Ρ ΠΏΠΎΡΡΠΎΡΠ½Π½ΠΎ Π³Π΅Π½Π΅ΡΠΈΡΠΎΠ²Π°ΡΡ Π΄Π°Π½Π½ΡΠ΅ ΠΈ ΠΎΡΠΏΡΠ°Π²Π»ΡΡΡ POST Π·Π°ΠΏΡΠΎΡΡ Π² orders-service Π½Π° ΡΠΎΠ·Π΄Π°Π½ΠΈΠ΅ Π·Π°ΠΊΠ°Π·Π° (1 Π·Π°ΠΊΠ°Π· Π² ΡΠ΅ΠΊΡΠ½Π΄Ρ), Π° ΡΠ°ΠΊΠΆΠ΅ ΠΏΠΎΡΡΠ»Π°ΡΡ ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΡ Π² Kafka ΡΠΎΠΏΠΈΠΊΠΈ: warehouse-inventory.v1 ΠΈ payments.v1.
ΠΠΎΠΆΠ½ΠΎ Π΄Π°ΡΡ ΡΠ΅ΡΠ²ΠΈΡΡ populator ΠΏΠΎΡΠ°Π±ΠΎΡΠ°ΡΡ Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΎ ΡΠ΅ΠΊΡΠ½Π΄ (15-30) ΠΈ ΠΏΠΎΡΠ»Π΅ ΡΡΠΎΠ³ΠΎ ΠΎΡΡΠ°Π½ΠΎΠ²ΠΈΡΡ. Π§ΡΠΎΠ±Ρ ΠΏΡΠΎΡΠΈΡΠ°ΡΡ ΡΠΎΠ΄Π΅ΡΠΆΠΈΠΌΠΎΠ΅ ΡΠΎΠΏΠΈΠΊΠΎΠ² Kafka (ΠΏΠΎΡΠ»Π΅Π΄Π½ΠΈΠ΅ 5 ΡΠΎΠΎΠ±ΡΠ΅Π½ΠΈΠΉ Π² ΡΠΎΠΏΠΈΠΊΠ΅) Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎ Π²ΡΠΏΠΎΠ»Π½ΠΈΡΡ ΡΠ»Π΅Π΄ΡΡΡΠΈΠΉ ΡΠΊΡΠΈΠΏΡ (ΠΎΠΏΡΡΡ ΠΆΠ΅ Π² Π·Π°Π²ΠΈΡΠΈΠΌΠΎΡΡΠΈ ΠΎΡ ΡΠΎΠ³ΠΎ, ΠΊΠ°ΠΊΠΎΠΉ docker ΡΡΡΠ°Π½ΠΎΠ²Π»Π΅Π½):
./read-topics-docker-new.sh
ΠΈΠ»ΠΈ
./read-dopics-docker-old.sh
Π ΠΊΠΎΠ½ΡΠ΅ Π½Π΅ΠΎΠ±Ρ ΠΎΠ΄ΠΈΠΌΠΎ ΠΎΡΡΠ°Π½ΠΎΠ²ΠΈΡΡ docker ΠΊΠΎΠ½ΡΠ΅ΠΉΠ½Π΅ΡΡ:
docker compose down