# Spark Streaming运行流程是怎样的 ## 一、Spark Streaming概述 Spark Streaming是Apache Spark核心API的扩展,用于构建可扩展、高吞吐量、容错的实时数据流处理系统。它能够将来自Kafka、Flume、Kinesis等数据源的实时数据流进行高效处理,并以微批次(Micro-Batch)的方式进行处理,最终将结果输出到文件系统、数据库或实时仪表盘。 ### 1.1 核心特点 - **微批处理架构**:将实时数据流切分为小批次(通常0.5~2秒),转换为Spark的RDD进行处理 - **Exactly-Once语义**:通过检查点(Checkpoint)和预写日志(WAL)保证数据一致性 - **与Spark生态无缝集成**:可直接复用Spark的机器学习(MLlib)、图计算(GraphX)等能力 - **多语言支持**:提供Scala、Java、Python API ### 1.2 基本概念 | 术语 | 说明 | |---------------|----------------------------------------------------------------------| | DStream | 离散化流(Discretized Stream),Spark Streaming的基础抽象 | | Batch Interval| 批次时间间隔(如1秒),决定微批次的划分粒度 | | Receiver | 数据接收器,负责从外部源获取数据并存储到Spark内存中 | ## 二、系统架构与核心组件 ### 2.1 整体架构 ```mermaid graph TD A[数据源] --> B[Spark Streaming] B --> C{核心组件} C --> D[Receiver] C --> E[DStream] C --> F[JobGenerator] B --> G[输出操作]
StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(1))
JobGenerator
spark.streaming.gracefulStopTimeout
:优雅停止超时时间ReceiverTracker
Receiver
BlockGenerator
class BlockGenerator: def __init__(self): self.currentBuffer = [] self.blockInterval = 200ms # 默认值
创建StreamingContext
JavaStreamingContext jssc = new JavaStreamingContext( new SparkConf().setAppName("NetworkWordCount"), Durations.seconds(1) );
定义输入源(以Socket为例):
val lines = ssc.socketTextStream("localhost", 9999)
构建DStream转换操作:
words = lines.flatMap(lambda line: line.split(" ")) wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b: a+b)
Receiver启动过程:
数据分块存储:
容错机制:
ssc.checkpoint("hdfs://checkpoint_dir")
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
时间窗口划分:
val windowedWordCounts = wordCounts.window(Seconds(30), Seconds(10))
作业生成时序:
timeline title 批次处理时序 section Batch Interval 批次N-1 : 2023-01-01 12:00:00 批次N : 2023-01-01 12:00:01 批次N+1 : 2023-01-01 12:00:02
任务执行阶段:
常见输出方式对比:
输出方式 | 特点 | 示例代码 |
---|---|---|
print() | 调试使用,打印前10条记录 | wordCounts.print() |
saveAsTextFiles() | 保存到HDFS | dstream.saveAsTextFiles("hdfs://output") |
foreachRDD | 灵活自定义输出逻辑 | 见下方代码示例 |
foreachRDD典型用法:
wordCounts.foreachRDD(rdd -> { rdd.foreachPartition(partition -> { // 创建数据库连接 Connection conn = createConnection(); while (partition.hasNext()) { Tuple2<String, Integer> record = partition.next(); // 写入数据库 insertRecord(conn, record); } conn.close(); }); });
Executor配置:
并行度调整:
sc.setLogLevel("WARN") // 减少日志量 ssc.sparkContext.setCheckpointDir("/tmp")
参数名 | 推荐值 | 说明 |
---|---|---|
spark.streaming.blockInterval | 200ms | 数据块生成间隔 |
spark.streaming.receiver.maxRate | 10000 | 单个Receiver最大接收速率(条/秒) |
spark.streaming.backpressure.enabled | true | 启用反压机制 |
数据积压:
spark.dynamicAllocation.enabled=true
Receiver故障:
val kafkaStreams = (1 to 3).map(_ => KafkaUtils.createStream(...)) val unifiedStream = ssc.union(kafkaStreams)
特性 | Spark Streaming | Structured Streaming |
---|---|---|
处理模型 | 微批处理 | 微批/持续处理 |
API层级 | RDD级 | DataFrame/DataSet级 |
事件时间处理 | 需手动实现 | 原生支持 |
graph LR A[Spark Streaming] --> B[兼容模式] B --> C[完全切换]
Spark Streaming通过将流数据离散化为一系列小批次RDD,实现了: 1. 高吞吐量的实时处理能力 2. 与批处理统一编程模型 3. 强大的容错机制
典型应用场景包括: - 实时监控告警系统 - 用户行为实时分析 - IoT设备数据处理
随着Spark 3.0的发布,虽然Structured Streaming成为主流,但理解Spark Streaming的运行机制仍是掌握Spark流处理体系的重要基础。 “`
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。