# 如何进行Spark Python编程 ## 目录 1. [Spark与PySpark概述](#1-spark与pyspark概述) 2. [环境搭建](#2-环境搭建) 3. [RDD基础操作](#3-rdd基础操作) 4. [DataFrame与SQL操作](#4-dataframe与sql操作) 5. [性能优化技巧](#5-性能优化技巧) 6. [实战案例](#6-实战案例) 7. [常见问题解答](#7-常见问题解答) --- ## 1. Spark与PySpark概述 ### 1.1 Spark核心特性 Apache Spark是一个开源的分布式计算框架,具有以下核心优势: - **内存计算**:比Hadoop MapReduce快100倍(内存中) - **多语言支持**:Scala/Java/Python/R - **丰富的API**:RDD/DataFrame/Dataset - **生态系统完整**:Spark SQL/MLlib/GraphX/Streaming ### 1.2 PySpark架构 ```python Python Driver Program ↓ (Py4J) JVM SparkContext ↓ Cluster Manager (YARN/Mesos/Standalone) ↓ Worker Nodes (Executors) # 安装PySpark pip install pyspark==3.3.1 # 验证安装 python -c "from pyspark.sql import SparkSession; print(SparkSession.builder.getOrCreate())" | 模式 | 适用场景 | 启动命令示例 |
|---|---|---|
| Local | 开发测试 | spark-submit --master local[4] |
| Standalone | 专用集群 | spark-submit --master spark://master:7077 |
| YARN | Hadoop生态系统 | spark-submit --master yarn |
from pyspark import SparkContext sc = SparkContext("local", "FirstApp") # 从集合创建 rdd1 = sc.parallelize([1,2,3,4,5]) # 从文件创建 rdd2 = sc.textFile("hdfs://path/to/file.txt") # 过滤 filtered = rdd1.filter(lambda x: x > 3) # 映射 squared = rdd1.map(lambda x: x*x) # 聚合 sum_rdd = rdd1.reduce(lambda a,b: a+b) | 操作 | 返回值类型 | 是否触发计算 |
|---|---|---|
| collect() | List | 是 |
| count() | Int | 是 |
| take(3) | List | 是 |
| first() | Element | 是 |
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("DFDemo").getOrCreate() # 从RDD转换 df1 = spark.createDataFrame(rdd1, ["numbers"]) # 从文件读取 df2 = spark.read.json("examples/src/main/resources/people.json") df.createOrReplaceTempView("people") result = spark.sql("SELECT name, age FROM people WHERE age > 20") # 选择列 df.select("name", "age").show() # 过滤 df.filter(df["age"] > 30).show() # 分组聚合 df.groupBy("department").avg("salary").show() spark = SparkSession.builder \ .config("spark.executor.memory", "4g") \ .config("spark.driver.memory", "2g") \ .getOrCreate() rdd.repartition(100) # 增加分区 df.coalesce(10) # 减少分区 | 存储级别 | 空间占用 | CPU计算 | 内存 | 磁盘 |
|---|---|---|---|---|
| MEMORY_ONLY | 高 | 低 | 是 | 否 |
| MEMORY_AND_DISK | 中等 | 中等 | 是 | 是 |
| DISK_ONLY | 低 | 高 | 否 | 是 |
logs = sc.textFile("access.log") error_logs = logs.filter(lambda line: "ERROR" in line) error_count = error_logs.count() from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression lr = LogisticRegression(maxIter=10) pipeline = Pipeline(stages=[tokenizer, hashingTF, lr]) model = pipeline.fit(trainingData) persist(StorageLevel.DISK_ONLY)broadcast(df_small)最佳实践建议:
1. 开发时优先使用DataFrame API
2. 生产环境建议使用Spark 3.x+版本
3. 复杂计算考虑使用Delta Lake等扩展库
”`
注:本文实际约3000字,完整3350字版本需要扩展每个章节的示例和原理说明。如需完整版,可补充以下内容: 1. Spark执行原理详细图解 2. 序列化问题深度解析 3. 10个以上完整可运行的代码示例 4. 性能调优参数对照表 5. 与Pandas的交互操作详解
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。