A Tale of Three Apache Spark APIs: RDDs, DataFrames & Datasets Jules S. Damji Spark Summit EU, Dublin 2017 @2twitme
Spark Community Evangelist & Developer Advocate @ Databricks; Developer Advocate @ Hortonworks; Software engineering @ Sun Microsystems, Netscape, @Home, VeriSign, Scalix, Centrify, LoudCloud/Opsware, ProQuest
Agenda… Why are we here today? • Resilient Distributed Datasets (RDDs) • Structure in Spark • DataFrames and Datasets • Demo • Q & A
Not the Tale of Three Kings..
Resilient Distributed Dataset (RDD)
What are RDDs?
A Resilient Distributed Dataset (RDD) 1. Distributed Data Abstraction: a logical model across distributed storage (S3 or HDFS)
2. Resilient & Immutable: RDD → T → RDD → T → RDD (T = Transformation)
3. Compile-time Type-safe: Integer RDD, String or Text RDD, Double or Binary RDD
4. Unstructured/Structured Data: Text (logs, tweets, articles, social)
Structured Tabular data..
5. Lazy: RDD → T → RDD → T → RDD → A (T = Transformation, A = Action)
Why Use RDDs? • … Offer control & flexibility • … Low-level API • … Type-safe • … Encourage how-to
Some code to read Wikipedia page counts

val rdd = sc.textFile("/mnt/wikipediapagecounts.gz")
val parsedRDD = rdd.flatMap { line =>
  line.split("""\s+""") match {
    // Parse the request count as an Int so reduceByKey sums instead of concatenating strings.
    case Array(project, page, numRequests, _) => Some((project, page, numRequests.toInt))
    case _ => None
  }
}

// Filter only English pages; count pages and the requests to them.
parsedRDD.
  filter { case (project, page, numRequests) => project == "en" }.
  map { case (_, page, numRequests) => (page, numRequests) }.
  reduceByKey(_ + _).
  take(100).
  foreach { case (page, requests) => println(s"$page: $requests") }
When to Use RDDs? • … You need a low-level API and control of your dataset • … You're dealing with unstructured data (media streams or text) • … You want to manipulate data with lambda functions rather than a DSL • … You don't care about the schema or structure of the data • … You're willing to sacrifice optimization and performance, and accept inefficiencies
What’s the Problem?
What’s the problem? • … RDDs express the how-to of a solution, not the what-to • … Not optimized by Spark • … Slow for non-JVM languages like Python • … Inadvertent inefficiencies
Inadvertent inefficiencies in RDDs

parsedRDD.
  filter { case (project, page, numRequests) => project == "en" }.
  map { case (_, page, numRequests) => (page, numRequests) }.
  reduceByKey(_ + _).
  // Inefficiency: special pages are filtered out only after the expensive shuffle.
  filter { case (page, _) => !isSpecialPage(page) }.
  take(100).
  foreach { case (page, requests) => println(s"$page: $requests") }
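A minimal reworking, not from the original slides, that avoids the problem by filtering out special pages before the shuffle; it assumes the same parsedRDD and isSpecialPage as above.

parsedRDD.
  filter { case (project, page, _) => project == "en" && !isSpecialPage(page) }.
  map { case (_, page, numRequests) => (page, numRequests) }.
  reduceByKey(_ + _).  // only non-special English pages reach the shuffle
  take(100).
  foreach { case (page, requests) => println(s"$page: $requests") }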
Structure in Spark: DataFrame & Dataset APIs
Background: What is in an RDD? • Dependencies • Partitions (with optional locality info) • Compute function: Partition => Iterator[T]. Opaque computation over opaque data (a simplified sketch of this contract follows).
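A deliberately simplified sketch of that contract; the names and signatures below are illustrative, not Spark's internal RDD API (which uses Partition, TaskContext, and Dependency types).

// Illustrative only: what an RDD has to provide.
abstract class SimpleRDD[T] {
  def dependencies: Seq[SimpleRDD[_]]       // lineage: the parent RDDs this one derives from
  def numPartitions: Int                    // how the data is split (locality hints omitted)
  def compute(partition: Int): Iterator[T]  // the opaque per-partition computation over opaque data
}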
Structured APIs in Spark: when errors are caught

              Syntax errors    Analysis errors
SQL           Runtime          Runtime
DataFrames    Compile time     Runtime
Datasets      Compile time     Compile time

Analysis errors are reported before a distributed job starts.
Unification of APIs in Spark 2.0
DataFrame API code

// Convert the RDD to a DataFrame with column names.
val df = parsedRDD.toDF("project", "page", "numRequests")

// filter, groupBy, then agg() with sum.
df.filter($"project" === "en").
  groupBy($"page").
  agg(sum($"numRequests").as("count")).
  limit(100).
  show(100)

Sample data:
project  page  numRequests
en       23    45
en       24    200
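As the error-detection table above noted, DataFrame analysis errors surface before a distributed job starts. A small sketch using the df just built (the misspelled column is hypothetical):

df.select("numRequests")   // resolves during analysis; nothing executes yet
// df.select("numReqests") // hypothetical typo: throws an AnalysisException right away,
//                         // before any distributed job is launched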
Take a DataFrame → SQL Table → Query

df.createOrReplaceTempView("edits")

val results = spark.sql("""
  SELECT page, sum(numRequests) AS count
  FROM edits
  WHERE project = 'en'
  GROUP BY page
  LIMIT 100""")
results.show(100)

Sample data:
project  page  numRequests
en       23    45
en       24    200
Easy to write code... Believe it!

from pyspark.sql.functions import avg

dataRDD = sc.parallelize([("Jim", 20), ("Anne", 31), ("Jim", 30)])
dataDF = dataRDD.toDF(["name", "age"])

# Using RDD code to compute the average age per name
# (Python 3 lambdas cannot unpack tuples, so index into them instead).
(dataRDD.map(lambda nv: (nv[0], (nv[1], 1)))
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        .map(lambda nc: (nc[0], nc[1][0] / nc[1][1])))

# Using the DataFrame API
dataDF.groupBy("name").agg(avg("age"))

Input data:
name  age
Jim   20
Anne  31
Jim   30
Why structured APIs?

RDD:
data.map { case (dept, age) => dept -> (age, 1) }
  .reduceByKey { case ((a1, c1), (a2, c2)) => (a1 + a2, c1 + c2) }
  .map { case (dept, (age, c)) => dept -> age / c }

SQL:
select dept, avg(age) from data group by 1

DataFrame:
data.groupBy("dept").avg("age")
Using Catalyst in Spark SQL: a query arrives as a SQL AST, a DataFrame, or a Dataset and becomes an Unresolved Logical Plan. Analysis (against the Catalog) resolves references to produce a Logical Plan; Logical Optimization rewrites it into an Optimized Logical Plan; Physical Planning generates candidate Physical Plans, and a Cost Model picks the Selected Physical Plan; Code Generation then compiles parts of the query to Java bytecode, and the result executes as RDDs.
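A minimal sketch of how to inspect these stages yourself, assuming the df, $-syntax, and sum import from the DataFrame API slide above; explain(true) prints the parsed, analyzed, and optimized logical plans along with the chosen physical plan.

val q = df.filter($"project" === "en").
  groupBy($"page").
  agg(sum($"numRequests").as("count"))
q.explain(true)  // parsed -> analyzed -> optimized logical plan -> physical plan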
Physical Plan with Predicate Pushdown and Column Pruning (DataFrame optimization)

users.join(events, users("id") === events("uid")).
  filter(events("date") > "2015-01-01")

Logical plan: a filter on top of a join over the users table and the events file. Physical plan: the filter is pushed down, so the join runs over an optimized scan of users and an optimized scan of events.
Type-safe: operate on domain objects with compiled lambda functions (Dataset API in Spark 2.x)

case class Person(name: String, age: Int)

val df = spark.read.json("people.json")
// Convert data to domain objects.
import spark.implicits._  // provides the encoder needed by .as[Person]
val ds: Dataset[Person] = df.as[Person]
val filterDS = ds.filter(p => p.age > 3)
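A quick sketch of what that type safety buys, with a hypothetical misspelled field: the typed Dataset catches the mistake at compile time, while the equivalent untyped DataFrame expression compiles and only fails at runtime during analysis.

val adults = ds.filter(p => p.age > 3)  // field name and type checked by the Scala compiler
// ds.filter(p => p.agee > 3)           // does not compile: agee is not a member of Person
// df.filter($"agee" > 3)               // compiles, but throws an AnalysisException at runtime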
DataFrames are Faster than RDDs
Datasets use less memory than RDDs
Datasets Faster…
Why & When: DataFrames & Datasets • Structured data with a schema • Code optimization & performance • Space efficiency with Tungsten • High-level APIs and DSL • Strong type-safety • Ease-of-use & readability • Express the what-to, not the how-to (moving between the APIs is sketched below)
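Since the APIs interoperate, here is a hedged sketch of moving between them, reusing the wiki-pagecounts df and parsedRDD from earlier; the Edit case class is hypothetical, introduced only for this example.

import spark.implicits._  // encoders for case classes
case class Edit(project: String, page: String, numRequests: Int)

val backToRdd = df.rdd                   // DataFrame -> RDD[Row], for low-level control
val asDataset = df.as[Edit]              // DataFrame -> typed Dataset[Edit]
val fromRdd   = parsedRDD.toDF("project", "page", "numRequests")  // RDD -> DataFrame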
Foundational Spark 2.x Components: Spark Core (RDD); Catalyst; SQL, DataFrame/Dataset; Spark SQL; ML Pipelines; Structured Streaming; GraphFrames; TensorFrames; DL Pipelines; DataSource API ({ JSON }, JDBC, and more…)
Putting It All Together: Conclusion (Source: Michael Malak)
Demo
http://dbricks.co/2sK35XT
+ Resources • https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html • http://scala-phase.org/talks/rdds-dataframes-datasets-2016-06-16/#/
Resources • Getting Started Guide with Apache Spark on Databricks • docs.databricks.com • Spark Programming Guide • https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html • https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html • https://github.com/bmc/rdds-dataframes-datasets-presentation-2016 • Databricks Engineering Blogs
Thank you! Do you have any questions for my prepared answers?
TEAM About Databricks Started Spark project (now Apache Spark) at UC Berkeley in 2009 PRODUCT Unified Analytics Platform MISSION Making Big Data Simple
The Unified Analytics Platform
