Decoding the Data Integration Matrix: Connectors in Kafka Connect and Flink
Sambhav Jain, Software Engineer, Confluent
What is a connector framework?
Source System -> Connector Framework -> Kafka -> Connector Framework -> Sink System
A connector framework moves data between external systems and Kafka: source connectors pull data from a source system into Kafka, and sink connectors push data from Kafka into a sink system.
Flink connectors
Source System -> Flink -> Sink System
Flink connectors read from source systems and write to sink systems directly from the Flink job, with no Kafka hop required in between.
Connector Ecosystem
Kafka Connect: 100+ open source connectors, sources and sinks (e.g. syslog)
Flink: ~20 open source connectors
Physical Architectures
Kafka Connect: a Connect cluster of Workers, each running Connectors and their Tasks that read from source systems and write to sink systems.
Flink: a Job Manager, which hosts the source coordinator, and Task Managers, which run the source and sink operators.
Connector Development
Source Interfaces in Kafka Connect
SourceConnector: validate() checks the user-supplied config, and taskConfigs() fans it out to the tasks; the WorkerConnector drives this lifecycle.
SourceTask: WorkerSourceTask.execute() repeatedly calls SourceTask.poll() to pull records from the source system.
Each polled record passes through the SMTs and converters before being produced to Kafka.
commitOffsets() then persists the source offsets to the offset store after successful production.
Let’s start with the Datadog connector
API request -> API response:
{
  "events": [
    {
      "alert_type": "info",
      "date_happened": "integer",
      "device_name": "string",
      "host": "string",
      "id": "integer",
      "id_str": "string",
      "payload": "{}",
      "priority": "normal",
      "source_type_name": "string",
      "tags": ["environment:test"],
      "text": "Oh boy!",
      "title": "Did you hear the news today?",
      "url": "string"
    }
  ],
  "status": "string"
}
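To make this concrete, below is a minimal sketch of the request such a connector has to issue, using Java 11's built-in HttpClient against Datadog's events endpoint; DD_API_KEY and DD_APP_KEY are placeholder environment variable names, not part of the talk.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class DatadogEventsCall {
        public static void main(String[] args) throws Exception {
            long end = System.currentTimeMillis() / 1000;
            long start = end - 3600; // fetch the last hour of events

            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://api.datadoghq.com/api/v1/events?start=" + start + "&end=" + end))
                    .header("DD-API-KEY", System.getenv("DD_API_KEY"))         // placeholder env var
                    .header("DD-APPLICATION-KEY", System.getenv("DD_APP_KEY")) // placeholder env var
                    .GET()
                    .build();

            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body()); // the "events" JSON payload shown above
        }
    }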
Source Interfaces in Kafka Connect: the SourceConnector
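A minimal sketch of that connector-level interface, assuming a hypothetical DatadogSourceConnector (class name and config key are illustrative, not the talk's actual code): start() receives the validated config, taskConfigs() fans it out to tasks, and config() supplies the ConfigDef that validate() checks user properties against.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.Task;
    import org.apache.kafka.connect.source.SourceConnector;

    public class DatadogSourceConnector extends SourceConnector {
        private Map<String, String> config;

        @Override
        public void start(Map<String, String> props) {
            this.config = props; // the worker passes the validated connector config here
        }

        @Override
        public Class<? extends Task> taskClass() {
            return DatadogSourceTask.class;
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            // One task is enough for a single API endpoint; fan out here if needed
            List<Map<String, String>> configs = new ArrayList<>();
            configs.add(new HashMap<>(config));
            return configs;
        }

        @Override
        public void stop() {}

        @Override
        public ConfigDef config() {
            // validate() uses this definition to check user-supplied properties
            return new ConfigDef()
                    .define("datadog.api.key", ConfigDef.Type.PASSWORD,
                            ConfigDef.Importance.HIGH, "Datadog API key");
        }

        @Override
        public String version() {
            return "0.1.0";
        }
    }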
Source Interfaces in Kafka Connect: the SourceTask
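A matching task-level sketch: poll() returns SourceRecords that carry a source partition and a source offset, which the framework commits after successful production; fetchNextEvent() is a hypothetical stand-in for the API call sketched earlier.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    public class DatadogSourceTask extends SourceTask {
        private static final Map<String, String> SOURCE_PARTITION =
                Collections.singletonMap("endpoint", "events");
        private long lastTimestamp;

        @Override
        public void start(Map<String, String> props) {
            // Resume from the last committed source offset, if any
            Map<String, Object> offset = context.offsetStorageReader().offset(SOURCE_PARTITION);
            lastTimestamp = offset == null ? 0L : (Long) offset.get("timestamp");
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            String event = fetchNextEvent(lastTimestamp);
            if (event == null) return Collections.emptyList();
            lastTimestamp = System.currentTimeMillis() / 1000;
            return Collections.singletonList(new SourceRecord(
                    SOURCE_PARTITION,
                    Collections.singletonMap("timestamp", lastTimestamp), // source offset
                    "datadog-events",                                     // target topic
                    Schema.STRING_SCHEMA, event));
        }

        private String fetchNextEvent(long since) {
            return null; // hypothetical: call the Datadog events API here
        }

        @Override
        public void stop() {}

        @Override
        public String version() {
            return "0.1.0";
        }
    }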
Source Interfaces in Flink
The FLIP-27 Source interface splits responsibilities between the Job Manager and the Task Managers:
Job Manager: the SplitEnumerator answers handleSplitRequest() calls and assigns splits to readers.
Task Manager: the SourceReader runs a SplitFetcher whose SplitReader.fetch() pulls records into a records queue; the RecordEmitter drains that queue, and emitRecord() hands each record downstream.
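A sketch of the top-level FLIP-27 Source tying these pieces together; DatadogSource and every Datadog*-named type are hypothetical (the enumerator and reader are sketched under the next two slides, and the serializer classes are omitted).

    import org.apache.flink.api.connector.source.Boundedness;
    import org.apache.flink.api.connector.source.Source;
    import org.apache.flink.api.connector.source.SourceReader;
    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.api.connector.source.SplitEnumerator;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    public class DatadogSource implements Source<String, DatadogSplit, DatadogEnumState> {

        @Override
        public Boundedness getBoundedness() {
            return Boundedness.CONTINUOUS_UNBOUNDED; // poll the events API forever
        }

        @Override
        public SplitEnumerator<DatadogSplit, DatadogEnumState> createEnumerator(
                SplitEnumeratorContext<DatadogSplit> context) {
            return new DatadogSplitEnumerator(context); // runs on the Job Manager
        }

        @Override
        public SplitEnumerator<DatadogSplit, DatadogEnumState> restoreEnumerator(
                SplitEnumeratorContext<DatadogSplit> context, DatadogEnumState state) {
            return new DatadogSplitEnumerator(context); // this sketch keeps no enumerator state
        }

        @Override
        public SourceReader<String, DatadogSplit> createReader(SourceReaderContext context) {
            // DatadogSplitReader (hypothetical) is what actually fetch()es from the API
            return new DatadogSourceReader(() -> new DatadogSplitReader(), context);
        }

        @Override
        public SimpleVersionedSerializer<DatadogSplit> getSplitSerializer() {
            return new DatadogSplitSerializer(); // hypothetical serializer, omitted here
        }

        @Override
        public SimpleVersionedSerializer<DatadogEnumState> getEnumeratorCheckpointSerializer() {
            return new DatadogEnumStateSerializer(); // hypothetical serializer, omitted here
        }
    }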
Source Interfaces in Flink: the SplitEnumerator
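A minimal enumerator sketch with a single, hypothetical time-based split; a real connector would partition the work and re-assign splits returned by failed readers.

    import java.util.List;
    import org.apache.flink.api.connector.source.SourceSplit;
    import org.apache.flink.api.connector.source.SplitEnumerator;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;

    // A split is one unit of work; for a polling API source a start timestamp is enough
    class DatadogSplit implements SourceSplit {
        final long startTs;
        DatadogSplit(long startTs) { this.startTs = startTs; }
        @Override
        public String splitId() { return "datadog-events"; }
    }

    // Enumerator checkpoint state; empty because there is only one static split here
    class DatadogEnumState {}

    public class DatadogSplitEnumerator implements SplitEnumerator<DatadogSplit, DatadogEnumState> {
        private final SplitEnumeratorContext<DatadogSplit> context;

        DatadogSplitEnumerator(SplitEnumeratorContext<DatadogSplit> context) {
            this.context = context;
        }

        @Override
        public void start() {}

        @Override
        public void handleSplitRequest(int subtaskId, String requesterHostname) {
            // A reader asked for work: hand it the single events split
            context.assignSplit(new DatadogSplit(0L), subtaskId);
        }

        @Override
        public void addSplitsBack(List<DatadogSplit> splits, int subtaskId) {
            // Splits come back when a reader fails; re-assignment omitted in this sketch
        }

        @Override
        public void addReader(int subtaskId) {}

        @Override
        public DatadogEnumState snapshotState(long checkpointId) {
            return new DatadogEnumState();
        }

        @Override
        public void close() {}
    }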
Source Interfaces in Flink: the SourceReader and RecordEmitter
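On the reader side, the RecordEmitter is the small piece that both emits a fetched record and advances the per-split progress; a sketch, with DatadogSplitState as a hypothetical mutable wrapper around the split.

    import org.apache.flink.api.connector.source.SourceOutput;
    import org.apache.flink.connector.base.source.reader.RecordEmitter;

    // Mutable per-split progress, created from a DatadogSplit when the reader is assigned one
    class DatadogSplitState {
        private long timestamp;
        DatadogSplitState(long timestamp) { this.timestamp = timestamp; }
        void setTimestamp(long ts) { this.timestamp = ts; }
        long getTimestamp() { return timestamp; }
    }

    public class DatadogRecordEmitter implements RecordEmitter<String, String, DatadogSplitState> {
        @Override
        public void emitRecord(String element, SourceOutput<String> output,
                               DatadogSplitState splitState) {
            output.collect(element); // hand the record to the downstream pipeline
            splitState.setTimestamp(System.currentTimeMillis() / 1000); // advance progress
        }
    }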
Source in Flink vs. Source in Kafka Connect
Kafka Connect: SourceTask.poll() returns SourceRecords with an attached Schema; the converters in the Worker handle serialization.
Flink: SplitReader.fetch() pulls raw records and RecordEmitter.emitRecord() forwards them; the record type and schema are defined by the connector itself.
State Management
Example: the "date_happened" field from the Datadog response tells us how far we have read.
Kafka Connect tracks this position as source offsets; Flink tracks it as split state.
State Management
Kafka Connect: the SourceTask attaches source offsets to every record, and the Worker writes them to the offset topic in the state store, e.g.
{ task1: { timestamp: 1708508270 }, task2: { timestamp: 1708507620 } }
Flink: the RecordEmitter updates the SplitState on each emitRecord(), and snapshotState() captures it into every checkpoint, e.g.
{ split1: { timestamp: 1708508270 }, split2: { timestamp: 1708507620 } }
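In Flink, the bridge between splits and SplitState lives in the SourceReader: initializedState() turns an assigned split into mutable state, and toSplitType() freezes that state back into a split when snapshotState() runs. A sketch on top of Flink's SingleThreadMultiplexSourceReaderBase, reusing the hypothetical Datadog* types from the previous slides.

    import java.util.Map;
    import java.util.function.Supplier;
    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
    import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

    public class DatadogSourceReader
            extends SingleThreadMultiplexSourceReaderBase<String, String, DatadogSplit, DatadogSplitState> {

        public DatadogSourceReader(Supplier<SplitReader<String, DatadogSplit>> splitReaderSupplier,
                                   SourceReaderContext context) {
            super(splitReaderSupplier, new DatadogRecordEmitter(), new Configuration(), context);
        }

        @Override
        protected DatadogSplitState initializedState(DatadogSplit split) {
            // An assigned split becomes mutable state the RecordEmitter can advance
            return new DatadogSplitState(split.startTs);
        }

        @Override
        protected DatadogSplit toSplitType(String splitId, DatadogSplitState splitState) {
            // Called during snapshotState(): freeze progress back into a split for the checkpoint
            return new DatadogSplit(splitState.getTimestamp());
        }

        @Override
        protected void onSplitFinished(Map<String, DatadogSplitState> finishedSplitIds) {}
    }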
Sink Interfaces
Kafka Connect: the SinkConnector exposes validate() and taskConfigs(), driven by the WorkerConnector; WorkerSinkTask.execute() consumes from Kafka, applies SMTs/converters, and calls SinkTask.put(); preCommit() and flush() run just before the consumer commits its offsets.
Flink: the Sink creates a StatefulSinkWriter; write() handles each incoming record, and flush() is called periodically, just before completing every checkpoint.
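On the Kafka Connect side, a minimal sink sketch, assuming a hypothetical HttpSinkTask: put() receives already-converted records, and flush() is the hook for making buffered writes durable before the consumer commits its offsets.

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    public class HttpSinkTask extends SinkTask {

        @Override
        public void start(Map<String, String> props) {}

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records) {
                // Each record has already passed through SMTs and converters
                sendToExternalSystem(record.value());
            }
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
            // Called before the consumer commits: ensure buffered writes are durable
        }

        private void sendToExternalSystem(Object value) {
            // hypothetical: issue the HTTP call to the sink system here
        }

        @Override
        public void stop() {}

        @Override
        public String version() {
            return "0.1.0";
        }
    }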
Summary
- State management: in Kafka Connect the framework handles source offset management and the connector can't override this behaviour; in Flink it is handled automatically with default behaviour, but the connector can override it.
- Serialization: Kafka Connect connectors send data to converters in the workers; Flink connectors have to serialize and deserialize themselves.
- Internal data representation: Kafka Connect uses custom objects (SourceRecord and SinkRecord); in Flink it is user defined.
- Delivery semantics (best possible): Kafka Connect offers exactly-once sources and at-least-once sinks; Flink offers exactly-once sources and exactly-once sinks.
Table API / SQL in Flink
A DynamicTableFactory (its factoryIdentifier(), e.g. "kafka", is what SQL users reference) creates a DynamicTableSource wrapping the DataStream-level FlinkSource and a DynamicTableSink wrapping the FlinkSink.
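A factory sketch, assuming a hypothetical DatadogDynamicTableFactory; DatadogDynamicTableSource would be the wrapper that hands the planner the FLIP-27 DatadogSource (typically via SourceProvider).

    import java.util.Collections;
    import java.util.Set;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.table.connector.source.DynamicTableSource;
    import org.apache.flink.table.factories.DynamicTableSourceFactory;

    public class DatadogDynamicTableFactory implements DynamicTableSourceFactory {

        @Override
        public String factoryIdentifier() {
            // What users reference in SQL: CREATE TABLE ... WITH ('connector' = 'datadog')
            return "datadog";
        }

        @Override
        public DynamicTableSource createDynamicTableSource(Context context) {
            // Hypothetical wrapper around the FLIP-27 DatadogSource
            return new DatadogDynamicTableSource();
        }

        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            return Collections.emptySet();
        }

        @Override
        public Set<ConfigOption<?>> optionalOptions() {
            return Collections.emptySet();
        }
    }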
Use cases of Connectors and Kafka
- Integrating multiple systems with Kafka
- Real-time database replication: capture CDC events from your source system
- Database lookups: enrichment and joins
- Real-time analytics: real-time materialised views
Q/A
References
1. FLIP-27: Refactor Source Interface: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
2. FLIP-143: Unified Sink API: https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API
3. Example implementation of a Datadog Flink source: https://github.com/sambhav-jain-16/flink/tree/datadog-source--flink-poc/flink-connectors/flink-connector-datadog/src/main/java/org/apache/flink/connector/datadog/source

Special thanks: Anupam Aggarwal, Suchi Amalapurapu
LinkedIn: Sambhav Jain
