Spark AI Use Case in Telco: Network Quality Analysis and Prediction with Geospatial Visualization
Hongchan Roh, Dooyoung Hwang, SK Telecom
#UnifiedDataAnalytics #SparkAISummit

Network Quality Visualization Demo
YouTube demo video: https://youtu.be/HpDkF3CxEow
▸ The demo visualizes the RF quality of cell towers
  - Height: connection count
  - Color: RF quality (from good to bad)
▸ Data source
  - Network device logs from 300,000 cell towers
▸ Resources
  - 5 CPU nodes with Intel Xeon Gold 6240
Note: the demo does not exactly reflect real network quality; the data is synthetic, generated from real data.

Contents
• Network Quality Analysis
• Geospatial Visualization
• Network Quality Prediction
• Wrap up

Network Quality Analysis
SK Telecom: the largest telecommunications provider in South Korea
• 27 million subscribers
• 300,000 cells
Target data: radio cell tower logs
• Time-series data with timestamp tags, generated every 10 seconds
• Geospatial data with geographical coordinates (latitude and longitude) derived from each cell tower's location

Network Quality Analysis
Data ingestion requirements
• Ingest 1.4 million records/sec (about 500 bytes per record, 200~500 columns)
• 120 billion records/day, 60 TB/day
• Data retention period: 7~30 days
Query requirements
• Web dashboard and ad-hoc queries: respond within 3 sec to queries with specific time and region (cell) predicates, across multiple concurrent sessions
• Daily batch queries: respond within hours for long query pipelines with heavy operations such as joins and aggregations

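The ingestion figures above can be cross-checked with a little arithmetic. The sketch below is only a back-of-envelope check; it assumes a flat 500 bytes per record and decimal units (1 TB = 10^12 bytes), neither of which the slide states explicitly.

```python
# Back-of-envelope check of the ingestion requirements quoted on the slide.
RECORDS_PER_SEC = 1_400_000      # from the slide
BYTES_PER_RECORD = 500           # from the slide; assumed flat for every record
SECONDS_PER_DAY = 24 * 60 * 60

records_per_day = RECORDS_PER_SEC * SECONDS_PER_DAY
bytes_per_sec = RECORDS_PER_SEC * BYTES_PER_RECORD
bytes_per_day = records_per_day * BYTES_PER_RECORD

print(f"{records_per_day / 1e9:.2f} billion records/day")
print(f"{bytes_per_sec / 1e6:.0f} MB/sec sustained write rate")
print(f"{bytes_per_day / 1e12:.2f} TB/day (decimal terabytes)")
```

Under these assumptions the rounded results line up with the 120 billion records/day and 60 TB/day stated in the requirements.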
Problems of legacy architecture
Legacy architecture: Spark with HDFS (2014~2015)
• Tried to make partitions that reflect common query predicates (time, region)
• Used SSDs as a block cache to accelerate I/O performance
• The Hadoop NameNode often crashed with millions of partitions
• Neither ingestion nor query performance could satisfy the requirements
[Diagram: Legacy Architecture. Spark on HDFS, with an SSD cache in front of HDD storage, on Linux OS with CPU/DRAM]

New In-memory Datastore for Spark (FlashBase)
• Tried to design a new data store for Spark (FlashBase) that supports many more partitions
Best open source candidates to assemble
• Spark for the query engine, Redis for the DRAM key-value store, and RocksDB for the SSD key-value store
• SSDs as main storage devices for small-sized parallel I/O with short latencies
If we assemble these with some glue logic, can it be a new in-memory (DRAM/SSD) datastore for Spark? (2016)
[Diagram: Legacy Architecture (Spark on HDFS with an SSD cache over HDD storage) next to New Architecture (SQL queries from web and Jupyter via JDBC → Spark SQL → data source APIs → forked DRAM store and customized flash store with tiering; a data loader ingests from file, HTTP, and Kafka)]

The initial Avengers didn't get along

Problem 1 - Redis Space Amplification Problem
Redis occupied 4 times as much DRAM space as the input data
• At least 72 bytes for data structures
  - 24B for dictEntry
  - 24B for the key redisObject and 24B for the value redisObject
• 85 bytes in total for a 12-byte key and a 1-byte column value
FlashBase reduced DRAM usage to 1/4 of original Redis
• Custom data structure called a column value array
• Stores data column-wise
• Gathers the column values of different rows (C++ vector style)
• Data layout is similar to Apache Arrow
DRAM is still expensive!

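The per-entry overhead above, and the idea of packing the values of one column contiguously, can be sketched as follows. This is an illustration only: `ColumnValueArray` is a hypothetical stand-in built on Python's `array` module, not FlashBase's actual data structure.

```python
from array import array

# Per-entry sizes quoted on the slide for stock Redis (bytes).
DICT_ENTRY, KEY_OBJ, VALUE_OBJ = 24, 24, 24
KEY_BYTES, VALUE_BYTES = 12, 1

per_entry = DICT_ENTRY + KEY_OBJ + VALUE_OBJ + KEY_BYTES + VALUE_BYTES
payload = KEY_BYTES + VALUE_BYTES
print(f"{per_entry} bytes stored for {payload} bytes of payload")


class ColumnValueArray:
    """Hypothetical column-wise container: the values of one column for
    many rows live in one contiguous, fixed-width buffer, so there is no
    per-value object header."""

    def __init__(self, typecode="b"):
        self.values = array(typecode)

    def append(self, value):
        self.values.append(value)

    def nbytes(self):
        return self.values.itemsize * len(self.values)


col = ColumnValueArray("b")      # "b" = 1-byte signed integers
for v in range(100):
    col.append(v)
print(f"{len(col.values)} one-byte values packed into {col.nbytes()} bytes")
```

The point of the design is that the fixed cost (key plus container) is paid once per partition and column rather than once per value.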
Problem 2 - RocksDB Write Amplification Problem
RocksDB wrote 40 times as much data to SSDs as the input data
• RocksDB consists of multi-level indexes with sorted key-values in each level
• Key-values are migrated from the top level to the next level by the compaction algorithm
• Key-values are written multiple times (# of levels), with larger update regions at each level (a factor of the level multiplier)
FlashBase reduced writes to 1/10 of original RocksDB
• Customized RocksDB so that compaction jobs avoid next-level updates as much as possible
• 5 times better ingestion performance, and 1/10 the TCO for SSD replacement
The more writes, the sooner SSDs fail!
• SSDs have a limit on the number of drive writes per day (DWPD)
• An SSD fault causes service downtime and adds TCO for replacing SSDs

Query Acceleration 1 - Extreme Partitioning
A partition combination for network quality analysis
• 300K (cell towers) x 100K (time slots) = 30B (total partitions)
• Time partitions: 201804221100, 201804221105, 201804221110, …, 201904221105, 201904221110
• Cell tower partitions: wvcv3, wvcyw, wvfj6, …, wyfb1, wyf9w
Node spec: E5 2680 v4 CPU, 256GB DRAM, 20TB SSDs
Hadoop File System
• Up to 1 billion partitions in a single cluster
• 300GB DRAM for 1 billion partitions (150 bytes of DRAM for a file or a block)
FlashBase
• Up to 2 billion partitions in a single node
• Needs 15 nodes to store 30 billion partitions

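The partition counts above follow from simple arithmetic. The sketch below assumes that each HDFS partition costs one file object plus one block object at 150 bytes each, which is one way to read the slide's "300GB DRAM for 1 billion partitions"; the variable names are illustrative.

```python
import math

CELL_TOWERS = 300_000
TIME_SLOTS = 100_000
total_partitions = CELL_TOWERS * TIME_SLOTS

# Assumption: one file object + one block object per partition, 150 B each.
HDFS_BYTES_PER_PARTITION = 150 + 150
hdfs_dram_for_1b = 1_000_000_000 * HDFS_BYTES_PER_PARTITION
hdfs_dram_for_all = total_partitions * HDFS_BYTES_PER_PARTITION

FLASHBASE_PARTITIONS_PER_NODE = 2_000_000_000   # from the slide
flashbase_nodes = math.ceil(total_partitions / FLASHBASE_PARTITIONS_PER_NODE)

print(f"total partitions: {total_partitions:,}")
print(f"HDFS metadata for 1B partitions: {hdfs_dram_for_1b / 1e9:.0f} GB")
print(f"HDFS metadata for all partitions: {hdfs_dram_for_all / 1e12:.0f} TB")
print(f"FlashBase nodes needed: {flashbase_nodes}")
```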
Query Acceleration 2 - Filter Push Down
• Custom relation definition that registers Redis as a Spark data source using data source API v1 (Redis/RocksDB Relation -> R2 Relation)
• Filter / projection pushdown to the Redis/RocksDB store using PrunedScan and PrunedFilteredScan

    package com.skt.spark.r2

    // Signatures only, as shown on the slide; method bodies are omitted.
    case class R2Relation(
        identifier: String,
        schema: StructType
    )(@transient val sqlContext: SQLContext)
      extends BaseRelation
      with RedisUtils
      with Configurable
      with TableScan
      with PrunedScan
      with PrunedFilteredScan
      with InsertableRelation
      with Logging {

      def buildTable(): RedisTable
      override def buildScan(requiredColumns: Array[String]): RDD[Row]
      def insert(rdd: RDD[Row]): Unit
    }

Query Acceleration 2 - Filter Push Down
Partition filtering
• Each Redis process selects only the satisfying partitions, using the filter predicates pushed down from Spark
• The pruned scan is requested only for the satisfying partitions
Data filtering in pruned scan
• Each pruned scan command examines the actual column values stored in the requested partition
• Only the column values satisfying the pushed-down filter predicates are returned
Spark data source filter pushdown
• And, Or, Not, Like, Limit
• EQ, GT, GTE, LT, LTE, IN, IsNULL, IsNotNULL, EqualNullSafe

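The two filtering stages can be illustrated with a toy in-memory store. This is a conceptual sketch, not the FlashBase or Spark API: `pruned_filtered_scan`, the store layout, and the sample values are all hypothetical.

```python
# Toy store: partition key (event_time, cell_tower_id) -> rows in that partition.
store = {
    ("201910070000", "A"): [{"rsrp": -90, "cqi": 7}, {"rsrp": -80, "cqi": 9}],
    ("201910070005", "A"): [{"rsrp": -95, "cqi": 5}],
    ("201910070000", "B"): [{"rsrp": -99, "cqi": 3}],
}


def pruned_filtered_scan(store, required_columns, partition_pred, row_pred):
    """Yield projected rows, skipping partitions that cannot match."""
    for key, rows in store.items():
        if not partition_pred(key):       # stage 1: partition filtering
            continue
        for row in rows:
            if row_pred(row):             # stage 2: data filtering
                # projection: return only the requested columns
                yield {c: row[c] for c in required_columns}


result = list(pruned_filtered_scan(
    store,
    required_columns=["rsrp"],
    partition_pred=lambda k: k[1] == "A"
    and "201910070000" <= k[0] <= "201910080000",
    row_pred=lambda r: r["rsrp"] < -85,
))
print(result)
```

Partitions for tower "B" are never opened, and within the remaining partitions only rows that pass the value predicate are returned, restricted to the requested column.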
Network Quality Analysis Example
Network quality analysis query for one day and a single cell tower
• 0.142 trillion (142 billion) records in the ue_rf_sum (1) table (7 days of data, 42TB)
• 14,829 satisfying records

    select * from ue_rf_sum
    where event_time between '201910070000' and '201910080000'
      and cell_tower_id = 'snjJlAF5W'
      and rsrp < -85;

Spark with HDFS: half an hour
• HDFS cluster: 20 nodes (E5 2650 v4 CPU, 256GB DRAM, 24TB HDDs)
• Partition filtering: 1/10080 with time
Spark with FlashBase: less than 1 sec
• FlashBase cluster: 16 nodes (E5 2680 v4 CPU, 256GB DRAM, 20TB SSDs)
• Partition filtering: 1/(10080 * 30000) with time and cell tower
(1) ue_rf_sum: user_equipment_radio_frequency_summary table

Ingestion Performance and other features
Node spec: E5 2680 v4 CPU, 256GB DRAM, 20TB SSDs
Features and details
• Ingestion performance: 500,000 records/sec/node
• In-memory datastore: DRAM only, or DRAM-to-SSD tiering
• Massively parallel processing: 100 Redis processes per node
• Extreme partitioning: up to 2 billion partitions per node
• Filter acceleration: using fine-grained partitions and pushed-down filters
• Column-store: column-store by default (row-store option)
• Column value transformation: defined with Java grammar in schema table properties
• Compression: gzip-level compression ratio with LZ4-level speed
• Vector processing: filter and aggregation acceleration (SIMD, AVX)
• Etc.: recovery, replication, scale-out

Seven Production Hells

Network OSS Deploy - Web Dashboard (2017)
• Deployed to the Network Operating System in 2017
• Web dashboard queries with time and region predicates
• Wired/wireless quality analysis
• Mobile device quality analysis: displays mobile device quality data per hierarchy
[Figure: Mobile Device Quality dashboard]

Network OSS Deploy - Batch Queries (2017)
• Batch queries from Jupyter via R Hive libraries
• Analysis of coverage hole risk for each cell

Why Geospatial Visualization?
• Geospatial analysis
  - Gathers, manipulates, and displays geographic information system (GIS) data
  - Requires heavy aggregate computations
    → A good case for demonstrating real-time big data processing
  - Some companies have demonstrated geospatial analysis to show the advantages of GPU databases over CPU databases
    → We tried it with Spark & FlashBase on CPUs

Architecture of Geospatial Visualization
[Diagram: map rendering (front-end) ↔ HTTP API / vector tile ↔ Spark job ↔ data source API / geospatial object scan command ↔ FlashBase (forked DRAM store, customized flash store, tiering); a data loader ingests from file, HTTP, and Kafka]
• Front-end: MapBoxJS
  - MapBox uses VectorTile to render overlay layers.
  - Gets VectorTiles via the HTTP API.
• Back-end web server
  - Builds VectorTiles with a Spark job.
  - Apache Livy: manages multiple Spark contexts simultaneously.
• Spark cluster & GeoSpark
  - GeoSpark: supports geospatial UDFs and predicates.
• FlashBase
  - FlashBase stores objects with latitude and longitude. Each object is partitioned by its GeoHash.

Architecture of Geospatial Visualization
Problem
- Latency issue of the HTTP API: sluggish map loading!
- Building a VectorTile needs heavy computation and shuffling for aggregation.
Building a VectorTile requires AGG ... GROUP BY pixel over a tile of 256 x 256 pixels.
▸ If the web client shows 20 tiles,
  → 1.3 million (256 x 256 x 20) AGG operations are required.
  → This causes heavy computation and shuffle writes in Spark.
▸ If the user scrolls the map,
  → all tiles have to be recalculated.

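The "aggregate, grouped by pixel" step can be sketched in a few lines. This is a single-process illustration of the computation only; `aggregate_tile` and the normalized tile coordinates `(u, v)` are assumptions made for the example, not part of the presented system.

```python
from collections import defaultdict

TILE_SIZE = 256  # pixels per tile edge, from the slide


def aggregate_tile(points, tile_size=TILE_SIZE):
    """points: iterable of (u, v, value) with u, v in [0, 1) inside one tile.
    Returns {(px, py): average value}, i.e. AVG(value) GROUP BY pixel."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for u, v, value in points:
        pixel = (int(u * tile_size), int(v * tile_size))
        sums[pixel] += value
        counts[pixel] += 1
    return {p: sums[p] / counts[p] for p in sums}


# Upper bound on the number of groups when 20 tiles are visible.
groups_for_20_tiles = TILE_SIZE * TILE_SIZE * 20

tile = aggregate_tile([
    (0.0, 0.0, -90.0),
    (0.001, 0.002, -80.0),   # falls into the same pixel as the first point
    (0.5, 0.5, -100.0),
])
print(groups_for_20_tiles, tile)
```

In a distributed engine the same grouping needs a shuffle keyed by pixel, which is where the cost described above comes from.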
Architecture of Geospatial Visualization
Performance optimization
1. Spark pushes aggregation down to FlashBase, and FlashBase sends the aggregated results back to Spark.
   → Reduces Spark's shuffle write size and computation to 1/10.
2. FlashBase accelerates aggregation with vector processing via Intel's AVX-512 (Intel Math Kernel Library).
   → 2x faster aggregation.
→ 20 times faster than the original GeoSpark
[Diagram: (1) Spark pushes the aggregation down with the scan command, (2) FlashBase accelerates the aggregation via AVX-512, (3) FlashBase returns the aggregated results to the Spark job]

Optimization Detail
The query for building the features of a VectorTile (256 x 256 pixels):

    SELECT * FROM pcell WHERE ST_VectorTileAggr('7,109,49', 'AVG')

1. ST_VectorTileAggr(arg1, arg2)
   - Custom predicate that carries the aggregation information.
   - arg1: zoom level of the map and tile position (x, y) on the globe
   - arg2: aggregation type (SUM or AVG)
2. Define and apply a custom optimization rule
   - Applied during the optimization phase of the query plan.
   - Parses the aggregation information from the predicate and pushes it down to FlashBase.
3. Aggregation in FlashBase
   - Computation is parallelized by the FlashBase process count (generally 100~200 processes per node).
   - Each FlashBase process accelerates aggregation using Intel MKL.

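To make the first argument of the predicate concrete, the sketch below converts a tile address into a geographic bounding box. It assumes that the string is ordered "zoom,x,y" and follows the common XYZ (Web Mercator) tile scheme; the slide does not confirm either point, and `parse_tile_arg` and `tile_bounds` are hypothetical helper names.

```python
import math


def parse_tile_arg(arg):
    """Split 'zoom,x,y' (assumed order) into three integers."""
    zoom, x, y = (int(part) for part in arg.split(","))
    return zoom, x, y


def tile_bounds(zoom, x, y):
    """Return (west, south, east, north) in degrees for an XYZ tile."""
    n = 2 ** zoom

    def lon(tx):
        return tx / n * 360.0 - 180.0

    def lat(ty):
        return math.degrees(math.atan(math.sinh(math.pi * (1 - 2 * ty / n))))

    # Tile rows grow southward, so row y is the north edge.
    return lon(x), lat(y + 1), lon(x + 1), lat(y)


west, south, east, north = tile_bounds(*parse_tile_arg("7,109,49"))
print(f"lon {west:.4f}..{east:.4f}, lat {south:.2f}..{north:.2f}")
```

Under these assumptions the example tile spans roughly 126.6 to 129.4 degrees east and 36.6 to 38.8 degrees north. A scan restricted to that box, followed by a per-pixel aggregation, is the work that the custom rule pushes down.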
Introduction of Network Quality Prediction
• Predict network quality indicators (CQI, RSRP, RSRQ, SINR, …) for anomaly detection and real-time management
• Goal: unify geospatial visualization and network prediction on Spark
* CQI: Channel Quality Indicator
* RSRP: Reference Signal Received Power
* RSRQ: Reference Signal Received Quality
* SINR: Signal to Interference plus Noise Ratio

We focused on
1. Improving the deep learning model for forecasting time-series data
2. Improving the architecture and data pipeline for training and inference

Model for Network Quality Prediction - RNN
• An RNN-type model (Seq2Seq) is a common solution for time-series prediction, but it is not suitable for our network quality prediction.
[Figure: Seq2Seq forecast vs. actual values. Error: MAE; Score: Error*100]
Cannot predict sudden change!

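Memory augmented model
[Diagram: the current input and memory1 … memory7 (1-week data) pass through encoders (Encoder2 for the current input, Encoder1 for each memory); an attention layer combines the encoded memories; the result is concatenated with the encoded current state and fed to an FCNN that outputs the final prediction $\hat{y}_{t+1}$]
• Current: recent 50 min of data with a 5 min period
• Memory: previous 7 days of historical data, each covering the same time band as the current input and the target
• Target: network quality after 5 min
• Encoder: 1-NN (autoregressive term)
  - Encoder1: $h_t = c + w_1 y_{t-\Delta-1} + \dots + w_{11} y_{t-\Delta-11}$, where $\Delta$ stands for the time offset of the memory (the subscript label is illegible in the source)
  - Encoder2: $h'_t = c' + w'_1 y_{t-1} + \dots + w'_{10} y_{t-10}$
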
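The encoder described above is a linear autoregressive term: a bias plus a weighted sum of past observations. A minimal sketch follows, using scalar observations and made-up weights; in the presented model each step holds several indicators, and `ar_encoder` is a hypothetical name.

```python
def ar_encoder(window, weights, bias):
    """Linear autoregressive encoder: bias + sum_i weights[i] * window[i]."""
    if len(window) != len(weights):
        raise ValueError("window and weights must have the same length")
    return bias + sum(w * y for w, y in zip(weights, window))


# Current input: 10 steps (50 min at a 5 min period).
current_window = [float(i) for i in range(10)]
h_current = ar_encoder(current_window, weights=[0.1] * 10, bias=1.0)

# One memory: 11 steps from the same time band on an earlier day.
memory_window = [1.0] * 11
h_memory = ar_encoder(memory_window, weights=[0.5] * 11, bias=0.0)

print(h_current, h_memory)
```

The window lengths of 10 and 11 match the number of weights in the two encoder formulas.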
Memory augmented model
[Diagram: same model as on the previous slide, with the attention layer and the FCNN highlighted]
• $m_t$ = memory for step $t$, $c$ = current state
• Attention layer (1-layer neural network)
  - $score_t = v^{T} \tanh(W [m_t; c])$, where $v$ and $W$ are weight parameters
  - $\alpha_t = \mathrm{softmax}(score_t)$
• Attention vector: attention-weighted summation of $m_t$, i.e. $\sum_t \alpha_t m_t$
• FCNN: fully connected neural network

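The attention step can be written out directly from the formulas on this slide. The sketch below uses plain Python lists and toy weights chosen only for illustration; the dimensions and values are assumptions, and a real model would learn `W` and `v` during training.

```python
import math


def matvec(W, x):
    """Multiply matrix W (list of rows) by vector x."""
    return [sum(w * xi for w, xi in zip(row, x)) for row in W]


def softmax(scores):
    m = max(scores)                       # subtract max for numerical stability
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def attention(memories, current, W, v):
    """score_t = v^T tanh(W [m_t; c]); alpha = softmax(score);
    returns (alpha, sum_t alpha_t * m_t)."""
    scores = []
    for m in memories:
        hidden = [math.tanh(h) for h in matvec(W, m + current)]  # [m_t; c]
        scores.append(sum(vi * hi for vi, hi in zip(v, hidden)))
    alphas = softmax(scores)
    dim = len(memories[0])
    context = [sum(a * m[i] for a, m in zip(alphas, memories))
               for i in range(dim)]
    return alphas, context


# Toy inputs: three 2-dimensional memories and a 2-dimensional current state.
memories = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
current = [0.5, 0.5]
W = [[0.1, 0.2, 0.3, 0.4], [0.4, 0.3, 0.2, 0.1]]   # 2 x (2 + 2)
v = [1.0, -1.0]

alphas, context = attention(memories, current, W, v)
print(alphas, context)
```

The resulting attention vector would then be concatenated with the encoded current state and passed to the fully connected network.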
Memory augmented model - Test result
[Figure: Mem-model forecast vs. actual values. Error: MAE; Score: Error*100]
Improved predictions for sudden change!

Training & Inference Architecture - Legacy
[Diagram: SQL queries (web, Jupyter) via JDBC → Spark SQL → data source APIs → FlashBase (forked DRAM store, customized flash store, tiering), with a data loader ingesting from file, HTTP, and Kafka; data is exported from Spark to a separate server for preprocessing, training, and inferencing]
1. Export data to CSV from the Spark Thrift Server using a Hive client.
2. Preprocess with pandas.
3. Train or infer with TensorFlow on CPU.

Training & Inference Architecture - Legacy
Problems
1. There is no in-memory pipeline between the data source and the deep learning layer.
2. Preprocessing, inference, and training are all performed on a single server.

Training & Inference Architecture - New
[Diagram: data loader (file, HTTP, Kafka) → FlashBase (forked DRAM store, customized flash store, tiering) → data source APIs → Spark SQL → preprocessing → RDD of tensors → DL training & inferencing with the TF model code, all inside one Spark cluster with SIMD acceleration]
• In-memory pipeline between FlashBase and Intel Analytics Zoo
  - The data layer and the inferencing & training layer are integrated into the same Spark cluster.
  - They also share the same Spark session.
• Intel Analytics Zoo: used to bring the TF model into the Spark pipeline seamlessly.
• Intel BigDL: inference & training engine
  - Inferencing and training can be distributed across the Spark cluster.
• Source code: https://github.com/mnms/ARMemNet-BigDL

Comparison between two architectures
• Only inference results are available so far; distributed training is planned for later.
• Test workload
  - 7,722,912 rows = 80,447 cell towers x 8 days x 12 rows (1 hour of data with a 5 minute period)
  - 8 network indicators per row
  → Input tensor (80,447 x 696) = current input (80,447 x 10 x 8) + memory input (80,447 x 77 x 8)

                             Pandas + TF                  Spark + Analytics Zoo
                                                          local      3-node YARN
    Data export              2.3s                         N/A
    Pre-processing           71.96s                       2.56s      1.43s
    Deep learning inference  1.06s (CPU) / 0.63s (GPU)    0.68s      0.18s

• Pandas + TF: performed on a single node
• Spark + Analytics Zoo
  - Data and computations are distributed across 50 partitions.
  - Preprocessing and inference are executed in a single Spark job.
→ 45x faster
※ CPU: Intel(R) Xeon(R) Gold 6240 CPU @ 2.60GHz
※ GPU: Nvidia K80

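The workload sizes above can be re-derived with a few multiplications. The end-to-end ratio at the bottom is only one possible reading of the "45x faster" figure: it assumes that the comparison is between the CPU-based Pandas + TF total and the 3-node YARN total, which the slide does not spell out.

```python
CELL_TOWERS = 80_447
DAYS = 8
ROWS_PER_DAY = 12        # 1 hour of data with a 5 minute period
INDICATORS = 8
CURRENT_STEPS = 10
MEMORY_STEPS = 77

total_rows = CELL_TOWERS * DAYS * ROWS_PER_DAY
features_per_tower = (CURRENT_STEPS + MEMORY_STEPS) * INDICATORS

# Assumed comparison: export + preprocessing + CPU inference
# versus preprocessing + inference on 3-node YARN.
legacy_total = 2.3 + 71.96 + 1.06
spark_total = 1.43 + 0.18
speedup = legacy_total / spark_total

print(f"rows: {total_rows:,}")
print(f"input tensor: {CELL_TOWERS:,} x {features_per_tower}")
print(f"end-to-end ratio under the stated assumption: {speedup:.1f}x")
```

The row count and tensor width match the slide exactly, and the ratio lands in the mid-40s, in the same range as the quoted 45x.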
Comparison between TF CPU and Analytics Zoo
• Compares inference time
• Environment
  - CPU: Intel(R) Xeon(R) Gold 6240 CPU @ 2.60GHz
  - Cores: 36

    Batch size   Elapsed time (TF-CPU)   Elapsed time (Analytics Zoo Scala on Spark)
    32           14.28068566             2.58333056
    64           8.387897015             1.446166912
    128          4.871953249             0.679720256
    256          2.947942972             0.426830048
    512          2.030963659             0.400032064
    1024         2.012846947             0.395362112
    2048         1.44268322              0.430505056

→ 3~5 times faster

Appx. Memory Problem of Spark Driver
• The collect() function of Dataset sometimes throws an OOM error while decompressing and deserializing the result.
  → The job fails and the Spark driver is killed.
• Spark provides the 'spark.driver.maxResultSize' config for this issue, but
  - it only reflects the compressed size
  - the actual result size can be 5x ~ 20x the compressed size
  - so it is difficult to tune the config to protect the driver from OOM
[Diagram: executors run the result stage and send compressed result binaries to the driver; the driver holds an array of compressed binaries, then decompresses and deserializes them into the result Array[T], where the OutOfMemoryException occurs]

Appx. Memory Problem of Spark Driver - Solution
• Define a new function in Dataset, collectAsSeqView, which returns a 'SeqView' of the result
  - The SeqView holds only the compressed results plus the decompressing operations.
  - The driver decompresses and deserializes on each fetch.
  - Decompressed and deserialized results become garbage once the cursor moves to the next element.
  - Only compressed binaries reside in memory, so the memory of a job can be bounded by 'spark.driver.maxResultSize'.
  → Completely protects the driver from OOM while collecting results
[Diagram: executors run the result stage and send compressed result binaries to the driver; the driver keeps the array of compressed binaries, creates a SeqView, adds the decompress-and-deserialize operation to the view, and returns SeqView[T]]

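The idea of keeping only compressed data resident and materializing rows on demand can be illustrated outside of Spark. The sketch below is an analogy in plain Python using `zlib` and `pickle`; `CompressedSeqView` and `compress_rows` are hypothetical names and do not correspond to the actual patch.

```python
import pickle
import zlib


class CompressedSeqView:
    """Holds compressed chunks and decompresses one chunk at a time
    while the caller iterates."""

    def __init__(self, compressed_chunks):
        self._chunks = compressed_chunks

    def __iter__(self):
        for blob in self._chunks:
            rows = pickle.loads(zlib.decompress(blob))
            yield from rows
            # 'rows' is rebound on the next loop pass, so the previous
            # decompressed chunk becomes eligible for garbage collection.


def compress_rows(rows):
    """Serialize and compress a list of rows, as an executor might."""
    return zlib.compress(pickle.dumps(rows))


# Five chunks of 1,000 integers each stand in for per-partition results.
chunks = [compress_rows(list(range(i, i + 1000))) for i in range(0, 5000, 1000)]
view = CompressedSeqView(chunks)

total = sum(view)            # streams through the data chunk by chunk
compressed_bytes = sum(len(c) for c in chunks)
print(total, compressed_bytes)
```

At any moment only the compressed chunks and one decompressed chunk are alive, which is why the peak memory tracks the compressed size rather than the full deserialized result.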
Appx. Memory Problem of Spark Driver - Patch
• The collectAsSeqView function uses only 10% ~ 20% of the memory used by the collect function.
• Created a Spark pull request that applies this to the Thrift server.
  - PR: SPARK-25224 (https://github.com/apache/spark/pull/22219)
  - Review in progress

Open Discussion
• More partitioning, or indexing with fewer partitions
• Spark data source v2 and aggregation pushdown
• Possible new directions of FlashBase for the Spark ecosystem
• Efficient end-to-end data pipeline for big-data-based inference and training

How to use Spark with FlashBase
A free binary is available (not open sourced yet)
• Public cloud: AWS Marketplace AMI (~19.12.31), CloudFormation (~20.3.31)
• On-premise: GitHub page (~20.3)
Contact us first if you want to try FlashBase and need some help
• E-mail: flashbase@sktelecom.com
• Homepage (temporary): https://flashbasedb.github.io

Q & A
Visit the Intel & SKT demo booth

  • 1.
    WIFI SSID:Spark+AISummit |Password: UnifiedDataAnalytics
  • 2.
    Hongchan Roh, DooyoungHwang, SK Telecom Spark AI Usecase in Telco: Network Quality Analysis and Prediction with Geospatial Visualization #UnifiedDataAnalytics #SparkAISummit
  • 3.
    Network Quality VisualizationDemo 3 youtube demo video link: https://youtu.be/HpDkF3CxEow ▸ Demo shows - Visualize RF quality of cell towers - Height : connection count - Colors : RF quality ▸ Data source - 300,000 cell tower network device logs ▸ Resources - 5 CPU nodes with Intel Xeon Gold 6240 Good Bad This doesn’t exactly reflect real network quality, we generated synthetic data from real one
  • 4.
    Contents • Network QualityAnalysis • Geospatial Visualization • Network Quality Prediction • Wrap up 4
  • 5.
    Network Quality Analysis 5 SKTelecom : The largest telecommunications provider in South Korea • 27 million subscribers • 300,000 cells Target Data: Radio cell tower logs • time-series data with timestamp tags generated every 10 seconds • geospatial data with geographical coordinates (latitude and longitude) translated by cell tower’s location
  • 6.
    Network Quality Analysis 6 DataIngestion Requirements • Ingestion 1.4 million records / sec, (500 byte of records, 200~500 columns) • 120 billion records / day, 60 TB / day • Data retention period: 7 ~ 30 days Query Requirements • web dashboard and ad-hoc queries: response within 3 sec for a specific time and region (cell) predicates for multi sessions • daily batch queries: response within hours for long query pipelines having heavy operations such as joins and aggregations
  • 7.
    Problems of legacyarchitecture 7 Legacy Architecture: Spark with HDFS (2014~2015) • Tried to make partitions reflecting common query predicates (time, region) • Used SSD as a block cache to accelerate I/O performance • Hadoop namenode often crashed for millions of partitions • Both ingestion and query performance could not satisfy the requirements L i n u x O S C P U / D R A M S t o r a g e ( H D D ) … H D F S S p a r k S S D C a c h e Legacy Architecture
  • 8.
    New In-memory Datastorefor Spark (FlashBase) 8 • Tried to design a new data store for Spark (FlashBase) to support much more partitions L i n u x O S C P U / D R A M S t o r a g e ( H D D ) … H D F S S p a r k S S D C a c h e Legacy Architecture SQL Queries (Web, Jupyter) Spark-SQL Data Loading Data Loader Data Source APIs JDBCFile, HTTP, Kafka forked. DRAM Store customized. Flash Store tiering New Architecture • for query engine, for DRAM key-value store and for SSD key-value store • SSDs as main storage devices for small-sized parallel I/O with short latencies? If we assemble these with some glue logics, can it be ? Best open source candidates to assemble New in-memory (dram/SSD) datastore for Spark (2016)
  • 9.
  • 10.
    Problem 1 -Redis Space Amplification Problem 10 Redis occupied DRAM Space 4 times of input data • At least 72 bytes for data structures • 24B for dictEntry • 24B for key redisObject and 24B for value redicObject • 85 bytes for 12 byte key and 1 byte column value FlashBase reduced DRAM usage to 1/4 of original Redis • custom data structure called column value array • store data in column-wise • gather column values in different rows (c++ vector style) • data layout is similar to Apache arrow DRAM is still expensive!
  • 11.
    Problem 2 -Rocksdb Write Amplification Problem 11 Rocksdb wrote 40 times of input data to SSDs • Rocksdb consists of multi-level indexes with sorted key values in each level • Key values are migrated from top level to next level by compaction algorithm • Key values are written multiple times (# of levels) with larger update regions (a factor of level multiplier) FlashBase reduced writes to 1/10 of original Rocksdb • Customized rocksdb for compaction job to avoid next level update at most • 5 times better ingestion performance, and 1/10 TCO for SSD replacement The more writes, the faster SSD fault! • SSDs have limitation on the number of drive writes (DWPD) • SSD fault causes service down time and more TCO for replacing SSDs
  • 12.
    Query Acceleration 1- Extreme Partitioning 12 A partition combination for network quality analysis • 300K (cell towers) X 100K (time slots) = 30B (total partitions) Time partitions Cell tower partitions wvcv3 wvcyw wvfj6 … wyfb1 wyf9w 201804221100 201804221105 201804221110 … 201904221105 201904221110 Hadoop File System Node spec: E5 2680 v4 CPU, 256GB DRAM, 20TB SSDs • Up-to 1 billion partitions in a single cluster • 300GB DRAM for 1 billion partitions (150 byte DRAM for a file or a block) FlashBase • Up-to 2 billion partitions in a single node • Needs 15 nodes to store 30 billion partitions
  • 13.
    Query Acceleration 2- Filter Push Down 13 • Custom relation definition to register redis as Spark’s datasource by using data source API v1 (Redis/Rocksdb Relation -> R2 Relation) • Filter / Projection pushdown to Redis/Rocksdb Store by using prunedScan and prunedFilteredScan package com.skt.spark.r2 case class R2Relation ( identifier: String, schema: StructType )(@transient val sqlContext: SQLContext) extends BaseRelation with RedisUtils with Configurable with TableScan with PrunedScan with PrunedFilteredScan with InsertableRelation with Logging { def buildTable(): RedisTable override def buildScan(requiredColumns: Array[String]): RDD[Row] def insert( rdd: RDD[Row] ): Unit }
  • 14.
    Query Acceleration 2- Filter Push Down 14 Partition filtering • Each redis process filters only satisfying partitions by using push downed filter predicates from Spark • prunedScan is only requested to the satisfying partitions Data filtering in pruned scan • Each pruned scan command examines the actual column values are stored in the requested partition • Only the column values satisfying the push downed filter predicates are returned Spark Data Source Filter pushdown • And, Or, Not, Like, Limit • EQ, GT, GTE, LT, LTE, IN, IsNULL, IsNotNULL, EqualNullSafe,
  • 15.
    Network Quality AnalysisExample 15 Spark with FlashBase Less than 1 sec FlashBase Cluster: 16 nodes (E5 2680 v4 CPU, 256GB DRAM, 20TB SSDs) select * from ue_rf_sum where event_time between '201910070000' and '201910080000' and cell_tower_id = 'snjJlAF5W' and rsrp < -85; Half an hour Spark with HDFS Partition filtering 1/10080 with time Partition filtering 1/(10080 * 30000) with time and cell tower 1user_equipment_radio_frequency_summary table HDFS Cluster: 20 nodes (E5 2650 v4 CPU, 256GB DRAM, 24TB HDDs) Network quality analysis query for one day and a single cell tower • 0.142 trillion (142 billion) records in ue_rf_sum1 table (7 day data, 42TB) • 14,829 satisfying records
  • 16.
    Ingestion Performance andother features 16 Node spec: E5 2680 v4 CPU, 256GB DRAM, 20TB SSDs Features Details Ingestion performance 500,000 records/sec/node In-memory datastore DRAM only, DRAM to SSD Tiering Massively Parallel Processing 100 redis processes per a single node Extreme partitioning Up-to 2 billion partitions for a single node Filter acceleration Using fine-grained partitions and push downed filters Column-store Column-store by default (row-store option) Column value transformation Defined by java grammar in schema tbl properties Compression Gzip level compression ratio w/ LZ4 level speed Vector processing Filter and aggr. acceleration (SIMD, AVX) ETC Recovery, replication, scale-out
  • 17.
  • 18.
    Network OSS Deploy– Web Dashboard (2017) 18 • Deployed to Network Operating System in 2017 • Web Dashboards queries with time and region predicates • Wired/Wireless Quality Analysis • Mobile device quality analysis : Display mobile device quality data per hierarchy Mobile Device Quality
  • 19.
    Network OSS Deploy– Batch Queries (2017) 19 • Batch queries with jupyter via R hive libraries • Analysis for coverage hole risk for each cell
  • 20.
    Contents • Network QualityAnalysis • Geospatial Visualization • Network Quality Prediction • Wrap up 20
  • 21.
    Why Geospatial Visualization? 21 •Geospatial Analysis - Gathers, manipulates and displays geographic information system (GIS) data - Requires heavy aggregate computations → Good case to demonstrate real-time big data processing - Some companies demonstrated geospatial analysis to show advantages of GPU database over CPU database → We have tried it with Spark & FlashBase based on CPU
  • 22.
    Data Loading Data Loader Data sourceAPI File, HTTP, Kafka forked. DRAM Store customized. Flash Store tiering Architecture of Geospatial Visualization 22 Map Rendering HTTP API Vector Tile Spark Job Geospatial objectScan Cmd. • Front-end : MapBoxJS - MapBox uses VectorTile to render on overlay layers. - Get VectorTile via HTTP API
  • 23.
    Data Loading Data Loader Data sourceAPI File, HTTP, Kafka forked. DRAM Store customized. Flash Store tiering Architecture of Geospatial Visualization 23 Map Rendering HTTP API Vector Tile Spark Job • Front-end : MapBoxJS - MapBox uses VectorTile to render on overlay layers. - Get VectorTile via HTTP API • Back-end Web server - Build VectorTiles with Spark Job. - Apache LIVY : manipulate multiple Spark Contexts simultaneously Geospatial objectScan Cmd. Spark Job
  • 24.
    Data Loading Data Loader Data sourceAPI File, HTTP, Kafka forked. DRAM Store customized. Flash Store tiering Architecture of Geospatial Visualization 24 Map Rendering HTTP API Vector Tile Spark Job • Front-end : MapBoxJS - MapBox uses VectorTile to render on overlay layers. - Get VectorTile via HTTP API • Back-end Web server - Build VectorTiles with Spark Job. - Apache LIVY : manipulate multiple Spark Contexts simultaneously • Spark Cluster & Geo-Spark - Geo-Spark : support Geospatial UDFs and PredicatesGeospatial objectScan Cmd. Spark Job Data source API
  • 25.
    Data Loading Data Loader Data sourceAPI File, HTTP, Kafka forked. DRAM Store customized. Flash Store tiering Architecture of Geospatial Visualization 25 Map Rendering HTTP API Vector Tile Spark Job • Front-end : MapBoxJS - MapBox uses VectorTile to render on overlay layers. - Get VectorTile via HTTP API • Back-end Web server - Build VectorTiles with Spark Job. - Apache LIVY : manipulate multiple Spark Contexts simultaneously • Spark Cluster & Geo-Spark - Geo-Spark : support Geospatial UDFs and Predicates • FlashBase - FlashBase stores objects with latitude and longitude. Each data is partitioned by its GeoHash Geospatial objectScan Cmd. Spark Job Data source API forked. DRAM Store customized. Flash Store tiering Data Loader
  • 26.
    Data Loading Data Loader Data sourceAPI File, HTTP, Kafka forked. DRAM Store customized. Flash Store tiering Architecture of Geospatial Visualization 26 Map Rendering HTTP API Vector Tile Spark Job Geospatial objectScan Cmd. Spark Job Data source API forked. DRAM Store customized. Flash Store tiering Data Loader Problem -Latency issue of HTTP API : Sluggish map loading ! -Building VectorTile needs heavy computation & shuffling for aggregation. 256 X 256 pix. ▸ If web-client shows 20 Tiles, → 1.3 million (256 x 256 x 20) AGG operations are required. → Cause heavy computations & shuffle writing in Spark. ▸ If user scroll map, → all tiles should be re-calculated. Building VectorTile requires AGG GROUP BY pixel.
  • 27.
    Data Loading Data Loader Data sourceAPI File, HTTP, Kafka forked. DRAM Store customized. Flash Store tiering Architecture of Geospatial Visualization 27 Map Rendering HTTP API Vector Tile Spark Job Scan Cmd. Spark Job Data source API forked. DRAM Store customized. Flash Store tiering Data Loader • Optimization of performance 1.Spark pushdowns aggregation to FlashBase FlashBase sends aggregated results to Spark → Reduce Shuffle writing size and computation of Spark to 1/10. 2. FlashBase accelerates aggregation with vector-processing via Intel’s AVX-512 (Intel Math Kernel Library) → 2 x faster aggregation. → 20 times faster than original GeoSpark (1)Pushdown aggregation (3)Aggregated results (2)Accelerate aggregation via AVX-512
  • 28.
    Optimization Detail 28 The Querybuilding features of VectorTile SELECT * FROM pcell WHERE ST_VectorTileAggr('7,109,49’, ‘AVG') 1. ST_VectorTileAggr(arg1, arg2) - Custom predicate which contains aggregation information. - arg1 : zoom level of map & tile pos (x, y) in Globe - arg2 : aggregation type (SUM or AVG) 2. Define & Apply a custom optimization rule - Applied during optimization phase of query plan. - Parse aggregation information from predicate and pushdown it to FlashBase 3. Aggregation in FlashBase - Parallelized computation by FlashBase process count (Generally 100 ~ 200 process / node) - Each process of FlashBase accelerates aggregation using Intel MKL. 256 X 256 pix.
  • 29.
    Contents • Network Quality Analysis • Geospatial Visualization • Network Quality Prediction • Wrap up
  • 30.
    Introduction of Network Quality Prediction
    • Predict network quality indicators (CQI, RSRP, RSRQ, SINR, …) for anomaly detection and real-time management
    • Goal: unify geospatial visualization and network prediction on Spark
    * CQI: Channel Quality Indicator
    * RSRP: Reference Signal Received Power
    * RSRQ: Reference Signal Received Quality
    * SINR: Signal to Interference plus Noise Ratio
  • 31.
    We focused on
    1. Improving the deep learning model for forecasting time-series data
    2. Improving the architecture and data pipeline for training and inference
  • 32.
    Model for Network Quality Prediction - RNN
    • An RNN-type model (Seq2Seq) is a common solution for time-series prediction, but it is not suitable for our network quality prediction: it cannot predict sudden changes.
    (Figure: Seq2Seq, actual vs. forecast. Error: MAE; Score: Error*100)
  • 33.
    Memory augmented model
    (Diagram labels: memory1, memory2, memory3, …, memory7 (1-week data), current, Encoder1, Encoder2, Attention layer, Concat, FCNN, final prediction $\hat{y}_{t+1}$)
    • Current: recent 50 min of data with a 5 min period
    • Memory: historical data from the previous 7 days, each covering the same time band as current and target
    • Target: network quality after 5 min
    • Encoder: 1-NN (autoregressive term)
      Encoder1: $h_t = c + w_1 y_{t-\delta-1} + \dots + w_{11} y_{t-\delta-11}$, where $\delta$ denotes the time offset back to the corresponding memory day
      Encoder2: $h'_t = c' + w'_1 y_{t-1} + \dots + w'_{10} y_{t-10}$
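Each encoder on this slide is a single autoregressive term: a bias plus a weighted sum of lagged values. A minimal sketch follows, with made-up weights and a hypothetical function name `ar_encode`; the real model learns its weights and works on 8 indicators per step rather than one.

```python
def ar_encode(window, weights, bias):
    """Autoregressive term: bias + sum_i weights[i] * window[i].

    window[0] is the most recent lagged value (lag 1), window[1] is lag 2, and so on.
    """
    if len(window) != len(weights):
        raise ValueError("window and weights must have the same length")
    return bias + sum(w * y for w, y in zip(weights, window))


# Encoder2-style input: 10 lagged values (50 min of data at a 5 min period).
current_window = [1.0, 1.2, 0.9, 1.1, 1.0, 0.8, 1.3, 1.0, 0.9, 0.8]
uniform_weights = [0.1] * 10  # illustrative values only, not trained weights
h_current = ar_encode(current_window, uniform_weights, bias=0.5)
```

With uniform weights of 0.1 the term reduces to the window mean plus the bias, which makes the example easy to verify by hand.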
  • 34.
    Memory augmented model
    • $m_t$: memory for step $t$; $c$: current state
    • Attention layer (1-layer neural network):
      $score_t = v^T \tanh(W [m_t; c])$ ($v$, $W$: weight parameters)
      $\alpha_t = \mathrm{softmax}(score_t)$
    • Attention vector: attention-weighted summation of $m_t$
    • The attention vector is concatenated with the current state and passed to a fully connected neural network (FCNN), which produces the final prediction $\hat{y}_{t+1}$
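The attention step can be sketched in pure Python. The code below follows the score, softmax, and weighted-sum formulas from the slide; the shapes, the sample values, and the function names are assumptions for illustration, and the final FCNN is left out.

```python
import math


def softmax(scores):
    """Numerically stable softmax over a list of floats."""
    peak = max(scores)
    exps = [math.exp(s - peak) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def attention(memories, current, W, v):
    """Return (alphas, attention_vector).

    memories: list of encoded memory vectors m_t
    current:  encoded current state c
    W:        weight matrix as a list of rows, each of length len(m_t) + len(c)
    v:        weight vector with one entry per row of W
    """
    scores = []
    for m in memories:
        concat = m + current  # [m_t; c]
        hidden = [math.tanh(sum(w * x for w, x in zip(row, concat))) for row in W]
        scores.append(sum(v_i * h_i for v_i, h_i in zip(v, hidden)))
    alphas = softmax(scores)
    dim = len(memories[0])
    vector = [sum(a * m[k] for a, m in zip(alphas, memories)) for k in range(dim)]
    return alphas, vector


# Toy example: two memories of dimension 2, a current state of dimension 1.
memories = [[1.0, 0.0], [0.0, 1.0]]
current = [0.5]
W = [[0.0, 0.0, 0.0]]  # zero weights give equal scores, so attention is uniform
v = [1.0]
alphas, attention_vector = attention(memories, current, W, v)
```

With zero weights every memory gets the same score, so the attention vector is simply the mean of the memories.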
  • 35.
    Memory augmented model - Test result
    • The memory augmented model gives improved predictions for sudden changes.
    (Figure: Mem-model, actual vs. forecast. Error: MAE; Score: Error*100)
  • 36.
    Training & Inference Architecture - Legacy
    (Diagram labels: SQL Queries (Web, Jupyter), Spark-SQL, Data Source APIs (forked), JDBC, DRAM Store (customized), Flash Store (tiering), Data Loader (File, HTTP, Kafka), export, Preprocessing, Training & Inferencing)
    1. Export data to CSV from Spark ThriftServer using a Hive client.
    2. Preprocess with pandas.
    3. Train or infer with TensorFlow CPU.
  • 37.
    Training & Inference Architecture - Legacy
    Problem
    1. No in-memory pipeline between the data source and the deep learning layer.
    2. Preprocessing, inference, and training are all performed on a single server.
  • 38.
    Training & Inference Architecture - New
    • Build an in-memory pipeline between FlashBase and Intel Analytics Zoo.
    • The data layer and the inferencing & training layer are integrated into the same Spark cluster and share the same Spark session.
    • Intel Analytics Zoo: used to unify the TF model into the Spark pipeline seamlessly.
    • Intel BigDL: inference & training engine. Inferencing and training can be distributed across the Spark cluster.
    • Source code: https://github.com/mnms/ARMemNet-BigDL
    (Diagram labels: Spark-SQL, Data Source APIs (forked), DRAM Store (customized), Flash Store (tiering), Data Loader (File, HTTP, Kafka), Preprocess, RDD of Tensor, Model Code of TF, DL Training & Inferencing, Data, Model, Spark Cluster, SIMD Acceleration)
  • 39.
    Comparison between two architectures
    • Only inference results are available so far; distributed training is planned for later.
    • Test workload
    - 7,722,912 rows = 80,447 cell towers x 8 days x 12 rows (1 hour of data with a 5 minute period)
    - 8 network indicators per row
    → Input tensor (80,447 x 696) = current input (80,447 x 10 x 8) + memory input (80,447 x 77 x 8)

    Stage                     Pandas + TF                   Spark + Analytics Zoo (local)   Spark + Analytics Zoo (3-node YARN)
    Data export               2.3 s                         N/A                             N/A
    Pre-processing            71.96 s                       2.56 s                          1.43 s
    Deep learning inference   1.06 s (CPU) / 0.63 s (GPU)   0.68 s                          0.18 s

    • Pandas + TF: performed on a single node.
    • Spark + Analytics Zoo: data and computations are distributed over 50 partitions; preprocessing and inference are executed in a single Spark job.
    → 45 x faster
    ※ CPU: Intel(R) Xeon(R) Gold 6240 CPU @ 2.60GHz
    ※ GPU: Nvidia K80
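The workload figures on this slide are consistent with each other, which a short calculation confirms. The constant names below are chosen for illustration only.

```python
CELL_TOWERS = 80_447
DAYS = 8
ROWS_PER_DAY = 12     # 1 hour of data at a 5 minute period
INDICATORS = 8        # network indicators per row
CURRENT_STEPS = 10    # time steps in the current input
MEMORY_STEPS = 77     # time steps in the memory input

total_rows = CELL_TOWERS * DAYS * ROWS_PER_DAY
features_per_tower = (CURRENT_STEPS + MEMORY_STEPS) * INDICATORS

print(total_rows)          # 7722912
print(features_per_tower)  # 696
```

The 77 memory steps also match 7 memory days times the 11 lagged values used by Encoder1.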
  • 40.
    Comparison between TF CPU and Analytics Zoo
    • Compare inference time
    • Environment
    - CPU: Intel(R) Xeon(R) Gold 6240 CPU @ 2.60GHz
    - Cores: 36

    Batch size   Elapsed time (TF-CPU)   Elapsed time (Analytics Zoo Scala on Spark)
    32           14.28068566             2.58333056
    64           8.387897015             1.446166912
    128          4.871953249             0.679720256
    256          2.947942972             0.426830048
    512          2.030963659             0.400032064
    1024         2.012846947             0.395362112
    2048         1.44268322              0.430505056

    → 3~5 times faster
  • 41.
    Appx. Memory Problem of Spark Driver
    • The collect() function of Dataset sometimes throws OOM while decompressing and deserializing the result. → The job fails and the Spark driver is killed.
    • Spark supports the 'spark.driver.maxResultSize' config for this issue, but
    - it only reflects the compressed size;
    - the actual result size can be 5x ~ 20x the compressed size;
    - so it is difficult to tune the config to protect the driver from OOM.
    (Diagram labels: Executors, Result Stage, Compressed result binaries, Driver, Array of Compressed binaries, decompress & deserialize, Result Array[T], OutOfMemoryException!)
  • 42.
    Appx. Memory Problem of Spark Driver - Solution
    • Define a new collectAsSeqView function in Dataset which returns a 'SeqView' of the result.
    - The SeqView holds only the compressed results and the decompressing operations.
    - The driver decompresses and deserializes on each fetch.
    - Decompressed and deserialized results are collected as garbage after the cursor moves to the next element.
    - Only compressed binaries reside in memory, so the memory of a job can be limited by 'spark.driver.maxResultSize'.
    → Completely protects the driver from OOM while collecting results
    (Diagram labels: Executors, Result Stage, Compressed result binaries, Driver, Array of Compressed binaries, Create SeqView, Add operation of decompressing and deserializing to view, Return SeqView[T])
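The lazy-view idea can be illustrated outside Spark with a small Python sketch. It keeps only compressed blocks in memory and decompresses one block at a time during iteration. Here `zlib` and `pickle` stand in for Spark's own compression and serialization, and the class and helper names are hypothetical; this is not the code from the pull request.

```python
import pickle
import zlib


def compress_block(rows):
    """Serialize and compress one block of result rows."""
    return zlib.compress(pickle.dumps(rows))


class CompressedResultView:
    """Holds compressed blocks; decompresses a single block at a time on iteration."""

    def __init__(self, compressed_blocks):
        self._blocks = compressed_blocks

    def __iter__(self):
        for block in self._blocks:
            # Only this block is decompressed; it becomes garbage once exhausted.
            rows = pickle.loads(zlib.decompress(block))
            yield from rows

    def compressed_size(self):
        """Total bytes held in memory by the view."""
        return sum(len(block) for block in self._blocks)


# Three blocks of 1000 rows each, as if returned by three result tasks.
all_rows = [(i, "cell-%d" % i) for i in range(3000)]
blocks = [compress_block(all_rows[start:start + 1000]) for start in (0, 1000, 2000)]

view = CompressedResultView(blocks)
row_count = sum(1 for _ in view)
```

An eager collect would instead materialize every decompressed block at once, which is where the 5x ~ 20x memory blow-up described earlier comes from.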
  • 43.
    Appx. Memory Problem of Spark Driver - Patch
    • The collectAsSeqView function uses only 10% ~ 20% of the memory used by the collect function.
    • Created a Spark pull request which applies this to the Thrift server.
    - PR: SPARK-25224 (https://github.com/apache/spark/pull/22219)
    - Review in progress
  • 44.
    Contents • Network Quality Analysis • Geospatial Visualization • Network Quality Prediction • Wrap up
  • 45.
    Open Discussion
    • More partitioning, or indexing with fewer partitions
    • Spark DataSource V2 and aggregation pushdown
    • Possible new directions of FlashBase for the Spark ecosystem
    • Efficient end-to-end data pipeline for big-data-based inference and training
  • 46.
    How to use Spark with FlashBase
    A free binary can be used (not open sourced yet)
    • Public cloud: AWS Marketplace AMI (~19.12.31), CloudFormation (~20.3.31)
    • On-premise: GitHub page (~20.3)
    Contact us first if you want to try FlashBase and get some help
    • e-mail: flashbase@sktelecom.com
    • homepage (temporary): https://flashbasedb.github.io
  • 47.
    Q & A
    Visit the Intel & SKT demo booth