@doanduyhai Spark/Cassandra connector API, Best Practices & Use-Cases DuyHai DOAN, Technical Advocate
@doanduyhai Who Am I ?! Duy Hai DOAN Cassandra technical advocate •  talks, meetups, confs •  open-source devs (Achilles, …) •  OSS Cassandra point of contact ☞ duy_hai.doan@datastax.com ☞ @doanduyhai 2
@doanduyhai Datastax! •  Founded in April 2010 •  We contribute a lot to Apache Cassandra™ •  400+ customers (25 of the Fortune 100), 200+ employees •  Headquarters in the San Francisco Bay Area •  EU headquarters in London, offices in France and Germany •  Datastax Enterprise = OSS Cassandra + extra features 3
@doanduyhai Agenda! •  Connector API by example •  Best practices •  Use-cases •  The “Big Data Platform” 4
Spark/Cassandra connector API! Spark Core! Spark SQL! Spark Streaming! !
@doanduyhai Connector architecture! All Cassandra types supported and converted to Scala types Server-side data filtering (SELECT … WHERE …) Uses the Java driver underneath Scala and Java support 6
@doanduyhai Connector architecture – Core API! Cassandra tables exposed as Spark RDDs Read from and write to Cassandra Mapping of Cassandra tables and rows to Scala objects •  CassandraRow •  Scala case class (object mapper) •  Scala tuples 7
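A minimal sketch of the three mappings, assuming a local Cassandra node and a hypothetical ks.performers table with name (partition key) and country text columns:

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf(true)
  .setAppName("core_api_demo")
  .set("spark.cassandra.connection.host", "127.0.0.1")   // assumption: local Cassandra node
val sc = new SparkContext(conf)

// 1. Generic CassandraRow
val names = sc.cassandraTable("ks", "performers").map(row => row.getString("name"))

// 2. Scala case class (object mapper)
case class Performer(name: String, country: String)
val performers = sc.cassandraTable[Performer]("ks", "performers")

// 3. Scala tuples, selecting only the needed columns
val tuples = sc.cassandraTable[(String, String)]("ks", "performers").select("name", "country")

// Writing back is symmetric
performers.saveToCassandra("ks", "performers_copy", SomeColumns("name", "country"))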
Spark Core https://github.com/doanduyhai/Cassandra-Spark-Demo
@doanduyhai Connector architecture – Spark SQL ! Mapping of Cassandra table to SchemaRDD •  CassandraSQLRow → SparkRow •  custom query plan •  push predicates to CQL for early filtering SELECT * FROM user_emails WHERE login = 'jdoe'; 9
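A sketch of the SQL path, assuming a connector version of that generation exposing CassandraSQLContext, the sc from the earlier sketch, and a hypothetical ks keyspace holding the user_emails table from the slide:

import org.apache.spark.sql.cassandra.CassandraSQLContext

val csc = new CassandraSQLContext(sc)

// The predicate on login is pushed down to CQL for early, server-side filtering
val jdoeEmails = csc.sql("SELECT * FROM ks.user_emails WHERE login = 'jdoe'")
jdoeEmails.collect().foreach(println)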
Spark SQL https://github.com/doanduyhai/Cassandra-Spark-Demo
@doanduyhai Connector architecture – Spark Streaming ! Streaming data INTO Cassandra table •  trivial setup •  be careful about your Cassandra data model when having an infinite stream !!! Streaming data OUT of Cassandra tables (CDC) ? •  work in progress … 11
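A sketch of streaming INTO Cassandra, assuming a hypothetical ks.events table with event_id and payload text columns, a text source on localhost:9999, and the sc from earlier:

import com.datastax.spark.connector.SomeColumns
import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(5))

// Each line is "event_id,payload"; with an infinite stream, a poor data model hurts quickly
ssc.socketTextStream("localhost", 9999)
  .map(_.split(","))
  .map(fields => (fields(0), fields(1)))
  .saveToCassandra("ks", "events", SomeColumns("event_id", "payload"))

ssc.start()
ssc.awaitTermination()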
Spark Streaming https://github.com/doanduyhai/Cassandra-Spark-Demo
Q & A!
Spark/Cassandra best practices! Data locality! Failure handling! Cross-region operations!
@doanduyhai Cluster deployment! [diagram: Spark Master and Spark Workers co-located with the Cassandra nodes] Stand-alone cluster 15
@doanduyhai Remember token ranges ?! A: ]0, X/8] B: ]X/8, 2X/8] C: ]2X/8, 3X/8] D: ]3X/8, 4X/8] E: ]4X/8, 5X/8] F: ]5X/8, 6X/8] G: ]6X/8, 7X/8] H: ]7X/8, X] [diagram: 8-node ring, nodes n1 … n8 owning ranges A … H respectively] 16
@doanduyhai Data Locality! [diagram: each Spark partition of the RDD maps to the Cassandra token ranges of the co-located node] 17
@doanduyhai Data Locality! [diagram: co-located Spark Workers and Cassandra nodes] Use Murmur3Partitioner 18
@doanduyhai Read data locality! Read from Cassandra 19
@doanduyhai Read data locality! Spark shuffle operations 20
@doanduyhai Write to Cassandra without data locality! Because of shuffle, the original data locality is lost. Async batches fan out the writes to Cassandra 21
@doanduyhai Or repartition before write ! Write to Cassandra rdd.repartitionByCassandraReplica("keyspace","table") 22
@doanduyhai Write data locality! 23 •  either keep data in the Spark layer and use repartitionByCassandraReplica() •  or flush data to Cassandra with async batches •  in any case, there will be data movement over the network (sorry, no magic)
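The two options side by side, as a sketch reusing the performers RDD and sc from the Core API sketch (keyspace and table names are assumptions):

// Option 1: plain save; the connector groups rows into async batches towards the replicas
performers.saveToCassandra("ks", "performers")

// Option 2: repartition by Cassandra replica first, so each Spark partition
// is written by a worker co-located with the owning replica, then save
performers
  .repartitionByCassandraReplica("ks", "performers")
  .saveToCassandra("ks", "performers")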
@doanduyhai Joins with data locality! 24

CREATE TABLE artists(name text, style text, … PRIMARY KEY(name));
CREATE TABLE albums(title text, artist text, year int, … PRIMARY KEY(title));

val join: CassandraJoinRDD[(String,Int), (String,String)] =
  sc.cassandraTable[(String,Int)](KEYSPACE, ALBUMS)
    // Select only useful columns for join and processing
    .select("artist","year")
    .as((_:String, _:Int))
    // Repartition RDDs by the "artists" PK, which is "name"
    .repartitionByCassandraReplica(KEYSPACE, ARTISTS)
    // Join with the "artists" table, selecting only the "name" and "country" columns
    .joinWithCassandraTable[(String,String)](KEYSPACE, ARTISTS, SomeColumns("name","country"))
    .on(SomeColumns("name"))
@doanduyhai Joins pipeline with data locality! 25 LOCAL READ FROM CASSANDRA
@doanduyhai Joins pipeline with data locality! 26 SHUFFLE DATA WITH SPARK
@doanduyhai Joins pipeline with data locality! 27 REPARTITION TO MAP CASSANDRA REPLICA
@doanduyhai Joins pipeline with data locality! 28 JOIN WITH DATA LOCALITY
@doanduyhai Joins pipeline with data locality! 29 ANOTHER ROUND OF SHUFFLING
@doanduyhai Joins pipeline with data locality! 30 REPARTITION AGAIN FOR CASSANDRA
@doanduyhai Joins pipeline with data locality! 31 SAVE TO CASSANDRA WITH LOCALITY
@doanduyhai Perfect data locality scenario! 32 •  read locally from Cassandra •  use operations that do not require a shuffle in Spark (map, filter, …) •  repartitionByCassandraReplica() → to a table having the same partition key as the original table •  save back into this Cassandra table USE CASE: sanitize, validate, normalize, transform data
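The recipe as a sketch on the hypothetical ks.performers table from earlier (same partition key on read and write, only narrow transformations in between):

sc.cassandraTable[Performer]("ks", "performers")       // local reads, partitions aligned with token ranges
  .filter(p => p.country != null)                      // narrow ops: no shuffle, locality preserved
  .map(p => p.copy(country = p.country.trim.toUpperCase))
  .repartitionByCassandraReplica("ks", "performers")   // re-align partitions with the replicas
  .saveToCassandra("ks", "performers")                 // local writes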
@doanduyhai Failure handling! Stand-alone cluster 33 [diagram: Spark Master and Spark Workers co-located with the Cassandra nodes]
@doanduyhai Failure handling! What if 1 node is down ? What if 1 node is overloaded ? 34 [diagram: same co-located cluster]
@doanduyhai Failure handling! What if 1 node is down ? What if 1 node is overloaded ? ☞ The Spark master will re-assign the job to another worker 35 [diagram: same co-located cluster]
@doanduyhai Failure handling! Oh no, my data locality !!! 36
@doanduyhai Failure handling! 37
@doanduyhai Data Locality Impl! 38 Remember the RDD interface ?

abstract class RDD[T](…) {

  @DeveloperApi
  def compute(split: Partition, context: TaskContext): Iterator[T]

  protected def getPartitions: Array[Partition]

  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
}
@doanduyhai Data Locality Impl! 39 def getPreferredLocations(split: Partition): returns the IP addresses of the Cassandra replicas corresponding to this Spark partition
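A simplified, hypothetical sketch of the idea (these are not the connector's real class names): a partition object carrying the replica addresses of its token-range chunk, and an RDD reporting them as preferred locations to the Spark scheduler:

import java.net.InetAddress
import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition: one chunk of token ranges plus the replicas that own it
case class ReplicaAwarePartition(index: Int, replicas: Seq[InetAddress]) extends Partition

abstract class ReplicaAwareRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {
  // The scheduler will try to run each task on one of these hosts
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    split match {
      case p: ReplicaAwarePartition => p.replicas.map(_.getHostAddress)
      case _                        => Nil
    }
}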
@doanduyhai Failure handling! If RF > 1 the Spark master chooses the next preferred location, which is a replica 😎 Tune the parameters: ①  spark.locality.wait ②  spark.locality.wait.process ③  spark.locality.wait.node 40 [diagram: same co-located cluster]
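These are standard Spark settings; a sketch with illustrative millisecond values (assumption: the right values depend on the workload and cluster):

import org.apache.spark.SparkConf

val localityConf = new SparkConf(true)
  .setAppName("locality_tuning")
  .set("spark.locality.wait", "10000")           // ms to wait for a data-local slot before downgrading
  .set("spark.locality.wait.process", "10000")
  .set("spark.locality.wait.node", "10000")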
@doanduyhai Cross-DC operations! 41

val confDC1 = new SparkConf(true)
  .setAppName("data_migration")
  .setMaster("master_ip")
  .set("spark.cassandra.connection.host", "DC_1_hostnames")
  .set("spark.cassandra.connection.local_dc", "DC_1")

val confDC2 = new SparkConf(true)
  .setAppName("data_migration")
  .setMaster("master_ip")
  .set("spark.cassandra.connection.host", "DC_2_hostnames")
  .set("spark.cassandra.connection.local_dc", "DC_2")

val sc = new SparkContext(confDC1)

sc.cassandraTable[Performer](KEYSPACE, PERFORMERS)
  .map[Performer](???)
  .saveToCassandra(KEYSPACE, PERFORMERS)
    (CassandraConnector(confDC2), implicitly[RowWriterFactory[Performer]])
@doanduyhai Cross-cluster operations! 42

val confCluster1 = new SparkConf(true)
  .setAppName("data_migration")
  .setMaster("master_ip")
  .set("spark.cassandra.connection.host", "cluster_1_hostnames")

val confCluster2 = new SparkConf(true)
  .setAppName("data_migration")
  .setMaster("master_ip")
  .set("spark.cassandra.connection.host", "cluster_2_hostnames")

val sc = new SparkContext(confCluster1)

sc.cassandraTable[Performer](KEYSPACE, PERFORMERS)
  .map[Performer](???)
  .saveToCassandra(KEYSPACE, PERFORMERS)
    (CassandraConnector(confCluster2), implicitly[RowWriterFactory[Performer]])
Q & A!
Spark/Cassandra use-cases! Data cleaning! Schema migration! Analytics! !
@doanduyhai Use Cases! Load data from various sources Analytics (join, aggregate, transform, …) Sanitize, validate, normalize, transform data Schema migration, Data conversion 45
@doanduyhai Data cleaning use-cases! 46 Bug in your application ? Dirty input data ? ☞ Spark job to clean it up! (perfect data locality) Sanitize, validate, normalize, transform data
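A sketch of such a cleaning job, assuming a hypothetical ks.users table with login (partition key) and email text columns and the sc from earlier; because the job reads and writes the same table, the partition key is unchanged and the perfect-data-locality recipe applies:

import com.datastax.spark.connector._

case class User(login: String, email: String)

sc.cassandraTable[User]("ks", "users")
  .filter(u => u.email != null && u.email.contains("@"))   // keep only repairable rows
  .map(u => u.copy(email = u.email.trim.toLowerCase))      // normalize the dirty values
  .saveToCassandra("ks", "users", SomeColumns("login", "email"))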
Data Cleaning https://github.com/doanduyhai/Cassandra-Spark-Demo
@doanduyhai Schema migration use-cases! 48 Business requirements change with time ? Current data model no longer relevant ? ☞ Spark job to migrate data ! Schema migration, Data conversion
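A sketch of a migration job, assuming a hypothetical old table ks.albums_by_title and a new table ks.albums_by_artist already created with artist as the partition key:

import com.datastax.spark.connector._

case class Album(title: String, artist: String, year: Int)

// Read with the old layout, write with the new one; the connector maps columns by name
sc.cassandraTable[Album]("ks", "albums_by_title")
  .saveToCassandra("ks", "albums_by_artist", SomeColumns("artist", "title", "year"))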
Data Migration https://github.com/doanduyhai/Cassandra-Spark-Demo
@doanduyhai Analytics use-cases! 50 Given existing tables of performers and albums, I want: ①  the top 10 most common music styles (pop, rock, RnB, …) ②  performer productivity (album count) by origin country and by decade ☞ Spark job to compute the analytics ! Analytics (join, aggregate, transform, …)
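A sketch of query ①, assuming the performers table also carries a style text column and reusing the sc from earlier:

import org.apache.spark.SparkContext._   // pair-RDD functions (reduceByKey) on older Spark versions

val top10Styles = sc.cassandraTable("ks", "performers")
  .select("style")
  .map(row => (row.getString("style"), 1))   // one vote per performer
  .reduceByKey(_ + _)                        // performers per style
  .sortBy(_._2, ascending = false)
  .take(10)

top10Styles.foreach { case (style, count) => println(s"$style -> $count") }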
@doanduyhai Analytics pipeline! 51 ①  Read from production transactional tables ②  Perform aggregation with Spark ③  Save back data into dedicated tables for fast visualization ④  Repeat step ①
Analytics https://github.com/doanduyhai/Cassandra-Spark-Demo
Q & A!
The “Big Data Platform”!
@doanduyhai Our vision! 55 We had a dream … to provide a Big Data Platform … built for the performance & high availability demands of IoT, web & mobile applications
@doanduyhai Until now in Datastax Enterprise! Cassandra + Solr in the same JVM Unlock full text search power for Cassandra CQL syntax extension 56 SELECT * FROM users WHERE solr_query = 'age:[33 TO *] AND gender:male'; SELECT * FROM users WHERE solr_query = 'lastname:*schwei?er';
Cassandra + Solr
@doanduyhai Now with Spark! Cassandra + Spark Unlock full analytics power for Cassandra Spark/Cassandra connector 58
@doanduyhai Tomorrow (DSE 4.7)! Cassandra + Spark + Solr Unlock full text search + analytics power for Cassandra 59
@doanduyhai The idea! ①  Filter the maximum with Cassandra and Solr ②  Fetch only a small data set in memory ③  Aggregate with Spark ☞ near real-time interactive analytics queries are possible with restrictive criteria 61
@doanduyhai Datastax Enterprise 4.7! With a 3rd component for full text search … how to preserve data locality ? 62
@doanduyhai Stand-alone search cluster caveat! The bad way v1: perform search from the Spark « driver program » 63
@doanduyhai Stand-alone search cluster caveat! The bad way v2: search from Spark workers with restricted routing 64
@doanduyhai Stand-alone search cluster caveat! The bad way v3: search from Cassandra nodes with a connector to Solr 65
@doanduyhai Stand-alone search cluster caveat! The ops team won't be your friend •  3 clusters to manage: Spark, Cassandra & Solr/whatever •  lots of moving parts Impacts on Spark jobs •  increased response time due to latency •  the 99.9th percentile can be very slow 66
@doanduyhai Datastax Enterprise 4.7! The right way: distributed « local search » 67
@doanduyhai Datastax Enterprise 4.7! 68
①  compute Spark partitions using Cassandra token ranges
②  on each partition, use Solr for local data filtering (no distributed query!)
③  fetch the data back into Spark for aggregations

val join: CassandraJoinRDD[(String,Int), (String,String)] =
  sc.cassandraTable[(String,Int)](KEYSPACE, ALBUMS)
    // Select only useful columns for join and processing
    .select("artist","year")
    .where("solr_query = 'style:*rock* AND ratings:[3 TO *]' ")
    .as((_:String, _:Int))
    .repartitionByCassandraReplica(KEYSPACE, ARTISTS)
    .joinWithCassandraTable[(String,String)](KEYSPACE, ARTISTS, SomeColumns("name","country"))
    .on(SomeColumns("name"))
    .where("solr_query = 'age:[20 TO 30]' ")
@doanduyhai Datastax Enterprise 4.7! 69 Advantages of the same-JVM Cassandra + Solr integration: ①  single-pass local full text search (no fan-out) ②  local data retrieval SELECT … FROM … WHERE token(#partition) > 3X/8 AND token(#partition) <= 4X/8 AND solr_query = 'full text search expression'; Token range: ]3X/8, 4X/8]
@doanduyhai Datastax Enterprise 4.7! Scalable solution: x2 volume → x2 nodes → constant processing time 70
Cassandra + Spark + Solr https://github.com/doanduyhai/Cassandra-Spark-Demo
Q & A!
Thank You @doanduyhai duy_hai.doan@datastax.com https://academy.datastax.com/
