Streaming Big Data with Apache Spark, Kafka and Cassandra
Helena Edelson @helenaedelson
©2014 DataStax Confidential. Do not distribute without consent.
1. Delivering Meaning In Near-Real Time At High Velocity
2. Overview Of Spark Streaming, Cassandra, Kafka and Akka
3. The Spark Cassandra Connector
4. Integration In Big Data Applications
Who Is This Person?
• Using Scala in production since 2010
• Spark Cassandra Connector Committer
• Akka (Cluster) Contributor
• Scala Driver for Cassandra Committer
• @helenaedelson
• https://github.com/helena
• Senior Software Engineer, Analytics Team, DataStax
• Spent the last few years as a Senior Cloud Engineer
Spark WordCount (diagram)
Delivering Meaning In Near-Real Time At High Velocity At Massive Scale
Strategies
• Partition For Scale
• Replicate For Resiliency
• Share Nothing
• Asynchronous Message Passing
• Parallelism
• Isolation
• Location Transparency
What We Need
• Fault Tolerant
• Failure Detection
• Fast - low latency, distributed, data locality
• Masterless, Decentralized Cluster Membership
• Spans Racks and DataCenters
• Hashes The Node Ring
• Partition-Aware
• Elasticity
• Asynchronous - message-passing system
• Parallelism
• Network Topology Aware
Why Akka Rocks
• Location transparency
• Fault tolerant
• Async message passing
• Non-deterministic
• Share nothing
• Actor atomicity (within an actor)
Apache Kafka (diagram from Joe Stein)
Lambda Architecture: Apache Kafka, Spark Core with Spark SQL (structured), Spark Streaming (real-time), MLlib (machine learning) and GraphX (graph), and Cassandra
Apache Spark and Spark Streaming: The Drive-By Version
What Is Apache Spark
• Fast, general cluster compute system
• Originally developed in 2009 in UC Berkeley's AMPLab
• Fully open sourced in 2010 - now at the Apache Software Foundation
• Distributed, Scalable, Fault Tolerant
Apache Spark - Easy to Use & Fast
• 10x faster on disk, 100x faster in memory than Hadoop MapReduce
• Works out of the box on EMR
• Fault Tolerant Distributed Datasets
• Batch, iterative and streaming analysis
• In Memory Storage and Disk
• Integrates with Most File and Storage Options
Up to 100× faster (2-10× on disk), 2-5× less code
Spark Components: Spark Core, Spark SQL (structured), Spark Streaming (real-time), MLlib (machine learning), GraphX (graph)
Why Scala?
• Functional
• On the JVM
• Capture functions and ship them across the network
• Static typing - easier to control performance
• Leverage the Scala REPL for the Spark REPL
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Scala-tp6536p6538.html
Spark Versus Spark Streaming: zillions of bytes (batch) versus gigabytes per second (streaming)
Input & Output Sources
Spark Streaming (diagram of input sources, including Kinesis and S3)
Common Use Cases
Data from applications, sensors, the web, mobile phones:
• intrusion detection
• malfunction detection
• site analytics
• network metrics analysis
• fraud detection
• dynamic process optimisation
• recommendations
• location based ads
• log processing
• supply chain planning
• sentiment analysis
• …
DStream - Micro Batches
• Continuous sequence of micro batches
• More complex processing models are possible with less effort
• Streaming computations as a series of deterministic batch computations on small time intervals
DStream = μBatch (ordinary RDD), μBatch (ordinary RDD), μBatch (ordinary RDD), …
Processing of a DStream = processing of its μBatches (RDDs); a minimal sketch follows.
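A minimal sketch of the micro-batch model (the 5-second interval, host and port are illustrative, not from the deck): each batch interval yields one ordinary RDD, which can be processed with normal batch operations.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("microbatch-sketch")
val ssc  = new StreamingContext(conf, Seconds(5))    // one micro batch every 5 seconds

val lines = ssc.socketTextStream("localhost", 9999)  // basic source, just for the sketch

lines.foreachRDD { (rdd, time) =>
  // each micro batch arrives as an ordinary RDD; any deterministic batch computation can run here
  println(s"Batch at $time contains ${rdd.count()} lines")
}

ssc.start()
ssc.awaitTermination()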
InputDStreams
DStreams representing the stream of raw data received from streaming sources:
• Basic sources: sources directly available in the StreamingContext API
• Advanced sources: sources available through external modules, published as separate Spark artifacts (see the sketch below)
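A hedged sketch of the two kinds of sources side by side; the host, port, ZooKeeper address, consumer group and topic name are placeholders, not values from the deck.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def attachSources(ssc: StreamingContext): Unit = {
  // Basic source: available directly on the StreamingContext API
  val socketLines = ssc.socketTextStream("localhost", 9999)

  // Advanced source: requires the external spark-streaming-kafka_2.10 artifact
  val kafkaPairs = KafkaUtils.createStream(
    ssc,
    "zookeeper-host:2181",    // ZooKeeper quorum (placeholder)
    "demo-consumer-group",    // consumer group id (placeholder)
    Map("wordcount" -> 1),    // topic -> number of receiver threads
    StorageLevel.MEMORY_ONLY)

  socketLines.print()
  kafkaPairs.map(_._2).print()   // values only; keys are dropped
}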
Spark Streaming Modules (GroupId org.apache.spark, latest version 1.1.0):
• spark-streaming-kinesis-asl_2.10
• spark-streaming-mqtt_2.10
• spark-streaming-zeromq_2.10
• spark-streaming-flume_2.10
• spark-streaming-flume-sink_2.10
• spark-streaming-kafka_2.10
• spark-streaming-twitter_2.10
Streaming Window Operations

// where pairs are (word, count)
pairsStream
  .flatMap { case (k, v) => (k, v.value) }
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
  .saveToCassandra(keyspace, table)
Streaming Window Operations
• window(Duration, Duration)
• countByWindow(Duration, Duration)
• reduceByWindow(Duration, Duration)
• countByValueAndWindow(Duration, Duration)
• groupByKeyAndWindow(Duration, Duration)
• reduceByKeyAndWindow((V, V) => V, Duration, Duration)
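Each of these takes a (window length, slide interval) pair. A small sketch of the semantics, with the 60/20-second durations and the `lines` input purely illustrative:

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Every 20 seconds, count all elements received in the last 60 seconds
def slidingCount(lines: DStream[String]): DStream[Long] =
  lines.window(Seconds(60), Seconds(20)).count()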
Ten Things About Cassandra: You always wanted to know but were afraid to ask
Quick intro to Cassandra
• Shared nothing
• Masterless peer-to-peer
• Based on Dynamo
• Highly Scalable
• Spans DataCenters
Apache Cassandra
• Elasticity - scale to as many nodes as you need, when you need
• Always On - no single point of failure, continuous availability
• Masterless peer-to-peer architecture
• Designed for Replication
• Flexible Data Storage
• Read and write to any node; syncs across the cluster
• Operational simplicity - with all nodes in a cluster being the same, there is no complex configuration to manage
Easy to use
• CQL - familiar syntax
• Friendly to programmers
• Paxos for locking

CREATE TABLE users (
  username varchar,
  firstname varchar,
  lastname varchar,
  email list<varchar>,
  password varchar,
  created_date timestamp,
  PRIMARY KEY (username)
);

INSERT INTO users (username, firstname, lastname, email, password, created_date)
VALUES ('hedelson', 'Helena', 'Edelson',
  ['helena.edelson@datastax.com'], 'ba27e03fd95e507daf2937c937d499ab', '2014-11-15 13:50:00');

INSERT INTO users (username, firstname, lastname, email, password, created_date)
VALUES ('pmcfadin', 'Patrick', 'McFadin',
  ['patrick@datastax.com'], 'ba27e03fd95e507daf2937c937d499ab', '2011-06-20 13:50:00')
IF NOT EXISTS;
Spark Cassandra Connector
Spark On Cassandra
• Server-Side filters (where clauses)
• Cross-table operations (JOIN, UNION, etc.)
• Data locality-aware (speed)
• Data transformation, aggregation, etc.
• Natural Time Series Integration
A short sketch of the first two follows.
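A hedged sketch of server-side filtering plus a cross-table union; the keyspace, table and column names here are hypothetical, not from the deck.

import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def recentAndArchived(sc: SparkContext): RDD[CassandraRow] = {
  // The where clause is pushed down and evaluated server-side in Cassandra
  val recent = sc.cassandraTable("ks", "events_2014")
    .where("device_id = ?", "abc-123")

  val archived = sc.cassandraTable("ks", "events_2013")
    .where("device_id = ?", "abc-123")

  recent.union(archived)   // cross-table operation on the resulting RDDs
}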
Spark Cassandra Connector
• Loads data from Cassandra to Spark
• Writes data from Spark to Cassandra
• Implicit Type Conversions and Object Mapping
• Implemented in Scala (offers a Java API)
• Open Source
• Exposes Cassandra Tables as Spark RDDs + Spark DStreams
Spark Cassandra Connector
https://github.com/datastax/spark-cassandra-connector
(diagram: the user application runs the Spark-Cassandra Connector inside the Spark executor and talks to the Cassandra cluster through the Java (soon Scala) driver)
Use Cases: KillrWeather
• Get data by weather station
• Get data for a single date and time
• Get data for a range of dates and times
• Compute, store and quickly retrieve daily, monthly and annual aggregations of data
Data Model to support queries:
• Store raw data per weather station
• Store time series in order: most recent to oldest
• Compute and store aggregate data in the stream (see the sketch below)
• Set TTLs on historic data
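As a hedged illustration of "compute and store aggregate data in the stream": a sketch that averages raw temperature readings per station per day and writes the result back. The event type, keyspace, table and column names are invented for the example, not the actual KillrWeather schema.

import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

// Invented raw-reading type for the sketch
case class RawWeather(wsid: String, year: Int, month: Int, day: Int, temperature: Double)

def dailyMeanTemperature(raw: DStream[RawWeather]): Unit =
  raw.map(r => ((r.wsid, r.year, r.month, r.day), (r.temperature, 1)))
    .reduceByKey { case ((sum1, n1), (sum2, n2)) => (sum1 + sum2, n1 + n2) }
    .map { case ((wsid, year, month, day), (sum, n)) => (wsid, year, month, day, sum / n) }
    .saveToCassandra("weather_ks", "daily_mean_temperature",
      SomeColumns("wsid", "year", "month", "day", "mean_temp"))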
Cassandra Data Model

user_name | tweet_id        | publication_date    | message
pkolaczk  | 4b514a41-9cf... | 2014-08-21 10:00:04 | ...
pkolaczk  | 411bbe34-11d... | 2014-08-24 16:03:47 | ...
ewa       | 73847f13-771... | 2014-08-21 10:00:04 | ...
ewa       | 41106e9a-5ff... | 2014-08-21 10:14:51 | ...

user_name is the partitioning key (all rows with the same value form one partition); tweet_id completes the primary key; publication_date and message are data columns.
Spark Data Model
Resilient Distributed Dataset (RDD) - a Scala collection that is:
• immutable
• iterable
• serializable
• distributed
• parallel
• lazily evaluated
(diagram: A1..A8 -map-> B1..B8 -filter-> B1, B2, B5, B7, B8 -reduce-> C)
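A tiny local sketch mirroring the diagram (the numbers and the local[*] master are illustrative only): transformations stay lazy until the reduce action runs.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("rdd-sketch").setMaster("local[*]"))

val a    = sc.parallelize(1 to 8)     // A1..A8
val b    = a.map(_ * 10)              // map: nothing computed yet, RDDs are lazy
val kept = b.filter(_ % 3 != 0)       // filter: still lazy
val c    = kept.reduce(_ + _)         // reduce is an action: the whole chain runs now

println(c)
sc.stop()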
Spark Cassandra Example

// Initialization
val conf = new SparkConf(loadDefaults = true)
  .set("spark.cassandra.connection.host", "127.0.0.1")
  .setMaster("spark://127.0.0.1:7077")

val sc = new SparkContext(conf)

// CassandraRDD
val table: CassandraRDD[CassandraRow] = sc.cassandraTable("keyspace", "tweets")

// Stream initialization
val ssc = new StreamingContext(sc, Seconds(30))
val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafka.kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)

// Transformations and action
stream.map(_._2).countByValue().saveToCassandra("demo", "wordcount")

ssc.start()
ssc.awaitTermination()
Spark Cassandra Example

val sc = new SparkContext(..)
val ssc = new StreamingContext(sc, Seconds(5))

val stream = TwitterUtils.createStream(ssc, auth, filters, StorageLevel.MEMORY_ONLY_SER_2)

val transform = (cruft: String) =>
  Pattern.findAllIn(cruft).flatMap(_.stripPrefix("#"))

/** Note that Cassandra is doing the sorting for you here. */
stream.flatMap(_.getText.toLowerCase.split("""\s+"""))
  .map(transform)
  .countByValueAndWindow(Seconds(5), Seconds(5))
  .transform((rdd, time) => rdd.map { case (term, count) => (term, count, now(time)) })
  .saveToCassandra(keyspace, suspicious, SomeColumns("suspicious", "count", "timestamp"))
Reading: From C* To Spark

val table = sc
  .cassandraTable[CassandraRow]("keyspace", "tweets")  // row representation, keyspace, table
  .select("user_name", "message")                      // server-side column selection
  .where("user_name = ?", "ewa")                       // server-side row selection
CassandraRDD

class CassandraRDD[R](..., keyspace: String, table: String, ...)
  extends RDD[R](...) {

  // Splits the table into multiple Spark partitions,
  // each processed by a single Spark task
  override def getPartitions: Array[Partition]

  // Returns names of hosts storing a given partition (for data locality!)
  override def getPreferredLocations(split: Partition): Seq[String]

  // Returns an iterator over Cassandra rows in the given partition
  override def compute(split: Partition, context: TaskContext): Iterator[R]
}
CassandraStreamingRDD

/** RDD representing a Cassandra table for Spark Streaming.
 *  @see [[com.datastax.spark.connector.rdd.CassandraRDD]] */
class CassandraStreamingRDD[R] private[connector] (
    sctx: StreamingContext,
    connector: CassandraConnector,
    keyspace: String,
    table: String,
    columns: ColumnSelector = AllColumns,
    where: CqlWhereClause = CqlWhereClause.empty,
    readConf: ReadConf = ReadConf())(
  implicit ct: ClassTag[R], @transient rrf: RowReaderFactory[R])
  extends CassandraRDD[R](sctx.sparkContext, connector, keyspace, table, columns, where, readConf)
Paging Reads with .cassandraTable
• Page size is configurable
• Controls how many CQL rows to fetch at a time when fetching a single partition
• The connector returns an iterator over rows to Spark
• Spark iterates over this lazily
• Paging is handled by the Java driver as well as by Spark
ResultSet Paging and Pre-Fetching
(diagram: for each node, the client requests a page from Cassandra, processes the returned data, and requests the next page while processing continues)
Co-locate Spark and C* for Best Performance
Running Spark Workers on the same nodes as your C* cluster will save network hops when reading and writing.
The Key To Speed - Data Locality
• LocalNodeFirstLoadBalancingPolicy
• Decides which node will become the coordinator for a given mutation/read
• Selects the local node first, then nodes in the local DC in random order
• Once that node receives the request, it is distributed across the cluster
• Proximal node sort is defined by the C* snitch:
https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java#L155-L190
Column Type Conversions

trait TypeConverter[T] extends Serializable {
  def targetTypeTag: TypeTag[T]
  def convertPF: PartialFunction[Any, T]
}

class CassandraRow(...) {
  def get[T](index: Int)(implicit c: TypeConverter[T]): T = …
  def get[T](name: String)(implicit c: TypeConverter[T]): T = …
  …
}

implicit object IntConverter extends TypeConverter[Int] {
  def targetTypeTag = implicitly[TypeTag[Int]]
  def convertPF = {
    case x: Number => x.intValue
    case x: String => x.toInt
  }
}
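A small usage sketch of get[T], which resolves an implicit TypeConverter[T] for the requested type; the keyspace, table and column names ("retweet_count" in particular) are illustrative, not from the deck.

import com.datastax.spark.connector._
import org.apache.spark.SparkContext

def firstUserAndCount(sc: SparkContext): (String, Int) = {
  val row: CassandraRow = sc.cassandraTable("db", "tweets").first
  // Each get[T] call picks up the matching implicit TypeConverter (e.g. IntConverter above)
  (row.get[String]("user_name"), row.get[Int]("retweet_count"))
}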
Rows to Case Class Instances

val row: CassandraRow = table.first

case class Tweet(userName: String, tweetId: UUID,
  publicationDate: Date, message: String) extends Serializable

val tweets = sc.cassandraTable[Tweet]("db", "tweets")

Recommended convention - Scala camelCase fields map to Cassandra snake_case columns:
Scala: message, column1, userName  ->  Cassandra: message, column_1, user_name
Identically named fields and columns also map directly:
Scala: Message, column1, userName  ->  Cassandra: Message, column1, userName
Convert Rows To Tuples

val tweets = sc
  .cassandraTable[(String, String)]("db", "tweets")
  .select("userName", "message")

When returning tuples, always use select to specify the column order!
Handling Unsupported Types

case class Tweet(
  userName: String,
  tweetId: my.private.implementation.of.UUID,
  publicationDate: Date,
  message: String)

val tweets: CassandraRDD[Tweet] = sc.cassandraTable[Tweet]("db", "tweets")

Current behavior: runtime error
Future: compile-time error (macros!) Macros could even parse the Cassandra schema!
Connector Code and Docs
https://github.com/datastax/spark-cassandra-connector

Add It To Your Project (a minimal sbt sketch follows):

val connector = "com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0-beta2"
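In an sbt build this is typically wired in through libraryDependencies; a minimal sketch, assuming Spark itself is declared separately (the spark-streaming line is an assumption, not from the slide):

// build.sbt
libraryDependencies ++= Seq(
  "org.apache.spark"   %% "spark-streaming"            % "1.1.0" % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector"  % "1.1.0-beta2"
)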
What's New In Spark?
• Petabyte sort record
• Myth busting:
  • "Spark is in-memory. It doesn't work with big data"
  • "It's too expensive to buy a cluster with enough memory to fit our data"
• Application Integration: Tableau, Trifacta, Talend, ElasticSearch, Cassandra
• Ongoing development for Spark 1.2: Python streaming, new MLlib API, YARN scaling…
Recent / Pending Spark Updates
• Usability, stability & performance improvements
• Improved "Lightning Fast" Shuffle
• Monitoring the performance of long-running or complex jobs
• Public types API to allow users to create SchemaRDDs
• Support for registering Python, Scala, and Java lambda functions as UDFs
• Dynamic bytecode generation, significantly speeding up execution for queries that perform complex expression evaluation
Recent Spark Streaming Updates
• Apache Flume: a new pull-based mode (simplifying deployment and providing high availability)
• The first of a set of streaming machine learning algorithms, starting with streaming linear regression
• Rate limiting has been added for streaming inputs
Recent MLlib Updates
• New library of statistical packages providing exploratory analytic functions (stratified sampling, correlations, chi-squared tests, creating random datasets, …)
• Utilities for feature extraction (Word2Vec and TF-IDF) and feature transformation (normalization and standard scaling)
• Support for nonnegative matrix factorization and SVD via Lanczos
• Decision tree algorithm has been added in Python and Java
• Tree aggregation primitive
• Performance improvements across the board
Recent/Pending Connector Updates
• Spark SQL support (came in 1.1)
• Python API
• Official Scala Driver for Cassandra
• Removing last traces of Thrift
• Performance Improvements:
  • Token-aware data repartitioning
  • Token-aware saving
  • Wide-row support - no costly groupBy call
Resources
• Spark Cassandra Connector: https://github.com/datastax/spark-cassandra-connector
• Apache Cassandra: http://cassandra.apache.org
• Apache Spark: http://spark.apache.org
• Apache Kafka: http://kafka.apache.org
• Akka: http://akka.io
www.spark-summit.org
Thanks for listening!
Cassandra Summit - September 10-11, 2014 | San Francisco, Calif. | The Westin St. Francis Hotel
