Copyright©2019 NTT corp. All Rights Reserved. Taming Distributed/Parallel Query Execution Engine of Apache Spark Takeshi Yamamuro, NTT
2Copyright©2019 NTT corp. All Rights Reserved. Who am I?
3Copyright©2019 NTT corp. All Rights Reserved. Notice - https://bit.ly/30Sh4MU
4Copyright©2019 NTT corp. All Rights Reserved. Notice - https://bit.ly/2I7Ymsj
5Copyright©2019 NTT corp. All Rights Reserved. Apache Spark 2015 Year In Review, https://bit.ly/2JaG0He • AMPLab@UCBの成果で,2012年にOSSとして公開 された汎⽤的な分散・並列処理フレームワーク • 現在の最新がv2.4.3,今年の下期にv3.0がリリース予定 • 代表的な特徴はユーザが使いやすいAPI,外部データと の連携,内部での⾼度な最適化 Whatʼs Spark?
6Copyright©2019 NTT corp. All Rights Reserved. • conda経由でSpark v2.4.3のインストール • テスト実⾏ Quick Start Guide of Spark aa$ conda install –c conda-forge pyspark aa $ cat test.csv 1,a,0.3 2,b,1.7 $ pyspark >>> df = spark.read.csv(‘test.csv’) >>> df.show()
7Copyright©2019 NTT corp. All Rights Reserved. • 本⽇はSparkにおいて主要な以下の3つの要素に関する 概要と個⼈的に興味のある性能の話をします • 1. 遅延評価と関係代数ベースのクエリ最適化 • 2. コード⽣成による実⾏時のための最適化 • 3. RDDベースの分散・並列処理 Todayʼs Talk
8Copyright©2019 NTT corp. All Rights Reserved. • Sparkの実⾏処理系の最新概要に関する発表 • Maryann Xue, Kris Mok, and Xingbo Jiang, A Deep Dive into Query Execution Engine of Spark SQL, https://bit.ly/2HLIbRk • Sparkの性能チューニングに関する発表 • Xiao Li, Understanding Query Plans and Spark UIs, https://bit.ly/2WiOm8x The Other Valuable References
9Copyright©2019 NTT corp. All Rights Reserved. • 個⼈的によく聞くSparkの使⽤⽤途は・・・ • ⾃社のデータがHDFS/S3などの分散ストレージやRDBMSなど のリモート環境に蓄積されており,それらを集約して分析をす る必要がある場合 • ローカル環境にデータはあるがサイズが搭載メモリのものに近 いか,もしくは超えている場合 • 例えばpandasの処理可能なデータサイズは搭載メモリの1/5〜 1/10程度*と⾔われ,⽐較的⼩さい When You Use Spark? * Apache Arrow and the "10 Things I Hate About pandas”, https://bit.ly/2WaSLX8
10Copyright©2019 NTT corp. All Rights Reserved. • PySpark/pandasの性能を単体ノード上で評価 • store_sales(TPC-DS)のサイズを変化させながら使⽤ • 評価⽤のハードウェア環境は32vcores/244GBメモリ • PySpark⽤に16vcores/10GBメモリを使⽤ Performance on a Single Machine Node 引用: Benchmarking Apache Spark on a Single Node Machine, https://bit.ly/2FcnbD0 Q1: SELECT MAX(ss_list_price) FROM store_sales Q2: SELECT COUNT(distinct ss_customer_sk) FROM store_sales Q3: SELECT SUM(ss_net_profit) FROM store_sales GROUP BY ss_store_sk Q1 Q2 Q3 Q2
11Copyright©2019 NTT corp. All Rights Reserved. Spark Internal
12Copyright©2019 NTT corp. All Rights Reserved. Spark Cluster Overview • ユーザが記述したクエリをDriver Programとして起 動し、Sparkの最⼩実⾏単位のTaskに分解,Worker Node上に起動されたExecutorに割り当てることで分 散・並列処理を実現 引用: http://spark.apache.org/docs/latest/cluster-overview.html
13Copyright©2019 NTT corp. All Rights Reserved. • ⼊⼒クエリは最終的にRDD(Resilient Distributed Datasets)のDAGで表現され実⾏ • RDDは唯⼀のデータ操作のためのAPIだが,現在このAPIを直 接ユーザが使⽤することは⾮推奨 • 性能チューニングは,まずDriver側で最適化されたク エリを確認した後,Executor側の実⾏ログ(Skew有 無など)を確認する流れがお勧め From Declarative Queries to RDDs 1. クエリの最適化 2. コード生成 3. RDDによる実行 Driver側 Executor側
14Copyright©2019 NTT corp. All Rights Reserved. 1. QUERY OPTIMIZATION
15Copyright©2019 NTT corp. All Rights Reserved. • 定義したクエリはすぐに実⾏されず,結果が参照された 時にクエリを最適化して実⾏ • クエリが定義された段階で実施されるのは,論理プランの構築 とプランの正しさの検証のみ Lazy Evaluation and Query Planning aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: double, c: string] >>> df1 = df.groupBy(‘a’).avg(‘b’) >>> df2 = df1.where(‘a > 1’) >>> df2.count() 1 結果が参照されたため最適化され,Executor側で実行 Driver側での処理のみ
16Copyright©2019 NTT corp. All Rights Reserved. • 定義したクエリはすぐに実⾏されず,結果が参照された 時にクエリを最適化して実⾏ • クエリが定義された段階で実施されるのは,論理プランの構築 とプランの正しさの検証のみ Lazy Evaluation and Query Planning aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: double, c: string] >>> df1 = df.groupBy(‘a’).avg(‘b’) >>> df2 = df1.where(‘a > 1’) >>> df2.count() 1 >>> df2.explain() == Physical Plan == *(2) HashAggregate(keys=[a#0], functions=[sum(b#1)]) +- Exchange hashpartitioning(a#0, 200) +- *(1) HashAggregate(keys=[a#0], functions=[partial_sum(b#1)]) +- *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (a#0 > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, … whereでの述語処理の順序が変更され最適化
17Copyright©2019 NTT corp. All Rights Reserved. • Spark v2.4.3ではpandasと同様,遅延評価を⾏わず 定義した時に実⾏する動作に変更することも可能 • spark.sql.repl.eagerEval.enabled Eager Mode in PySpark aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: double, c: string] >>> df.groupBy(‘a’).avg(‘b’) DataFrame[a: int, avg(b): double] >>> sql(“SET spark.sql.repl.eagerEval.enabled=true”) >>> df.groupBy(‘a’).avg(‘b’)
18Copyright©2019 NTT corp. All Rights Reserved. • クエリプランを確認してどのように実⾏されたかを正し く理解することは⾮常に重要 • Sparkに限らず,全てのRDBMS的な処理系において • 本⽇の話は時間の関係で,特に重要な⼊⼒部分の最適化 に関連する話題に絞って紹介 • FileScan Optimization • Filter Push Down + Implicit Type Casting • Nested Schema Pruning Understanding Query Plans https://bit.ly/2QFBAMl • クエリプランの理解を深めたい⽅は,右記 の本の16章「The Query Compiler」 などがオススメ
19Copyright©2019 NTT corp. All Rights Reserved. • 物理プランのFileScanノードで重要なのは以下3項⽬ を活⽤して⼊⼒データ量を限定すること FileScan Optimization aa >>> spark.read.load('/tmp/t').explain() == Physical Plan == *(1) FileScan parquet [a#0,b#1,c#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:double,c:string>
20Copyright©2019 NTT corp. All Rights Reserved. FileScan Optimization - PushedFilters aa >>> df2.explain() == Physical Plan == *(2) HashAggregate(keys=[a#0], functions=[sum(b#1)]) +- Exchange hashpartitioning(a#0, 200) +- *(1) HashAggregate(keys=[a#0], functions=[partial_sum(b#1)]) +- *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (a#0 > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct<a:int,b:double> • Filterが隣接している場合,FileScanはその述語を使 ⽤して不要な⾏を読み込まなくても良い • 実際に読み⾶ばすかはData Sourceの実装依存で,Parquetの 場合は最適化を実施
21Copyright©2019 NTT corp. All Rights Reserved. FileScan Optimization - ReadSchema aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: double, c: string] … >>> df2.explain() == Physical Plan == *(2) HashAggregate(keys=[a#0], functions=[sum(b#1)]) +- Exchange hashpartitioning(a#0, 200) +- *(1) HashAggregate(keys=[a#0], functions=[partial_sum(b#1)]) +- *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (a#0 > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct<a:int,b:double> • クエリ内で参照される列がReadSchemaに反映され, FileScanは不要な列を読み込まなくても良い • Parquetの場合は最適化を実施
22Copyright©2019 NTT corp. All Rights Reserved. • パーティション情報を⽤いて隣接するFilterの述語が処 理できる場合,条件を満たすデータのみを読み込む • 以下の例の様にパーティション情報を⽤いて全ての述語が処理 できる場合,物理プランからFilterも除去 FileScan Optimization - PartitionFilters >>> df = spark.range(100).selectExpr('id % 3 AS p', 'id') >>> df.write.partitionBy('p').save('/tmp/t') >>> df.show(5) only showing top 5 rows
23Copyright©2019 NTT corp. All Rights Reserved. • パーティション情報を⽤いて隣接するFilterの述語が処 理できる場合,条件を満たすデータのみを読み込む • 以下の例の様にパーティション情報を⽤いて全ての述語が処理 できる場合,物理プランからFilterも除去 FileScan Optimization - PartitionFilters >>> spark.read.load('/tmp/t').where('p = 0').explain() == Physical Plan == *(1) FileScan parquet [id#0L,p#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionCount: 1, PartitionFilters: [isnotnull(p#1), (p#1 = 0)], PushedFilters: [], ReadSchema: struct<id:bigint> このディレクトリ以下のファイルのみを読み込み
24Copyright©2019 NTT corp. All Rights Reserved. • Filter Push Down + Implicit Type Casting • 型の不⼀致でFileScanのPushedFiltersに反映されないケース があるため注意 The Other Optimization Tips aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: string] >>> df.where('a > 1').explain() == Physical Plan == *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (a#0 > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct<a:int,b:string> 反映されるケース
25Copyright©2019 NTT corp. All Rights Reserved. • Filter Push Down + Implicit Type Casting • 型の不⼀致でFileScanのPushedFiltersに反映されないケース があるため注意 The Other Optimization Tips aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: string] >>> df.where('a > 1.0').explain() == Physical Plan == *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (cast(a#0 as bigint) > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string> 反映されないケース1
26Copyright©2019 NTT corp. All Rights Reserved. • Filter Push Down + Implicit Type Casting • 型の不⼀致でFileScanのPushedFiltersに反映されないケース があるため注意 The Other Optimization Tips aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: string] >>> df.where('b > 0').explain() == Physical Plan == *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(b#1) && (cast(b#1 as int) > 0)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:int,b:string> 反映されないケース2
27Copyright©2019 NTT corp. All Rights Reserved. • Nested Schema Pruning • spark.sql.optimizer.nestedSchemaPruning.enabled • v2.4.3ではデフォルトでOFF The Other Optimization Tips aa >>> df = spark.read.load('/tmp/t’) DataFrame[id: bigint, a: struct<c:bigint,d:bigint>] >>> df.select(‘a.c’).explain() == Physical Plan == *(1) Project [a#1.c AS c#10L] +- *(1) FileScan parquet [a#1] Batched: false, Format: Parquet, ReadSchema: struct<a:struct<c:bigint,d:bigint>> # Turns on nested schema pruning >>> sql(“SET spark.sql.optimizer.nestedSchemaPruning.enabled=true”) >>> df.select(‘a.c’).explain() == Physical Plan == *(1) Project [a#1.c AS c#13L] +- *(1) FileScan parquet [a#1] Batched: false, Format: Parquet, ReadSchema: struct<a:struct<c:bigint>>
28Copyright©2019 NTT corp. All Rights Reserved. 2. CODE GENERATION
29Copyright©2019 NTT corp. All Rights Reserved. • 従来のPullベースの処理⽅式はオーバヘッドが⾼く, 計算負荷の⾼いワークロードで⾮効率に • 主要な原因の1つはO(⼊⼒⾏数)回発⽣する物理プラン実装の 仮想関数nextの呼び出しによるオーバヘッド Why Spark Generates Code from Plans? 物理プランのFilterノードの実装例 next next next
30Copyright©2019 NTT corp. All Rights Reserved. • 物理プランから処理が等価な逐次コードを⽣成 • 仮想関数nextの呼び出しの除去 • 中間データがレジスタに残りやすい処理フロー • データ並列化やループ展開などのJITコンパイラ最適化の享受 Why Spark Generates Code from Plans? コード生成
31Copyright©2019 NTT corp. All Rights Reserved. aa Dumping Generated Code • Sparkの⽣成したコードを確認する⽅法 • EXPLAIN結果の先頭にʼ*ʼが付いているノードは内部でコード ⽣成による最適化が適⽤されていることを意味 scala> import org.apache.spark.sql.execution.debug._ scala> val df = sql("SELECT sqrt(a) FROM test WHERE b = 1”) scala> df.explain() == Physical Plan == *(1) Project [SQRT(cast(_1#2 as double)) AS SQRT(CAST(a AS DOUBLE))#18] +- *(1) Filter (_2#3 = 1) +- LocalTableScan [_1#2, _2#3] scala> df.debugCodegen
32Copyright©2019 NTT corp. All Rights Reserved. Sparkが内部で生成したコードの抜粋
33Copyright©2019 NTT corp. All Rights Reserved. Performance Gains from Generated Code • Broadcast-Hash Joinで8.9Xの⾼速化 性能比較用のクエリ
34Copyright©2019 NTT corp. All Rights Reserved. Pitfall: Too Long Generated Code aa >>> df.groupBy().sum(‘c0’, ‘c1’, …).show() … INFO WholeStageCodegenExec: Found too long generated codes and JIT optimization might not work: the bytecode size (8094) is above the limit 8000, and the whole-stage codegen was disabled for this plan (id=2). To avoid this, you can raise the limit `spark.sql.codegen.hugeMethodLimit`: … • 出⼒ログ上で以下の⽂章を⾒かけたら要注意
35Copyright©2019 NTT corp. All Rights Reserved. Pitfall: Too Long Generated Code • OpenJDK HotSpot JVMが単⼀メソッドをJITコンパ イルするbytecodeの最⼤サイズが8000B • これを超えた場合はインタプリタモードで実⾏ • インタープリタモードに遷移した際にJVMの挙動が変 わり,性能が極端に悪化する場合があるので注意 • 特に集約関数(e.g., SUM/AVG)では,Sparkは全ての集約処 理を単⼀メソッドにinline化してコード⽣成 • KURTOSISなどの複雑な組み込み集約関数の場合,単体でこの 閾値を超えるので注意が必要 aa >>> do_agg(num_sum_functions=43) Elapsed: 14.30727505683899s >>> do_agg(44) Elapsed: 15.5922269821167s >>> do_agg(45) # bytecodeサイズが8094Bでインタープリタモードに遷移 Elapsed: 45.58869409561157s 使用したコードは https://bit.ly/2WxX0jM
36Copyright©2019 NTT corp. All Rights Reserved. Pitfall: Too Long Generated Code • 実はSparkがコード⽣成を諦めてPullベースの処理に 切り替える閾値が内部にあり,これを使うことで極端な 性能悪化を回避できる場合も • spark.sql.codegen.hugeMethodLimit aa # デフォルトでhugeMethodLimit=65536 >>> do_agg(45) Elapsed: 45.58869409561157s >>> sql('SET spark.sql.codegen.hugeMethodLimit=8000’) >>> do_agg(45) Elapsed: 16.42603611946106s SPARK-21870で集約関数が8000Bを超えにくいように メソッドを分割する修正を実施中
37Copyright©2019 NTT corp. All Rights Reserved. 3. RDD-BASED COMPUTING
38Copyright©2019 NTT corp. All Rights Reserved. • RDDの粗粒度の変換で計算を表現 • Executor間のデータ交換(shuffle)が必要のない連続する 変換を1つにまとめた単位がStage • 性能チューニングの基本は各Executorのpartition 数とサイズが均等になるように調整すること RDD-based Computing partition partition partition partition RDD1 RDD2 partition partition partition RDD3 partition partition partition RDD4 shuffle Stageの範囲
39Copyright©2019 NTT corp. All Rights Reserved. Spark Web UIs Sparkが起動後,ブラウザ上で http://127.0.0.1:4040 にアクセス
40Copyright©2019 NTT corp. All Rights Reserved. Spark Web UIs
41Copyright©2019 NTT corp. All Rights Reserved. Stage Stats in Spark Web UIs • まずは「Stages」タブで遅いStageを確認
42Copyright©2019 NTT corp. All Rights Reserved. Task Stats in Spark Web UIs Stageの入出力にSkewがないか? Taskが失敗してないか?
43Copyright©2019 NTT corp. All Rights Reserved. • 本⽇の発表概要 • 1. クエリプランとFileScanノードの最適化 • 2. コード⽣成概要と集約処理コードの肥⼤化による性能劣化 • 3. RDDの実⾏とSpark Web UIs • SparkのOSS開発は依然として活発で,性能に関する 傾向はリリース毎で変化することが多い Wrap-up アップグレードの際はクエリ結果が変わらないだけではなく, 性能が悪化していないことを確認することも大切

Taming Distributed/Parallel Query Execution Engine of Apache Spark

  • 1.
    Copyright©2019 NTT corp.All Rights Reserved. Taming Distributed/Parallel Query Execution Engine of Apache Spark Takeshi Yamamuro, NTT
  • 2.
    2Copyright©2019 NTT corp.All Rights Reserved. Who am I?
  • 3.
    3Copyright©2019 NTT corp.All Rights Reserved. Notice - https://bit.ly/30Sh4MU
  • 4.
    4Copyright©2019 NTT corp.All Rights Reserved. Notice - https://bit.ly/2I7Ymsj
  • 5.
    5Copyright©2019 NTT corp.All Rights Reserved. Apache Spark 2015 Year In Review, https://bit.ly/2JaG0He • AMPLab@UCBの成果で,2012年にOSSとして公開 された汎⽤的な分散・並列処理フレームワーク • 現在の最新がv2.4.3,今年の下期にv3.0がリリース予定 • 代表的な特徴はユーザが使いやすいAPI,外部データと の連携,内部での⾼度な最適化 Whatʼs Spark?
  • 6.
    6Copyright©2019 NTT corp.All Rights Reserved. • conda経由でSpark v2.4.3のインストール • テスト実⾏ Quick Start Guide of Spark aa$ conda install –c conda-forge pyspark aa $ cat test.csv 1,a,0.3 2,b,1.7 $ pyspark >>> df = spark.read.csv(‘test.csv’) >>> df.show()
  • 7.
    7Copyright©2019 NTT corp.All Rights Reserved. • 本⽇はSparkにおいて主要な以下の3つの要素に関する 概要と個⼈的に興味のある性能の話をします • 1. 遅延評価と関係代数ベースのクエリ最適化 • 2. コード⽣成による実⾏時のための最適化 • 3. RDDベースの分散・並列処理 Todayʼs Talk
  • 8.
    8Copyright©2019 NTT corp.All Rights Reserved. • Sparkの実⾏処理系の最新概要に関する発表 • Maryann Xue, Kris Mok, and Xingbo Jiang, A Deep Dive into Query Execution Engine of Spark SQL, https://bit.ly/2HLIbRk • Sparkの性能チューニングに関する発表 • Xiao Li, Understanding Query Plans and Spark UIs, https://bit.ly/2WiOm8x The Other Valuable References
  • 9.
    9Copyright©2019 NTT corp.All Rights Reserved. • 個⼈的によく聞くSparkの使⽤⽤途は・・・ • ⾃社のデータがHDFS/S3などの分散ストレージやRDBMSなど のリモート環境に蓄積されており,それらを集約して分析をす る必要がある場合 • ローカル環境にデータはあるがサイズが搭載メモリのものに近 いか,もしくは超えている場合 • 例えばpandasの処理可能なデータサイズは搭載メモリの1/5〜 1/10程度*と⾔われ,⽐較的⼩さい When You Use Spark? * Apache Arrow and the "10 Things I Hate About pandas”, https://bit.ly/2WaSLX8
  • 10.
    10Copyright©2019 NTT corp.All Rights Reserved. • PySpark/pandasの性能を単体ノード上で評価 • store_sales(TPC-DS)のサイズを変化させながら使⽤ • 評価⽤のハードウェア環境は32vcores/244GBメモリ • PySpark⽤に16vcores/10GBメモリを使⽤ Performance on a Single Machine Node 引用: Benchmarking Apache Spark on a Single Node Machine, https://bit.ly/2FcnbD0 Q1: SELECT MAX(ss_list_price) FROM store_sales Q2: SELECT COUNT(distinct ss_customer_sk) FROM store_sales Q3: SELECT SUM(ss_net_profit) FROM store_sales GROUP BY ss_store_sk Q1 Q2 Q3 Q2
  • 11.
    11Copyright©2019 NTT corp.All Rights Reserved. Spark Internal
  • 12.
    12Copyright©2019 NTT corp.All Rights Reserved. Spark Cluster Overview • ユーザが記述したクエリをDriver Programとして起 動し、Sparkの最⼩実⾏単位のTaskに分解,Worker Node上に起動されたExecutorに割り当てることで分 散・並列処理を実現 引用: http://spark.apache.org/docs/latest/cluster-overview.html
  • 13.
    13Copyright©2019 NTT corp.All Rights Reserved. • ⼊⼒クエリは最終的にRDD(Resilient Distributed Datasets)のDAGで表現され実⾏ • RDDは唯⼀のデータ操作のためのAPIだが,現在このAPIを直 接ユーザが使⽤することは⾮推奨 • 性能チューニングは,まずDriver側で最適化されたク エリを確認した後,Executor側の実⾏ログ(Skew有 無など)を確認する流れがお勧め From Declarative Queries to RDDs 1. クエリの最適化 2. コード生成 3. RDDによる実行 Driver側 Executor側
  • 14.
    14Copyright©2019 NTT corp.All Rights Reserved. 1. QUERY OPTIMIZATION
  • 15.
    15Copyright©2019 NTT corp.All Rights Reserved. • 定義したクエリはすぐに実⾏されず,結果が参照された 時にクエリを最適化して実⾏ • クエリが定義された段階で実施されるのは,論理プランの構築 とプランの正しさの検証のみ Lazy Evaluation and Query Planning aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: double, c: string] >>> df1 = df.groupBy(‘a’).avg(‘b’) >>> df2 = df1.where(‘a > 1’) >>> df2.count() 1 結果が参照されたため最適化され,Executor側で実行 Driver側での処理のみ
  • 16.
    16Copyright©2019 NTT corp.All Rights Reserved. • 定義したクエリはすぐに実⾏されず,結果が参照された 時にクエリを最適化して実⾏ • クエリが定義された段階で実施されるのは,論理プランの構築 とプランの正しさの検証のみ Lazy Evaluation and Query Planning aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: double, c: string] >>> df1 = df.groupBy(‘a’).avg(‘b’) >>> df2 = df1.where(‘a > 1’) >>> df2.count() 1 >>> df2.explain() == Physical Plan == *(2) HashAggregate(keys=[a#0], functions=[sum(b#1)]) +- Exchange hashpartitioning(a#0, 200) +- *(1) HashAggregate(keys=[a#0], functions=[partial_sum(b#1)]) +- *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (a#0 > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, … whereでの述語処理の順序が変更され最適化
  • 17.
    17Copyright©2019 NTT corp.All Rights Reserved. • Spark v2.4.3ではpandasと同様,遅延評価を⾏わず 定義した時に実⾏する動作に変更することも可能 • spark.sql.repl.eagerEval.enabled Eager Mode in PySpark aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: double, c: string] >>> df.groupBy(‘a’).avg(‘b’) DataFrame[a: int, avg(b): double] >>> sql(“SET spark.sql.repl.eagerEval.enabled=true”) >>> df.groupBy(‘a’).avg(‘b’)
  • 18.
    18Copyright©2019 NTT corp.All Rights Reserved. • クエリプランを確認してどのように実⾏されたかを正し く理解することは⾮常に重要 • Sparkに限らず,全てのRDBMS的な処理系において • 本⽇の話は時間の関係で,特に重要な⼊⼒部分の最適化 に関連する話題に絞って紹介 • FileScan Optimization • Filter Push Down + Implicit Type Casting • Nested Schema Pruning Understanding Query Plans https://bit.ly/2QFBAMl • クエリプランの理解を深めたい⽅は,右記 の本の16章「The Query Compiler」 などがオススメ
  • 19.
    19Copyright©2019 NTT corp.All Rights Reserved. • 物理プランのFileScanノードで重要なのは以下3項⽬ を活⽤して⼊⼒データ量を限定すること FileScan Optimization aa >>> spark.read.load('/tmp/t').explain() == Physical Plan == *(1) FileScan parquet [a#0,b#1,c#2] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:int,b:double,c:string>
  • 20.
    20Copyright©2019 NTT corp.All Rights Reserved. FileScan Optimization - PushedFilters aa >>> df2.explain() == Physical Plan == *(2) HashAggregate(keys=[a#0], functions=[sum(b#1)]) +- Exchange hashpartitioning(a#0, 200) +- *(1) HashAggregate(keys=[a#0], functions=[partial_sum(b#1)]) +- *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (a#0 > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct<a:int,b:double> • Filterが隣接している場合,FileScanはその述語を使 ⽤して不要な⾏を読み込まなくても良い • 実際に読み⾶ばすかはData Sourceの実装依存で,Parquetの 場合は最適化を実施
  • 21.
    21Copyright©2019 NTT corp.All Rights Reserved. FileScan Optimization - ReadSchema aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: double, c: string] … >>> df2.explain() == Physical Plan == *(2) HashAggregate(keys=[a#0], functions=[sum(b#1)]) +- Exchange hashpartitioning(a#0, 200) +- *(1) HashAggregate(keys=[a#0], functions=[partial_sum(b#1)]) +- *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (a#0 > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct<a:int,b:double> • クエリ内で参照される列がReadSchemaに反映され, FileScanは不要な列を読み込まなくても良い • Parquetの場合は最適化を実施
  • 22.
    22Copyright©2019 NTT corp.All Rights Reserved. • パーティション情報を⽤いて隣接するFilterの述語が処 理できる場合,条件を満たすデータのみを読み込む • 以下の例の様にパーティション情報を⽤いて全ての述語が処理 できる場合,物理プランからFilterも除去 FileScan Optimization - PartitionFilters >>> df = spark.range(100).selectExpr('id % 3 AS p', 'id') >>> df.write.partitionBy('p').save('/tmp/t') >>> df.show(5) only showing top 5 rows
  • 23.
    23Copyright©2019 NTT corp.All Rights Reserved. • パーティション情報を⽤いて隣接するFilterの述語が処 理できる場合,条件を満たすデータのみを読み込む • 以下の例の様にパーティション情報を⽤いて全ての述語が処理 できる場合,物理プランからFilterも除去 FileScan Optimization - PartitionFilters >>> spark.read.load('/tmp/t').where('p = 0').explain() == Physical Plan == *(1) FileScan parquet [id#0L,p#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionCount: 1, PartitionFilters: [isnotnull(p#1), (p#1 = 0)], PushedFilters: [], ReadSchema: struct<id:bigint> このディレクトリ以下のファイルのみを読み込み
  • 24.
    24Copyright©2019 NTT corp.All Rights Reserved. • Filter Push Down + Implicit Type Casting • 型の不⼀致でFileScanのPushedFiltersに反映されないケース があるため注意 The Other Optimization Tips aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: string] >>> df.where('a > 1').explain() == Physical Plan == *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (a#0 > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(a), GreaterThan(a,1)], ReadSchema: struct<a:int,b:string> 反映されるケース
  • 25.
    25Copyright©2019 NTT corp.All Rights Reserved. • Filter Push Down + Implicit Type Casting • 型の不⼀致でFileScanのPushedFiltersに反映されないケース があるため注意 The Other Optimization Tips aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: string] >>> df.where('a > 1.0').explain() == Physical Plan == *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(a#0) && (cast(a#0 as bigint) > 1)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:int,b:string> 反映されないケース1
  • 26.
    26Copyright©2019 NTT corp.All Rights Reserved. • Filter Push Down + Implicit Type Casting • 型の不⼀致でFileScanのPushedFiltersに反映されないケース があるため注意 The Other Optimization Tips aa >>> df = spark.read.load('/tmp/t') DataFrame[a: int, b: string] >>> df.where('b > 0').explain() == Physical Plan == *(1) Project [a#0, b#1] +- *(1) Filter (isnotnull(b#1) && (cast(b#1 as int) > 0)) +- *(1) FileScan parquet [a#0,b#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/t], PartitionFilters: [], PushedFilters: [IsNotNull(b)], ReadSchema: struct<a:int,b:string> 反映されないケース2
  • 27.
    27Copyright©2019 NTT corp.All Rights Reserved. • Nested Schema Pruning • spark.sql.optimizer.nestedSchemaPruning.enabled • v2.4.3ではデフォルトでOFF The Other Optimization Tips aa >>> df = spark.read.load('/tmp/t’) DataFrame[id: bigint, a: struct<c:bigint,d:bigint>] >>> df.select(‘a.c’).explain() == Physical Plan == *(1) Project [a#1.c AS c#10L] +- *(1) FileScan parquet [a#1] Batched: false, Format: Parquet, ReadSchema: struct<a:struct<c:bigint,d:bigint>> # Turns on nested schema pruning >>> sql(“SET spark.sql.optimizer.nestedSchemaPruning.enabled=true”) >>> df.select(‘a.c’).explain() == Physical Plan == *(1) Project [a#1.c AS c#13L] +- *(1) FileScan parquet [a#1] Batched: false, Format: Parquet, ReadSchema: struct<a:struct<c:bigint>>
  • 28.
    28Copyright©2019 NTT corp.All Rights Reserved. 2. CODE GENERATION
  • 29.
    29Copyright©2019 NTT corp.All Rights Reserved. • 従来のPullベースの処理⽅式はオーバヘッドが⾼く, 計算負荷の⾼いワークロードで⾮効率に • 主要な原因の1つはO(⼊⼒⾏数)回発⽣する物理プラン実装の 仮想関数nextの呼び出しによるオーバヘッド Why Spark Generates Code from Plans? 物理プランのFilterノードの実装例 next next next
  • 30.
    30Copyright©2019 NTT corp.All Rights Reserved. • 物理プランから処理が等価な逐次コードを⽣成 • 仮想関数nextの呼び出しの除去 • 中間データがレジスタに残りやすい処理フロー • データ並列化やループ展開などのJITコンパイラ最適化の享受 Why Spark Generates Code from Plans? コード生成
  • 31.
    31Copyright©2019 NTT corp.All Rights Reserved. aa Dumping Generated Code • Sparkの⽣成したコードを確認する⽅法 • EXPLAIN結果の先頭にʼ*ʼが付いているノードは内部でコード ⽣成による最適化が適⽤されていることを意味 scala> import org.apache.spark.sql.execution.debug._ scala> val df = sql("SELECT sqrt(a) FROM test WHERE b = 1”) scala> df.explain() == Physical Plan == *(1) Project [SQRT(cast(_1#2 as double)) AS SQRT(CAST(a AS DOUBLE))#18] +- *(1) Filter (_2#3 = 1) +- LocalTableScan [_1#2, _2#3] scala> df.debugCodegen
  • 32.
    32Copyright©2019 NTT corp.All Rights Reserved. Sparkが内部で生成したコードの抜粋
  • 33.
    33Copyright©2019 NTT corp.All Rights Reserved. Performance Gains from Generated Code • Broadcast-Hash Joinで8.9Xの⾼速化 性能比較用のクエリ
  • 34.
    34Copyright©2019 NTT corp.All Rights Reserved. Pitfall: Too Long Generated Code aa >>> df.groupBy().sum(‘c0’, ‘c1’, …).show() … INFO WholeStageCodegenExec: Found too long generated codes and JIT optimization might not work: the bytecode size (8094) is above the limit 8000, and the whole-stage codegen was disabled for this plan (id=2). To avoid this, you can raise the limit `spark.sql.codegen.hugeMethodLimit`: … • 出⼒ログ上で以下の⽂章を⾒かけたら要注意
  • 35.
    35Copyright©2019 NTT corp.All Rights Reserved. Pitfall: Too Long Generated Code • OpenJDK HotSpot JVMが単⼀メソッドをJITコンパ イルするbytecodeの最⼤サイズが8000B • これを超えた場合はインタプリタモードで実⾏ • インタープリタモードに遷移した際にJVMの挙動が変 わり,性能が極端に悪化する場合があるので注意 • 特に集約関数(e.g., SUM/AVG)では,Sparkは全ての集約処 理を単⼀メソッドにinline化してコード⽣成 • KURTOSISなどの複雑な組み込み集約関数の場合,単体でこの 閾値を超えるので注意が必要 aa >>> do_agg(num_sum_functions=43) Elapsed: 14.30727505683899s >>> do_agg(44) Elapsed: 15.5922269821167s >>> do_agg(45) # bytecodeサイズが8094Bでインタープリタモードに遷移 Elapsed: 45.58869409561157s 使用したコードは https://bit.ly/2WxX0jM
  • 36.
    36Copyright©2019 NTT corp.All Rights Reserved. Pitfall: Too Long Generated Code • 実はSparkがコード⽣成を諦めてPullベースの処理に 切り替える閾値が内部にあり,これを使うことで極端な 性能悪化を回避できる場合も • spark.sql.codegen.hugeMethodLimit aa # デフォルトでhugeMethodLimit=65536 >>> do_agg(45) Elapsed: 45.58869409561157s >>> sql('SET spark.sql.codegen.hugeMethodLimit=8000’) >>> do_agg(45) Elapsed: 16.42603611946106s SPARK-21870で集約関数が8000Bを超えにくいように メソッドを分割する修正を実施中
  • 37.
    37Copyright©2019 NTT corp.All Rights Reserved. 3. RDD-BASED COMPUTING
  • 38.
    38Copyright©2019 NTT corp.All Rights Reserved. • RDDの粗粒度の変換で計算を表現 • Executor間のデータ交換(shuffle)が必要のない連続する 変換を1つにまとめた単位がStage • 性能チューニングの基本は各Executorのpartition 数とサイズが均等になるように調整すること RDD-based Computing partition partition partition partition RDD1 RDD2 partition partition partition RDD3 partition partition partition RDD4 shuffle Stageの範囲
  • 39.
    39Copyright©2019 NTT corp.All Rights Reserved. Spark Web UIs Sparkが起動後,ブラウザ上で http://127.0.0.1:4040 にアクセス
  • 40.
    40Copyright©2019 NTT corp.All Rights Reserved. Spark Web UIs
  • 41.
    41Copyright©2019 NTT corp.All Rights Reserved. Stage Stats in Spark Web UIs • まずは「Stages」タブで遅いStageを確認
  • 42.
    42Copyright©2019 NTT corp.All Rights Reserved. Task Stats in Spark Web UIs Stageの入出力にSkewがないか? Taskが失敗してないか?
  • 43.
    43Copyright©2019 NTT corp.All Rights Reserved. • 本⽇の発表概要 • 1. クエリプランとFileScanノードの最適化 • 2. コード⽣成概要と集約処理コードの肥⼤化による性能劣化 • 3. RDDの実⾏とSpark Web UIs • SparkのOSS開発は依然として活発で,性能に関する 傾向はリリース毎で変化することが多い Wrap-up アップグレードの際はクエリ結果が変わらないだけではなく, 性能が悪化していないことを確認することも大切