Dataframes & Spark SQL
Spark SQL & Dataframes Spark module for Structured Data Processing Spark SQL
Spark SQL & Dataframes Integrated ○ Provides DataFrames ○ Mix SQL queries & Spark programs Spark SQL
Spark SQL & Dataframes Uniform Data Access ○ Source: ■ HDFS, ■ Hive ■ Relational Databases ○ Avro, Parquet, ORC, JSON ○ You can even join data across these sources. ○ Hive Compatibility ○ Standard Connectivity Spark SQL
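As an illustration of joining data across sources, here is a minimal sketch combining a JSON file on HDFS with a Hive table; the paths, table, and column names are hypothetical, and the session is assumed to have Hive support enabled:
// DataFrame read from JSON on HDFS
val ordersDF = spark.read.json("/data/orders.json")
// DataFrame backed by a Hive table
val customersDF = spark.sql("SELECT customer_id, name FROM customers")
// Join across the two sources as if they were ordinary tables
val joinedDF = ordersDF.join(customersDF, "customer_id")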
Spark SQL & Dataframes DataFrames 1 sandeep 2 ted 3 thomas 4 priya 5 kush RDD Unstructured Need code for processing
Spark SQL & Dataframes DataFrames 1 sandeep 2 ted 3 thomas 4 priya 5 kush RDD ID Name 1 sandeep 2 ted 3 thomas 4 priya 5 kush Data Frame Unstructured Structured Need code for processing Can use SQL-like or R-like syntax: spark.sql("SELECT ID FROM df WHERE name = 'priya'") head(where(df, df$ID > 21))
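To make the RDD-vs-DataFrame contrast concrete, here is a minimal sketch in Scala; the file path is an assumption, import spark.implicits._ is assumed, and the ID/name columns match the table above:
// RDD: each record is just a string, so the parsing logic lives in our code
val rdd = sc.textFile("/data/people.txt")                 // assumed path, lines like "4,priya"
val priyaId = rdd.map(_.split(","))
                 .filter(cols => cols(1) == "priya")
                 .map(cols => cols(0))
// DataFrame: columns are named, so the same lookup reads like a query
val df = spark.read.option("inferSchema", "true").csv("/data/people.txt").toDF("ID", "name")
val priyaRow = df.filter($"name" === "priya").select("ID")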
Spark SQL & Dataframes ● Collection with named columns ● Distributed ● Similar to, but not the same as, a database table ● Similar to, but not the same as, a data frame in R/Python Data Frames col1 col2 col3 Partition1 Partition2
Spark SQL & Dataframes Data Frames col1 col2 col3 Partition1 Partition2 Structured data: CSV, JSON Hive RDBMS RDDs Can be constructed from
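A minimal sketch of constructing DataFrames from each of these sources; the paths, table name, and JDBC settings are hypothetical placeholders, and the Hive and JDBC reads assume Hive support and the JDBC driver are available:
// Structured files
val csvDF  = spark.read.option("header", "true").csv("/data/example.csv")
val jsonDF = spark.read.json("/data/spark/people.json")
// Hive table (requires Hive support on the SparkSession)
val hiveDF = spark.table("some_hive_table")
// RDBMS over JDBC (the driver jar must be on the classpath)
val jdbcDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")
  .option("dbtable", "people")
  .option("user", "user").option("password", "password")
  .load()
// Existing RDD (via import spark.implicits._)
val rddDF = sc.parallelize(Seq((1, "sandeep"), (2, "ted"))).toDF("ID", "Name")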
Spark SQL & Dataframes Data Frames DataFrame API is available in Scala, Java, Python and R col1 col2 col3 Partition1 Partition2
Spark SQL & Dataframes ● Available in Spark 2.0.x onwards. ● Usable through the usual interfaces ○ Spark-shell ○ Spark Application ○ PySpark ○ Java ○ etc. Getting Started
Spark SQL & Dataframes $ export HADOOP_CONF_DIR=/etc/hadoop/conf/ $ export YARN_CONF_DIR=/etc/hadoop/conf/ Getting Started
Spark SQL & Dataframes $ export HADOOP_CONF_DIR=/etc/hadoop/conf/ $ export YARN_CONF_DIR=/etc/hadoop/conf/ $ ls /usr/ bin games include jdk64 lib64 local share spark1.6 spark2.0.2 tmp etc hdp java lib libexec sbin spark1.2.1 spark2.0.1 src Getting Started
Spark SQL & Dataframes $ export HADOOP_CONF_DIR=/etc/hadoop/conf/ $ export YARN_CONF_DIR=/etc/hadoop/conf/ $ ls /usr/ bin games include jdk64 lib64 local share spark1.6 spark2.0.2 tmp etc hdp java lib libexec sbin spark1.2.1 spark2.0.1 src $ /usr/spark2.0.2/bin/spark-shell Getting Started
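With the Hadoop configuration exported as above, the same installation can also target the YARN cluster or start a Python shell; a small sketch using standard spark-shell/pyspark options:
$ /usr/spark2.0.2/bin/spark-shell --master yarn
$ /usr/spark2.0.2/bin/pyspark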
Spark SQL & Dataframes Getting Started Spark context Web UI available at http://172.31.60.179:4040 Spark context available as 'sc' (master = local[*], app id = local-1498489557917). Spark session available as 'spark'. Welcome to ____ __ / __/__ ___ _____/ /__ _ / _ / _ `/ __/ '_/ /___/ .__/_,_/_/ /_/_ version 2.0.2 /_/ Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_91) Type in expressions to have them evaluated. Type :help for more information. scala>
Spark SQL & Dataframes import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() Starting Point: SparkSession
Spark SQL & Dataframes import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() //For implicit conversions, e.g. RDDs to DataFrames import spark.implicits._ Starting Point: SparkSession
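A minimal sketch of what the implicit conversions enable once the session exists; the sample rows are made up for illustration:
// A local Seq becomes a Dataset or DataFrame via the implicits
val namesDS = Seq("sandeep", "ted", "thomas").toDS()
val namesDF = Seq((1, "sandeep"), (2, "ted"), (3, "thomas")).toDF("id", "name")
namesDF.show()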
Spark SQL & Dataframes Creating DataFrames from JSON In web console or ssh: $ hadoop fs -cat /data/spark/people.json {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
Spark SQL & Dataframes var df = spark.read.json("/data/spark/people.json") // Displays the content of the DataFrame to stdout df.show() Creating DataFrames from JSON scala> df.show() +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
Spark SQL & Dataframes Creating DataFrames from JSON scala> df.show() +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+ Original JSON: {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} var df = spark.read.json("/data/spark/people.json") // Displays the content of the DataFrame to stdout df.show()
Spark SQL & Dataframes // Print the schema in a tree format df.printSchema() root |-- age: long (nullable = true) |-- name: string (nullable = true) DataFrame Operations {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
Spark SQL & Dataframes // Select only the "name" column df.select("name").show() +-------+ | name| +-------+ |Michael| | Andy| | Justin| +-------+ DataFrame Operations {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
Spark SQL & Dataframes // Increment the age by 1 df.select($"name", $"age" + 1).show() +-------+---------+ | name|(age + 1)| +-------+---------+ |Michael| null| | Andy| 31| | Justin| 20| +-------+---------+ DataFrame Operations {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
Spark SQL & Dataframes // Select people older than 21 df.filter($"age" > 21).show() +---+----+ |age|name| +---+----+ | 30|Andy| +---+----+ DataFrame Operations {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
Spark SQL & Dataframes // Count people by age df.groupBy("age").count().show() +----+-----+ | age|count| +----+-----+ | 19| 1| |null| 1| | 30| 1| +----+-----+ // SQL equivalent: SELECT age, count(*) FROM df GROUP BY age DataFrame Operations {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19}
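Other aggregations follow the same groupBy/agg pattern; a minimal sketch using functions from org.apache.spark.sql.functions on the same df:
import org.apache.spark.sql.functions.{avg, max, count}
// Average and maximum age over all rows (null ages are ignored by these aggregates)
df.agg(avg($"age"), max($"age")).show()
// groupBy + agg is the general form of the count() shorthand above
df.groupBy("age").agg(count("*").alias("count")).show()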
Spark SQL & Dataframes Running SQL Queries Programmatically
Spark SQL & Dataframes // Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") Running SQL Queries Programmatically
Spark SQL & Dataframes // Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") Running SQL Queries Programmatically
Spark SQL & Dataframes Running SQL Queries Programmatically // Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() +----+-------+ | age| name| +----+-------+ |null|Michael| | 30| Andy| | 19| Justin| +----+-------+
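Once the view exists, any SQL can be run against it; a minimal sketch (the filter query is illustrative, and global temporary views require Spark 2.1 or later):
// Filtering through SQL instead of the DataFrame DSL
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
// A temporary view is session-scoped; from Spark 2.1 onwards a global temporary
// view can be shared across sessions and is addressed via the global_temp database
df.createGlobalTempView("people_global")
spark.sql("SELECT * FROM global_temp.people_global").show()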
Spark SQL & Dataframes Datasets ● Similar to RDDs ● But instead of Java serialization or Kryo, they use a specialized Encoder to serialize objects for processing or transmission over the network Encoders ● Are dynamically generated code ● Allow Spark to perform many operations (filtering, sorting, hashing) without deserializing the bytes back into an object Datasets
Spark SQL & Dataframes // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) Creating Datasets
Spark SQL & Dataframes case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ Creating Datasets
Spark SQL & Dataframes val path = "/data/spark/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ Creating Datasets
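Because a Dataset carries the element type, transformations can be written directly against the Person fields and are checked at compile time; a minimal sketch with made-up rows, reusing the Person case class defined above:
// Typed filter and map on a Dataset[Person]; map yields a Dataset[String]
val adults = Seq(Person("Andy", 32), Person("Justin", 19)).toDS()
  .filter(p => p.age >= 21)
  .map(p => p.name.toUpperCase)
adults.show()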
Spark SQL & Dataframes Interoperating with RDDs How do we convert an RDD into a DataFrame? RDD ID Name 1 sandeep 2 ted 3 thomas 4 priya 5 kush Data Frame 1 sandeep 2 ted 3 thomas 4 priya 5 kush
Spark SQL & Dataframes Interoperating with RDDs Two ways to convert RDDs to DF: a. Inferring the Schema Using Reflection b. RDD ID Name 1 sandeep 2 ted 3 thomas 4 priya 5 kush Data Frame 1 sandeep 2 ted 3 thomas 4 priya 5 kush
Spark SQL & Dataframes Interoperating with RDDs Two ways to convert RDDs to DF: a. Inferring the Schema Using Reflection b. Programmatically Specifying the Schema RDD ID Name 1 sandeep 2 ted 3 thomas 4 priya 5 kush Data Frame 1 sandeep 2 ted 3 thomas 4 priya 5 kush
Spark SQL & Dataframes Inferring the Schema Using Reflection ● Spark SQL can convert an RDD of case class objects to a DataFrame ● The names of case class arguments are read using reflection and become columns ● Case classes can be nested or contain complex types ● Let us try to convert people.txt into a DataFrame people.txt: Michael, 29 Andy, 30 Justin, 19
Spark SQL & Dataframes Inferring the Schema Using Reflection https://github.com/cloudxlab/bigdata/blob/master/spark/examples/dataframes/rdd_to_df.scala
Spark SQL & Dataframes scala> import spark.implicits._ import spark.implicits._ Inferring the Schema Using Reflection
Spark SQL & Dataframes scala> import spark.implicits._ import spark.implicits._ scala> case class Person(name: String, age: Long) defined class Person Inferring the Schema Using Reflection
Spark SQL & Dataframes scala> import spark.implicits._ import spark.implicits._ scala> case class Person(name: String, age: Long) defined class Person scala> val textRDD = sc.textFile("/data/spark/people.txt") textRDD: org.apache.spark.rdd.RDD[String] = /data/spark/people.txt MapPartitionsRDD[3] at textFile at <console>:30 Inferring the Schema Using Reflection
Spark SQL & Dataframes scala> import spark.implicits._ import spark.implicits._ scala> case class Person(name: String, age: Long) defined class Person scala> val textRDD = sc.textFile("/data/spark/people.txt") textRDD: org.apache.spark.rdd.RDD[String] = /data/spark/people.txt MapPartitionsRDD[3] at textFile at <console>:30 scala> val arrayRDD = textRDD.map(_.split(",")) arrayRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:32 Inferring the Schema Using Reflection
Spark SQL & Dataframes scala> import spark.implicits._ import spark.implicits._ scala> case class Person(name: String, age: Long) defined class Person scala> val textRDD = sc.textFile("/data/spark/people.txt") textRDD: org.apache.spark.rdd.RDD[String] = /data/spark/people.txt MapPartitionsRDD[3] at textFile at <console>:30 scala> val arrayRDD = textRDD.map(_.split(",")) arrayRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[4] at map at <console>:32 scala> val personRDD = arrayRDD.map(attributes => Person(attributes(0), attributes(1).trim.toInt)) personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[5] at map at <console>:36 Inferring the Schema Using Reflection
Spark SQL & Dataframes scala> val peopleDF = personRDD.toDF() peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint] Inferring the Schema Using Reflection
Spark SQL & Dataframes scala> val peopleDF = personRDD.toDF() peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint] scala> peopleDF.show() +-------+---+ | name|age| +-------+---+ |Michael| 29| | Andy| 30| | Justin| 19| +-------+---+ Inferring the Schema Using Reflection
Spark SQL & Dataframes // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") Inferring the Schema Using Reflection
Spark SQL & Dataframes // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ Inferring the Schema Using Reflection
Spark SQL & Dataframes // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ Inferring the Schema Using Reflection
Spark SQL & Dataframes Inferring the Schema Using Reflection // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))
Spark SQL & Dataframes ● When case classes can't be defined ahead of time a. E.g. the fields of the records are only known at runtime, passed as arguments ● We need to create the DataFrame programmatically: Programmatically Specifying the Schema
Spark SQL & Dataframes Programmatically Specifying the Schema ● When case classes can't be defined ahead of time a. E.g. the fields of the records are only known at runtime, passed as arguments ● We need to create the DataFrame programmatically: 1. Create an RDD of Row objects 2. Create the schema, represented by a StructType 3. Apply the schema with createDataFrame
Spark SQL & Dataframes Programmatically Specifying the Schema people.txt: Michael, 29 Andy, 30 Justin, 19 val schemaString = "name age"
Spark SQL & Dataframes import org.apache.spark.sql.types._ import org.apache.spark.sql._ Programmatically Specifying the Schema
Spark SQL & Dataframes Programmatically Specifying the Schema import org.apache.spark.sql.types._ import org.apache.spark.sql._ // The schema is encoded in a string // User provided variable val schemaString = "name age" val filename = "/data/spark/people.txt" val fieldsArray = schemaString.split(" ") val fields = fieldsArray.map( f => StructField(f, StringType, nullable = true) ) val schema = StructType(fields)
Spark SQL & Dataframes Programmatically Specifying the Schema import org.apache.spark.sql.types._ import org.apache.spark.sql._ // The schema is encoded in a string // User provided variable val schemaString = "name age" val filename = "/data/spark/people.txt" val fieldsArray = schemaString.split(" ") val fields = fieldsArray.map( f => StructField(f, StringType, nullable = true) ) val schema = StructType(fields) val peopleRDD = spark.sparkContext.textFile(filename) val rowRDD = peopleRDD.map(_.split(",")).map( attributes => Row.fromSeq(attributes) ) val peopleDF = spark.createDataFrame(rowRDD, schema)
Spark SQL & Dataframes Programmatically Specifying the Schema import org.apache.spark.sql.types._ import org.apache.spark.sql._ // The schema is encoded in a string // User provided variable val schemaString = "name age" val filename = "/data/spark/people.txt" val fieldsArray = schemaString.split(" ") val fields = fieldsArray.map( f => StructField(f, StringType, nullable = true) ) val schema = StructType(fields) val peopleRDD = spark.sparkContext.textFile(filename) val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row.fromSeq(attributes)) val peopleDF = spark.createDataFrame(rowRDD, schema) peopleDF.show() +-------+---+ | name|age| +-------+---+ |Michael| 29| | Andy| 30| | Justin| 19| +-------+---+
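Since every field in the programmatic schema above is a StringType, the age column comes out as a string; a common follow-up, sketched here with the standard trim and cast functions (trim removes the space left behind by the split on ","):
import org.apache.spark.sql.functions.trim
import org.apache.spark.sql.types.IntegerType
// Replace the string "age" column with an integer version of itself
val typedDF = peopleDF.withColumn("age", trim($"age").cast(IntegerType))
typedDF.printSchema()
// age is now reported as integer instead of string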
Thank you! Spark SQL & Dataframes
