本文基于 Spark 3.1.2 版本 (最新的stable版本)
可能是全网第一篇中文 Spark 3.x Kafka数据源源码的博文了,基于源码可以自行实现多种数据源增强功能,比如KafkaCluster failover
Structured Streaming是基于Spark SQL引擎处理无界流数据的模型。相比于基于RDD的Spark Streaming,Structured Streaming基于DataFrame DataSet API,推出了很多关于数据一致性和性能提升。
就像这个图,Data stream数据流可以是Kafka 的一个Topic,KafkaProducer 不断的往这个Topic中生产数据,可以视为不停的为这个表添加行,且数据列是固定的schema,有key,value,topic,partition,offset,timestamp等字段(如下表)。Structured Streaming流处理则是间隔一段时间,把表中的数据查出来做计算,并将计算结果写入另外一张表(另外一个Topic),一次查询成为一个微批(MicroBatch)。
key | value | topic | partition | offset | timestamp |
---|---|---|---|---|---|
"key" | "hello world" | "kafka_topic" | 0 | 100 | 1631623473 |
接下来,我们用Java来实现一个实时统计Sample,流处理届的 HelloWorld-- WordCount,Kafka Producer不停的往 Topic 中写入各种单词,统计每个单词出现的次数并输出,这是非常典型的,GroupByKey的统计任务。
//官网下载kafka和zookeeper,并解压... //kafka启动前需要启动zk $ bin/zookeeper-server-start.sh config/zookeeper.properties //启动kafka,默认本机9092 $ bin/kafka-server-start.sh config/server.properties //使用kafka自带脚本创建一个 名字叫quickstart-events的 topic $ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
1.新建一个Java Maven工程
2.pom加入依赖
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.1.2</version> </dependency>
3.写一个main方法
public static void main(String[] args) throws Exception { SparkConf config = new SparkConf().setAppName("JavaStructuredKafkaWordCount").setMaster("local[1]"); SparkContext sparkContext = new SparkContext(config); SparkSession spark = new SparkSession(sparkContext); // Create DataSet representing the stream of input lines from kafka Dataset<String> lines = spark .readStream() .format("kafka") .option("kafka.bootstrap.servers", "127.0.0.1:9092") .option("subscribe", "quickstart-events") .load() .selectExpr("CAST(value AS STRING)") .as(Encoders.STRING()); // Generate running word count Dataset<org.apache.spark.sql.Row> wordCounts = lines.flatMap( (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING()).groupBy("value").count(); // Start running the query that prints the running counts to the console StreamingQuery query = wordCounts.writeStream() .outputMode("complete") .format("console") .start(); query.awaitTermination(); }
4.启动的Kafka producer 往topic中生产单词...
//启动一个生产者,这样就可以在shell中给topic发消息 $ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 //输入后回车 > hello owen > hello lemon > byebye
5.可以通过IDEA控制台查看每个单词出现的次数...
------------------------------------------- Batch: 1 ------------------------------------------- +------+-----+ | value|count| +------+-----+ | hello| 2| | owen| 1| |byebye| 1| | lemon| 1| +------+-----+
还有一些需要思考的问题
- Kafka Topic中的消息来一条就会写入InputTable吗?
- Structured Streaming + Kafka 如何保障数据不丢?
- Structured Streaming 没有用Kafka的消费者重平衡机制,如何分配消费者和partition的关系?
- Structured Streaming 如何实现 Kafka Offsets 管理?
- 自研MQ如何实现 最新的数据源接口,成为Spark的Source?
- Spark 输入输出源相关接口API V1 V2 多次迭代的原因是什么?
就像Mysql SQLlite等DB都要去实现 JDBC的标准接口
javax.sql.DataSource
Spark 与其他第三方组建进行数据交换,比如Kafka、Hdfs、文件等,也定义了一套数据源接口,通过SourceAPI 读取数据,通过SinkAPI写出数据。
-
Spark 2.3.x 以前的版本,只有V1 API。主要的接口有RelationProvider, DataSourceRegister, BaseRelation
-
Spark 2.3.0带来了V2 API,这部分代码的作者(Wenchen Fan)在 视频 中介绍了这次改动,主要的改进是 并发读取数据、过滤器下推(filters push down)、加速聚合操作,增加Spark Data Encoding、数据统计和分片、支持Structured Streaming等
Data source interfaces in Spark 2.3.0 主要有以下的接口 - StreamSourceProvider - Source - StreamSinkProvider - Sink
-
V2 API 经历了Spark 2.4.0的改动后
-
Spark 3.0.0 带来了全新的数据源API,并保留了V1 V2的接口和代码(这也导致代码量看起来有点多)
Data Source interfaces in Spark 3.0.0 主要有以下的接口 - TableProvider - Table - ScanBuilder - Scan - MicroBatchStream - InputPartition - PartitionReaderFactory - PartitionReader
Structured Streaming 一般与 Kafka 集成进行微批处理。
那么来看一下 Kafka 数据源是如何实现上述 数据源接口的。
private[kafka010] class KafkaSourceProvider extends DataSourceRegister //V1 接口 with StreamSourceProvider // V2 接口 with StreamSinkProvider // V2 接口 with RelationProvider // V1 接口 with CreatableRelationProvider // V1 接口 with SimpleTableProvider //Spark 3.x 数据源接口 with Logging {
所有的数据源的入口都是 XXXSourceProvider,比如上面的KafkaSourceProvider,如上注释,V1 V2的代码先不关注了。
只需要关注SimpleTableProvider 这个接口和这个接口唯一的getTable()方法。
这里将Kafka也抽象成是一个数据库,就像Mysql,一个Topic就表示一个表(Table),Topic中新增加的数据就像是对这个表源源不断的增加行,Spark从InputTable读取数据后,进行计算,然后写入OutputTable,也就是写入另外一个Topic。因此将数据源表的入口称为 TableProvider
private[kafka010] class KafkaSourceProvider extends SimpleTableProvider // 1.Spark 3.x 数据源接口 with Logging { //2.getTable()方法,传入用户参数options,new出KafkaTable对象返回 override def getTable(options: CaseInsensitiveStringMap): KafkaTable = { // 3. includeHeaders:是否包含Kafka 消息header信息 val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false) new KafkaTable(includeHeaders) }
Structured Streaming结构化流处理,这个接口告诉用户,把Kafka的Topic抽象成一个有固定字段的表。
这个表有key,value,topic,partition,offset,timestamp,timestampType等固定的字段
对 Kafka的表 可以进行微批读 和 写入功能。
class KafkaTable(includeHeaders: Boolean) extends Table with SupportsRead with SupportsWrite { //1. 结构化数据可以当成一张表,这里就定义了这个表有哪些字段 /** StructField("key", BinaryType), 1.1 字段key StructField("value", BinaryType), 1.2 字段value StructField("topic", StringType), StructField("partition", IntegerType), StructField("offset", LongType), StructField("timestamp", TimestampType), StructField("timestampType", IntegerType), StructField("headers", headersType) 可选 */ override def schema(): StructType = KafkaRecordToRowConverter.kafkaSchema(includeHeaders) //2.该方法返回表示该数据源支持哪些能力 // 对Kafka数据源,一般使用MICRO_BATCH_READ 微批读,STREAMING_WRITE 写能力 // 其他能力暂时可以忽略 override def capabilities(): ju.Set[TableCapability] = { Set(BATCH_READ, BATCH_WRITE, MICRO_BATCH_READ, CONTINUOUS_READ, STREAMING_WRITE, ACCEPT_ANY_SCHEMA).asJava } //3. new 出KafkaScan对象,返回,下文会继续说 override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = () => new KafkaScan(options) //4. new 出WriteBuilder对象返回,根据capabilities()方法中的BATCH_WRITE,STREAMING_WRITE, // 这边对应有这两种能力 对应的处理对象,buildForBatch,buildForStreaming override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = { new WriteBuilder with SupportsTruncate with SupportsStreamingUpdateAsAppend { override def buildForBatch(): BatchWrite = { new KafkaBatchWrite(topic, producerParams, inputSchema) } override def buildForStreaming(): StreamingWrite = { new KafkaStreamingWrite(topic, producerParams, inputSchema) } } } }
如TableCapability中返回的三种读能力,BATCH_READ,MICRO_BATCH_READ,CONTINUOUS_READ,这里对应有三个方法,本文会选择最常用的微批读取来继续下文内容,其他两种读取方式,感兴趣的同学可以自行了解。需要注意的是,目前的这些操作都是在Driver进程中完成的,还未真实的开始读取Kafka中的数据。
class KafkaScan(options: CaseInsensitiveStringMap) extends Scan { override def toBatch(): Batch = { //1.省略这部分功能和代码,批处理 new KafkaBatch( strategy(caseInsensitiveOptions), caseInsensitiveOptions, specifiedKafkaParams, failOnDataLoss(caseInsensitiveOptions), startingRelationOffsets, endingRelationOffsets, includeHeaders) } //2. 和kafka集成后常用的 微批处理 override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap) // 3.每个流计算任务,生成唯一的groupId //这也是为后面管理Kafka offset和partition分配铺垫,后面会详细说 val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveOptions, checkpointLocation) //4. 解析出用户指定的kafka参数,比如kafka broker的鉴权账号密码等 // 可以参考官方文档,options中以kafka. 开头的参数 val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions) //5. 根据用户的设置,选择开始读取的位点, //从earliest开始读 或者从指定的offset/timestamp开始读取 val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit( caseInsensitiveOptions, STARTING_OFFSETS_BY_TIMESTAMP_OPTION_KEY, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) //4.创建一个KafkaOffsetReader对象,其实是对Kafka Consumer/ Kafka AdminClient的封装,主要用于在Driver进程中读取 Offset val kafkaOffsetReader = KafkaOffsetReader.build( strategy(caseInsensitiveOptions), kafkaParamsForDriver(specifiedKafkaParams), caseInsensitiveOptions, driverGroupIdPrefix = s"$uniqueGroupId-driver") //5. 跟前面的差不多,这里构造出一个KafkaMicroBatchStream对象, // 核心的逻辑都在这个类里面了 new KafkaMicroBatchStream( kafkaOffsetReader, kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), options, checkpointLocation, startingStreamOffsets, failOnDataLoss(caseInsensitiveOptions)) } override def toContinuousStream(checkpointLocation: String): ContinuousStream = { //省略这部分功能和代码,连续处理 new KafkaContinuousStream( kafkaOffsetReader, kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), options, checkpointLocation, startingStreamOffsets, failOnDataLoss(caseInsensitiveOptions)) } }
序号 1 2 3 也代表的代码的执行顺序
/** * A [[MicroBatchStream]] that reads data from Kafka. * * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains * a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For * example if the last record in a Kafka topic "t", partition 2 is offset 5, then * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done keep it consistent * with the semantics of `KafkaConsumer.position()`. KafkaSourceOffset这个对象是 Map<TopicPartition, Long>这样一个map,外面包了一层,这个对象可以json序列化和反序列化。 记录了一个Topic下所有partition的offset信息。 * Zero data lost is not guaranteed when topics are deleted. If zero data lost is critical, the user * must make sure all messages in a topic have been processed when deleting a topic. 为了保障数据不丢失,请消费完数据后再删除topic * * There is a known issue caused by KAFKA-1894: the query using Kafka maybe cannot be stopped. * To avoid this issue, you should make sure stopping the query before stopping the Kafka brokers * and not use wrong broker addresses. 这个Jira kafka在 2.0.0以上的版本中已经修复,这里可以忽略。 综上,这个注释信息信息量太少了,还是直接看代码 */ private[kafka010] class KafkaMicroBatchStream( private[kafka010] val kafkaOffsetReader: KafkaOffsetReader, executorKafkaParams: ju.Map[String, Object], options: CaseInsensitiveStringMap, metadataPath: String, startingOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) extends SupportsAdmissionControl with MicroBatchStream with Logging { //0.初始化参数,得到 Kafka Consumer poll的超时时间,默认120s private[kafka010] val pollTimeoutMs = options.getLong( KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, SparkEnv.get.conf.get(NETWORK_TIMEOUT) * 1000L) //1.当checkpoint不存在时,会走到这个方法,也就是这个Job第一次启动 // 初始化offset信息,从指定的offset开始读取数据,还是从earliest offset开始读取 // 看用户输入的参数。 // 1.当checkpoint 已经存在,说明Job重启了,应该从上一次的计算状态恢复 // Spark会自动从 checkpoint中读取未提交的offset,保障数据不丢, //deserializeOffset方法 反序列化成KafkaSourceOffset对象,继续处理 override def initialOffset(): Offset = { val metadataLog = new KafkaSourceInitialOffsetWriter(SparkSession.getActiveSession.get, metadataPath) var partitionToOffset: Map[TopicPartition, Long] = metadataLog.get(0).getOrElse { val offsets = startingOffsets match { case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets()) case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None)) case SpecificOffsetRangeLimit(p) => kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss) case SpecificTimestampRangeLimit(p) => kafkaOffsetReader.fetchSpecificTimestampBasedOffsets(p, failsOnNoMatchingOffset = true) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") offsets }.partitionToOffsets KafkaSourceOffset(partitionToOffset) } // 用户是否要限制每一批数据的行数 override def getDefaultReadLimit: ReadLimit = { val maxOffsetsPerTrigger = Option(options.get( KafkaSourceProvider.MAX_OFFSET_PER_TRIGGER)).map(_.toLong) maxOffsetsPerTrigger.map(ReadLimit.maxRows).getOrElse(super.getDefaultReadLimit) } //2.kafkaOffsetReader.fetchLatestOffsets //获取这个Topic下 所有partition的 latest offset //具体步骤: kafkaconsumer.poll(0) --> kafkaconsumer.assignment获取所有分区 // 每个分区调一次kafkaconsumer.seekToEnd(partitions) 拿到最新offset,并校验。 // 如果有readlimit,则进行裁剪。 override def latestOffset(start: Offset, readLimit: ReadLimit): Offset = { val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets)) endPartitionOffsets = KafkaSourceOffset(readLimit match { case rows: ReadMaxRows => rateLimit(rows.maxRows(), startPartitionOffsets, latestPartitionOffsets) case _: ReadAllAvailable => latestPartitionOffsets }) endPartitionOffsets } //3.startPartitionOffsets,endPartitionOffsets都有了 //这里开始计算 每个partition上的 offsetRange //比如 TopicA-partition1 offsetRange 1000~3000 override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = { val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets val endPartitionOffsets = end.asInstanceOf[KafkaSourceOffset].partitionToOffsets //计算过程中如果发现partition增加,会读取new Partition的earliest offset //并根据“numPartitions”,“minPartitions” 参数计算生成多少个Spark tasks(Spark计算任务) //默认一个Kafka Partition mapping 一个Spark task val offsetRanges = kafkaOffsetReader.getOffsetRangesFromResolvedOffsets( startPartitionOffsets, endPartitionOffsets, reportDataLoss ) // 构造KafkaBatchInputPartition对象 // 一个KafkaBatchInputPartition对象会mapping一个Spark task(Spark计算任务) offsetRanges.map { range => KafkaBatchInputPartition(range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, includeHeaders) }.toArray } //4.每个KafkaBatchInputPartition对象会mapping一个Spark task(Spark计算任务) // 这边就是构造 对每个partition读取数据用的对象 override def createReaderFactory(): PartitionReaderFactory = { KafkaBatchReaderFactory } // 1.工具方法,将json反序列化成 KafkaSourceOffset对象 // 比如Job重启时,Spark会自动从 checkpoint中读取未提交的offset // 通过此方法 反序列化成对象,继续处理 // 保障数据不丢 override def deserializeOffset(json: String): Offset = { KafkaSourceOffset(JsonUtils.partitionOffsets(json)) } //一批数据处理完成后,会走到这里,可以自定义操作,kafka这边啥也没干 override def commit(end: Offset): Unit = {} override def stop(): Unit = { kafkaOffsetReader.close() } }
checkPoint是在文件系统(磁盘/hdfs)中记录数据源最新的offset和已经提交的offset,已经计算的临时数据,metadata数据,当计算任务失败重启的时候,可以从checkPoint 恢复计算任务到上一次已经commit的状态。确保数据不会丢失也不会重复。
调用initialOffset(), 初始化offset信息,从指定的offset开始读取数据,还是从earliest offset开始读取,初始化后的offset 作为新一批处理的 startOffset
Spark会自动从从checkPoint读取到上一次已经commit的offset信息,deserializeOffset方法 反序列化成KafkaSourceOffset对象,作为新一批处理的 startOffset
latestOffset(start: Offset, readLimit: ReadLimit),获取这个Topic下 所有partition的 latest offset。具体步骤: kafkaconsumer.poll(0) --> kafkaconsumer.assignment获取所有分区,每个分区调一次kafkaconsumer.seekToEnd(partitions) 拿到最新offset,并校验,如果有readlimit,则进行裁剪。
每个kafka partition 的 startPartitionOffsets,endPartitionOffsets都拿到了
planInputPartitions(start: Offset, end: Offset): 每个partition上的 offsetRange
比如 TopicA-partition1 offsetRange:1000~3000,每个topicPartition构造KafkaBatchInputPartition对象,再映射成一个Spark task(Spark 计算任务)
也就是:默认一个Kafka Partition mapping 一个Spark task
这个task中会包含kafka TopicPartition信息,要读取的offset范围,构造KafkaConsumer的必要参数等,后面会在Executor上构造KafkaConsumer并读取数据。
object KafkaBatchReaderFactory extends PartitionReaderFactory { override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { val p = partition.asInstanceOf[KafkaBatchInputPartition] KafkaBatchPartitionReader(p.offsetRange, p.executorKafkaParams, p.pollTimeoutMs, p.failOnDataLoss, p.includeHeaders) } }
这个半生对象比较简单,拿到一个KafkaBatchInputPartition,也就是封装了一个KafkaPartition和 这个Partition 上需要读取的Offset范围。
每一个Executor进程会从Driver拿到KafkaBatchInputPartition,并通过这里的逻辑,new出KafkaBatchPartitionReader,去KafkaBroker上拉取数据,并转成InternalRow,进行下一步的计算。InternalRow可以理解为Spark中的数据集。
private case class KafkaBatchPartitionReader( offsetRange: KafkaOffsetRange, executorKafkaParams: ju.Map[String, Object], pollTimeoutMs: Long, failOnDataLoss: Boolean, includeHeaders: Boolean) extends PartitionReader[InternalRow] with Logging { // 1.获取KafkaDataConsumer,内部对KafkaConsumer进行了cache private val consumer = KafkaDataConsumer.acquire(offsetRange.topicPartition, executorKafkaParams) //2.计算当前offsetRange是否在有startOffset< -1等奇怪的情况 private val rangeToRead = resolveRange(offsetRange) //3.设置一个将Kafka ConsumerRecord[Array[Byte], Array[Byte]] //转成Spark InternalRow的转换器 private val unsafeRowProjector = new KafkaRecordToRowConverter() .toUnsafeRowProjector(includeHeaders) //4. 从第一个offset开始读取 private var nextOffset = rangeToRead.fromOffset //5.迭代器模型 private var nextRow: UnsafeRow = _ //6.迭代器模型 override def next(): Boolean = { //6.1 判断offset是否在范围内 if (nextOffset < rangeToRead.untilOffset) { //6.2 poll一条数据 val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss) if (record != null) { //6.3 将kafka ConsumerRecord转成 InternalRow,即将被迭代 nextRow = unsafeRowProjector(record) //6.3 offset ++ nextOffset = record.offset + 1 true } else { false } } else { false } } //7. 如果next() 方法返回true,执行器线程会调用这个方法拿InternalRow override def get(): UnsafeRow = { assert(nextRow != null) nextRow } //8.一个微批的数据读取完成后,释放consumer对象 override def close(): Unit = { consumer.release() } }
这是 单个partition中指定范围 的Kafka数据的迭代器模型。
val record = consumer.get(nextOffset, rangeToRead.untilOffset, pollTimeoutMs, failOnDataLoss)
这个类已经不属于Spark 数据源API的一部分,在上文中,只在PartitionReader中被迭代时使用,这个类封装了对 KafkaConsumer的所有方法调用,包括assgin/seek/position/poll等方法。
内部自定义了InternalKafkaConsumerPool(基于org.apache.commons.pool2.impl.GenericKeyedObjectPool实现)缓存 KafkaConsumer
def get( offset: Long, // 开始位点 untilOffset: Long, // 结束位点 pollTimeoutMs: Long, // poll 超时时间 failOnDataLoss: Boolean):// 当offset在Broker上已经过期,停止任务,避免数据丢失 ConsumerRecord[Array[Byte], Array[Byte]] = runUninterruptiblyIfPossible { //从InternalKafkaConsumerPool获取一个InternalKafkaConsumer,如果pool中没有则创建 //创建过程中会调用KafkaConsumer.assgin(TopicPartion) //基本类似Map.computeIfAbsent //缓存的key 为本次任务的id+ TopicPartition val consumer: InternalKafkaConsumer = getOrRetrieveConsumer() //与上面类似,获取kafka数据缓存区,因为每次poll到的数据量可能大于 //需要的数据量,所以poll到的数据会先缓存起来 val fetchedData: FetchedData = getOrRetrieveFetchedData(offset) var isFetchComplete = false while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) { try { //fetchRecord方法见下文 fetchedRecord = fetchRecord(consumer, fetchedData, toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) //获取到数据,结束 if (fetchedRecord.record != null) { isFetchComplete = true } else { //如果指定的 fromOffset 在Kafka Broker上已经过期 //找找缓存中有没有 `[fromOffset, untilOffset)` 这个范围内的数据。 //找到的话会 掉过一部分数据。 toFetchOffset = fetchedRecord.nextOffsetToFetch if (toFetchOffset >= untilOffset) { fetchedData.reset() toFetchOffset = UNKNOWN_OFFSET } else { logDebug(s"Skipped offsets [$offset, $toFetchOffset]") } } } catch { case e: OffsetOutOfRangeException => //遇到异常,清理缓存的KafkaConsumer 和 数据缓存 releaseConsumer() fetchedData.reset() //根据failOnDataLoss参数的设置 //true:抛出异常到外层,结束Job,防止数据丢失 (推荐) //false:跳过这条数据,继续Job,部分数据丢失 reportDataLoss(topicPartition, groupId, failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e) toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset) } } if (isFetchComplete) { fetchedRecord.record } else { fetchedData.reset() null } }
小总结
1.KafkaConsumer.assgin(TopicPartion)后包装成InternalKafkaConsumer放进InternalKafkaConsumerPool
2. failOnDataLoss = true时,处理简单粗暴,当OffsetOutOfRangeException时直接抛出异常,终止Job(也就是计算任务StreamQuery)
3.failOnDataLoss = false时,打印日志,跳过offset,继续读。
这边fetchRecord,fetchRecords,fetchData,fetchedData等等,有点绕,我对代码进行简化,去掉了一些offset异常而throw Exception的情况。
/** * Get the fetched record for the given offset if available. * * @throws OffsetOutOfRangeException if `offset` is out of range * @throws TimeoutException if cannot fetch the record in `pollTimeoutMs` milliseconds. */ private def fetchRecord( consumer: InternalKafkaConsumer, fetchedData: FetchedData, offset: Long, untilOffset: Long, pollTimeoutMs: Long, failOnDataLoss: Boolean): FetchedRecord = { //如果fromOffset在缓存中不存在 if (offset != fetchedData.nextOffsetInFetchedData) { //进行KafkaConsumer.seek && KafkaConsumer.poll 见下文 fetch(consumer, fetchedData, offset, pollTimeoutMs) } //poll到数据后包装并返回 val record = fetchedData.next() fetchedRecord.withRecord(record, fetchedData.nextOffsetInFetchedData) }
def fetch(offset: Long, pollTimeoutMs: Long): (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long, AvailableOffsetRange) = { //1.先 seek到指定位置 consumer.seek(topicPartition, offset) //2.拉取数据 val p: ConsumerRecords[Array[Byte], Array[Byte]] = consumer.poll(Duration.ofMillis(pollTimeoutMs)) // 3.过滤出指定partition的数据 val r = p.records(topicPartition) //4. 获取当前consumer的最新offset val offsetAfterPoll = consumer.position(topicPartition) logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") //5. 获取partition earliest和latest Offset,判断有没有超出。 val range = getAvailableOffsetRange() val fetchedData = (r, offsetAfterPoll, range) fetchedData }
到这,数据源数据拉取的全流程就解析完了。
- Kafka Topic中的消息来一条就会写入InputTable吗?
并不是,只有一个微批处理的开始,数据源会去检测Topic下每个partition offsets是否有更新。 如果Trigger间隔时间很长,那么这段时间内Topic中新增的数据,Spark是无感知的。
- Structured Streaming + Kafka 如何实现数据不丢?
对于Source来说: 当一个微批处理开始的时候,Driver进程将offset写入checkpoint中, 当一个微批处理完成时(Sink成功),Driver进程将commit信息写入checkpoint中, 当Job失败重启的时候,可以从checkpoint中恢复Job到上一个commit的状态,保障数据不重复不丢失。
- Structured Streaming 没有用Kafka的消费者重平衡机制,如何分配消费者和partition的关系?
Spark每一个微批处理开始前都是先获取Topic下所有Partition的最新Offset(LEO),和上次已经处理完成的Offset对比 计算有哪些partition上有offset更新,以及要拉取的offset范围。 在Driver进程中,这些partition一对一映射成为Spark Task(Spark计算任务), 组成TaskPool,由Executor执行Task,至于Task的调度由Spark内部实现。 所以,默认参数情况下,如果Topic下Partition数量远大于Executor数量,会导致数据拉取执行缓慢 可以增加Executor或者通过numPartitions参数调整
- Structured Streaming 如何实现 Kafka Offsets 管理?
Spark每一个微批处理开始前都是先获取Topic下所有Partition的最新Offset(LEO) 根据readlimit参数,计算每个partition的 untilOffset写入checkpoint 处理过程中,用Kafka Consumer的seek方法,指定offset消费,只消费[fromOffset,untilOffset) 范围内的数据,左闭右开。 当一批数据处理完成后写入commit信息
- 自研MQ如何实现 最新的数据源接口,成为Spark的Source?
Data Source interfaces in Spark 3.0.0 主要有以下的接口 - TableProvider - Table - ScanBuilder - Scan - MicroBatchStream - InputPartition - PartitionReaderFactory - PartitionReader 实现以上接口即可。 官方sample Java版 https://github.com/apache/spark/blob/72615bc551adaa238d15a8b43a8f99aaf741c30f/sql/core/src/test/java/test/org/apache/spark/sql/connector/JavaPartitionAwareDataSource.java
- Spark 输入输出源相关接口API V1 V2 多次迭代的原因是什么?
详细视频:- https://www.youtube.com/watch?v=9-eomYXVnvY&ab_channel=Databricks 主要的改进是 并发读取数据、过滤器下推(filters push down)、加速聚合操作,增加Spark Data Encoding、数据统计和分片、支持Structured Streaming等