heartsavior / spark-sql-kafka-offset-committer   0.2.0

Apache License 2.0

Kafka offset committer for structured streaming query

Scala versions: 2.12 2.11

Kafka offset committer for Spark structured streaming


Kafka offset committer helps a structured streaming query that uses the Kafka data source commit the offsets of the batches it has processed.

This project is not meant to replace Spark's checkpoint mechanism with Kafka's. To provide full fault-tolerance semantics, Spark has to keep 100% control over the checkpoint, and the Kafka data source is no exception. Instead, this project lets you use Kafka ecosystem tools to track the offsets committed to the Spark checkpoint, which is not possible with Spark alone.

This project is inspired by SPARK-27549, which proposed adding this feature to the Spark codebase, but it was decided not to include it in Spark. You can think of this project as a "follow-up" to SPARK-27549. It is also inspired by Spark Atlas Connector (SAC), which uses Scala reflection to extract topic information from the query execution; Kafka offset committer uses the same approach to extract Kafka parameters. Credit to everyone involved in SPARK-27549 and SAC.

Supported versions

Both Spark 3.0.x and 2.4.x are supported, which means you should use one of these Spark versions with this project.

For Spark 2.4.x, the project is cross-compiled for Scala 2.11 and 2.12 (thanks @redsk!); pick the artifact that matches your Scala version.

Spark version | Scala versions | artifact version
------------- | -------------- | ----------------
2.4.x         | 2.11 / 2.12    | 0.4.0-spark-2.4
3.0.x         | 2.12           | 0.4.0-spark-3.0
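The table can be read as a lookup from (Spark version, Scala binary version) to Maven coordinates. The sketch below encodes only the combinations listed above and returns None for anything else, such as Spark 3.0.x with Scala 2.11. The names CommitterCoordinates and committerCoordinate are hypothetical helpers for illustration, not part of the project.

```scala
object CommitterCoordinates {
  // One row of the support table: allowed Scala binary versions and the artifact version.
  final case class Supported(scalaVersions: Set[String], artifactVersion: String)

  // Keyed by Spark "major.minor", copied from the table above.
  private val supported: Map[String, Supported] = Map(
    "2.4" -> Supported(Set("2.11", "2.12"), "0.4.0-spark-2.4"),
    "3.0" -> Supported(Set("2.12"), "0.4.0-spark-3.0")
  )

  /** Hypothetical helper: "groupId:artifactId_scala:version", or None if the combination is unsupported. */
  def committerCoordinate(sparkVersion: String, scalaBinary: String): Option[String] = {
    // Keep only "major.minor" of the Spark version, e.g. "2.4.8" -> "2.4".
    val sparkMinor = sparkVersion.split('.').take(2).mkString(".")
    supported.get(sparkMinor).collect {
      case Supported(scalaVersions, artifactVersion) if scalaVersions.contains(scalaBinary) =>
        s"net.heartsavior.spark:spark-sql-kafka-offset-committer_$scalaBinary:$artifactVersion"
    }
  }
}
```

The returned string has the groupId:artifactId:version shape that the --packages option expects.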

How to import

Add this to your Maven pom.xml file. If you use another build tool such as Gradle or sbt, import the artifact accordingly: groupId net.heartsavior.spark, artifactId spark-sql-kafka-offset-committer_<scala_version>.

Replace the {{...}} placeholders with values from the table above:

<dependency>
  <groupId>net.heartsavior.spark</groupId>
  <artifactId>spark-sql-kafka-offset-committer_{{scala_version}}</artifactId>
  <version>{{artifact_version}}</version>
</dependency>
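For sbt, the equivalent is a single libraryDependencies line. This is a sketch assuming a Spark 3.0.x build; sbt's %% operator appends the Scala binary suffix (for example _2.12) to the artifact name, which matches the artifactId pattern above.

```scala
// build.sbt (sketch, assuming Spark 3.0.x / Scala 2.12): take the version string from the table above.
libraryDependencies += "net.heartsavior.spark" %% "spark-sql-kafka-offset-committer" % "0.4.0-spark-3.0"
```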

You can also include the jar dynamically at submit time via the --packages option: --packages net.heartsavior.spark:spark-sql-kafka-offset-committer_{{scala_version}}:{{artifact_version}}. In that case you may also want to add --conf spark.sql.streaming.streamingQueryListeners=net.heartsavior.spark.KafkaOffsetCommitterListener, since the jar is added dynamically and the listener class is therefore not part of your uber jar.

How to use

Kafka offset committer is implemented as a StreamingQueryListener. There are two ways to enable it:

  1. Attach an instance of KafkaOffsetCommitterListener to your session:

     import net.heartsavior.spark.KafkaOffsetCommitterListener

     val listener = new KafkaOffsetCommitterListener()
     spark.streams.addListener(listener)

  2. Add net.heartsavior.spark.KafkaOffsetCommitterListener to the value of spark.sql.streaming.streamingQueryListeners in your Spark config. (The value is a comma-separated list, so you can add multiple listeners if you have others.)
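Because spark.sql.streaming.streamingQueryListeners takes a comma-separated list of class names, appending the committer to an existing value is plain string handling. The sketch below uses a hypothetical helper, withCommitterListener (not part of the project), that appends the class name only if it is not already present, so the listener is not registered twice.

```scala
object ListenerConf {
  val ConfKey = "spark.sql.streaming.streamingQueryListeners"
  val CommitterClass = "net.heartsavior.spark.KafkaOffsetCommitterListener"

  /** Hypothetical helper: appends the committer listener to an existing comma-separated value. */
  def withCommitterListener(existing: Option[String]): String = {
    // Split the existing value on commas, trimming whitespace and dropping empty entries.
    val current = existing.toSeq.flatMap(_.split(',').toSeq).map(_.trim).filter(_.nonEmpty)
    // Append the committer only once, preserving the order of listeners already configured.
    val updated = if (current.contains(CommitterClass)) current else current :+ CommitterClass
    updated.mkString(",")
  }
}
```

The result can be passed to --conf on spark-submit or to SparkSession.builder().config(ListenerConf.ConfKey, ...) before the session is created.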

Once the listener is set, add a special option to the Kafka data source options so that the committer knows which consumer group ID to commit to:

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic[1-3]")
  .option("startingOffsets", "earliest")
  .option("kafka.consumer.commit.groupid", "groupId1")
  .load()

"kafka.consumer.commit.groupid" is the new config that specifies the consumer group ID to commit to. You have to specify it manually because Spark assigns a unique consumer group ID to each query to prevent multiple queries from conflicting with each other. For the same reason, choose group IDs carefully so that multiple queries don't commit to the same group ID.
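Since each query should commit under its own group ID, a simple pre-flight check can catch accidental sharing across queries. This is a hypothetical sketch: the sharedGroupIds helper and the query names are illustrative and not part of the project. Reuse within a single query is not flagged; the example query later in this README uses one group ID for both of its sources.

```scala
object GroupIdCheck {
  /**
   * Hypothetical helper: given the commit group IDs configured per query,
   * returns each group ID used by more than one query, with the queries that use it.
   */
  def sharedGroupIds(groupIdsByQuery: Map[String, Set[String]]): Map[String, Set[String]] = {
    // Invert to (groupId, queryName) pairs; the Set per query already dedups repeated sources.
    val pairs = for {
      (query, groupIds) <- groupIdsByQuery.toSeq
      groupId <- groupIds.toSeq
    } yield groupId -> query
    pairs
      .groupBy { case (groupId, _) => groupId }
      .map { case (groupId, ps) => groupId -> ps.map(_._2).toSet }
      .filter { case (_, queries) => queries.size > 1 }
  }
}
```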

For technical reasons, the project uses reflection to extract options from the query execution. Because it intercepts the Kafka parameters rather than the DataSource's source options, the option key must carry the "kafka." prefix, which causes unintended warning messages from the Kafka side. (Sorry!) You can adjust your log4j config to hide these warnings.
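The "kafka." prefix matters because Spark's Kafka source passes only options whose key starts with "kafka." (checked case-insensitively) to the Kafka client, with the prefix removed. The sketch below is a simplified model of that translation, not Spark's or this project's actual code, and Spark versions differ in details such as key casing. It shows why the committer ends up seeing consumer.commit.groupid among the Kafka parameters; that key is presumably also the unknown config behind the warnings mentioned above.

```scala
import java.util.Locale

object KafkaParamsModel {
  private val Prefix = "kafka."

  /** Simplified model: keeps "kafka."-prefixed options (case-insensitive match) and strips the prefix. */
  def kafkaParams(sourceOptions: Map[String, String]): Map[String, String] =
    sourceOptions.collect {
      case (key, value) if key.toLowerCase(Locale.ROOT).startsWith(Prefix) =>
        key.drop(Prefix.length) -> value
    }
}
```

Applied to the options of the readStream example above, subscribePattern and startingOffsets are dropped, while bootstrap.servers and consumer.commit.groupid remain.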

Here's an example command that runs spark-shell with the Kafka offset committer listener set, followed by a simple query that reads from Kafka topics and writes to a Kafka topic.

command

./bin/spark-shell --master "local[3]" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 \
  --jars ./spark-sql-kafka-offset-committer-0.1.0-SNAPSHOT.jar \
  --conf spark.sql.streaming.streamingQueryListeners=net.heartsavior.spark.KafkaOffsetCommitterListener

query (enter it in spark-shell with :paste, since the chained calls span multiple lines)

val bootstrapServers = "localhost:9092"
val checkpointLocation = "/tmp/mykafkaaaaaaa"
val sourceTopics = Seq("truck_events_stream").mkString(",")
val sourceTopics2 = Seq("truck_speed_events_stream").mkString(",")
val targetTopic = "sparksinkstreaming"

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", sourceTopics)
  .option("startingOffsets", "earliest")
  .option("kafka.consumer.commit.groupid", "spark-sql-kafka-offset-committer-test-1")
  .load()

val df2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", sourceTopics2)
  .option("startingOffsets", "earliest")
  .option("kafka.consumer.commit.groupid", "spark-sql-kafka-offset-committer-test-1")
  .load()

val query = df.union(df2)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("checkpointLocation", checkpointLocation)
  .option("topic", targetTopic)
  .option("kafka.atlas.cluster.name", "sink")
  .start()

result

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group spark-sql-kafka-offset-committer-test-1

Consumer group 'spark-sql-kafka-offset-committer-test-1' has no active members.

TOPIC                      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
truck_speed_events_stream  5          844553          844577          24   -            -     -
truck_speed_events_stream  2          675521          675540          19   -            -     -
truck_speed_events_stream  6          168828          168833          5    -            -     -
truck_speed_events_stream  3          337819          337827          8    -            -     -
truck_speed_events_stream  7          675566          675585          19   -            -     -
truck_speed_events_stream  4          168914          168919          5    -            -     -
truck_speed_events_stream  0          168894          168899          5    -            -     -
truck_speed_events_stream  8          675570          675589          19   -            -     -
truck_speed_events_stream  1          168917          168922          5    -            -     -
truck_events_stream        0          3884586         3884695         109  -            -     -
truck_speed_events_stream  9          0               0               0    -            -     -

After stopping ingestion and waiting for the query to process all remaining records:

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group spark-sql-kafka-offset-committer-test-1

Consumer group 'spark-sql-kafka-offset-committer-test-1' has no active members.

TOPIC                      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
truck_speed_events_stream  5          856338          856338          0    -            -     -
truck_speed_events_stream  2          684958          684958          0    -            -     -
truck_speed_events_stream  6          171186          171186          0    -            -     -
truck_speed_events_stream  3          342534          342534          0    -            -     -
truck_speed_events_stream  7          684998          684998          0    -            -     -
truck_speed_events_stream  4          171272          171272          0    -            -     -
truck_speed_events_stream  0          171255          171255          0    -            -     -
truck_speed_events_stream  8          684999          684999          0    -            -     -
truck_speed_events_stream  1          171276          171276          0    -            -     -
truck_events_stream        0          3938820         3938820         0    -            -     -
truck_speed_events_stream  9          0               0               0    -            -     -
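In both outputs, LAG is LOG-END-OFFSET minus CURRENT-OFFSET, where CURRENT-OFFSET is the offset the listener committed for the group. The snippet below redoes that arithmetic on the first output; it does not talk to Kafka. PartitionOffsets and LagCheck are illustrative names, not part of the project.

```scala
// One row of `kafka-consumer-groups --describe` output (only the columns needed here).
final case class PartitionOffsets(topic: String, partition: Int, currentOffset: Long, logEndOffset: Long) {
  // LAG column: records in the partition not yet covered by the committed offset.
  def lag: Long = logEndOffset - currentOffset
}

object LagCheck {
  // Rows copied from the first output above (before ingestion was stopped).
  val beforeDrain: Seq[PartitionOffsets] = Seq(
    PartitionOffsets("truck_speed_events_stream", 5, 844553L, 844577L),
    PartitionOffsets("truck_speed_events_stream", 2, 675521L, 675540L),
    PartitionOffsets("truck_speed_events_stream", 6, 168828L, 168833L),
    PartitionOffsets("truck_speed_events_stream", 3, 337819L, 337827L),
    PartitionOffsets("truck_speed_events_stream", 7, 675566L, 675585L),
    PartitionOffsets("truck_speed_events_stream", 4, 168914L, 168919L),
    PartitionOffsets("truck_speed_events_stream", 0, 168894L, 168899L),
    PartitionOffsets("truck_speed_events_stream", 8, 675570L, 675589L),
    PartitionOffsets("truck_speed_events_stream", 1, 168917L, 168922L),
    PartitionOffsets("truck_events_stream", 0, 3884586L, 3884695L),
    PartitionOffsets("truck_speed_events_stream", 9, 0L, 0L)
  )

  // Total lag per topic, e.g. to check whether the query has caught up.
  def lagByTopic(rows: Seq[PartitionOffsets]): Map[String, Long] =
    rows.groupBy(_.topic).map { case (topic, ps) => topic -> ps.map(_.lag).sum }
}
```

LagCheck.lagByTopic(LagCheck.beforeDrain) gives 109 for each of the two topics (218 in total). In the second output every row has CURRENT-OFFSET equal to LOG-END-OFFSET, so every lag is 0.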

License

Copyright 2019-2021 Jungtaek Lim

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.