# Databricks notebook source

# INCLUDE_HEADER_TRUE
# INCLUDE_FOOTER_TRUE

# COMMAND ----------

# MAGIC %md
# MAGIC # Spark SQL
# MAGIC
# MAGIC Demonstrate fundamental concepts in Spark SQL using the DataFrame API.
# MAGIC
# MAGIC ##### Objectives
# MAGIC 1. Run a SQL query
# MAGIC 1. Create a DataFrame from a table
# MAGIC 1. Write the same query using DataFrame transformations
# MAGIC 1. Trigger computation with DataFrame actions
# MAGIC 1. Convert between DataFrames and SQL
# MAGIC
# MAGIC ##### Methods
# MAGIC - <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#spark-session-apis" target="_blank">SparkSession</a>: `sql`, `table`
# MAGIC - <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html" target="_blank">DataFrame</a>:
# MAGIC   - Transformations: `select`, `where`, `orderBy`
# MAGIC   - Actions: `show`, `count`, `take`
# MAGIC   - Other methods: `printSchema`, `schema`, `createOrReplaceTempView`

# COMMAND ----------

# MAGIC %run ./Includes/Classroom-Setup-SQL

# COMMAND ----------

# MAGIC %md
# MAGIC ## Multiple Interfaces
# MAGIC Spark SQL is a module for structured data processing with multiple interfaces.
# MAGIC
# MAGIC We can interact with Spark SQL in two ways:
# MAGIC 1. Executing SQL queries
# MAGIC 1. Working with the DataFrame API

# COMMAND ----------

# MAGIC %md
# MAGIC **Method 1: Executing SQL queries**
# MAGIC
# MAGIC This is how we interacted with Spark SQL in the previous lesson.
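# MAGIC
# MAGIC The next cell sketches this approach, assuming the `products` table created by the classroom setup script.

# COMMAND ----------

# MAGIC %sql
# MAGIC SELECT name, price
# MAGIC FROM products
# MAGIC WHERE price < 200
# MAGIC ORDER BY price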

# COMMAND ----------

# MAGIC %md
# MAGIC **Method 2: Working with the DataFrame API**
# MAGIC
# MAGIC We can also express Spark SQL queries using the DataFrame API.
# MAGIC The following cell returns a DataFrame containing the same results as those retrieved above.

# COMMAND ----------

display(spark.table("products")
        .select("name", "price")
        .where("price < 200")
        .orderBy("price"))

# COMMAND ----------

# MAGIC %md
# MAGIC We'll go over the DataFrame API syntax later in the lesson, but notice how this builder pattern lets us chain a sequence of operations much like the clauses of a SQL query.

# COMMAND ----------

# MAGIC %md
# MAGIC ## Query Execution
# MAGIC We can express the same query using any interface; regardless of which we choose, the Spark SQL engine generates the same query plan, which it uses to optimize and execute the query on our Spark cluster.
# MAGIC
# MAGIC <img src="https://files.training.databricks.com/images/icon_note_32.png" alt="Note"> Resilient Distributed Datasets (RDDs) are the low-level representation of datasets processed by a Spark cluster. In early versions of Spark, you had to write <a href="https://spark.apache.org/docs/latest/rdd-programming-guide.html" target="_blank">code manipulating RDDs directly</a>. In modern versions of Spark you should instead use the higher-level DataFrame APIs, which Spark automatically compiles into low-level RDD operations.

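# COMMAND ----------

# MAGIC %md
# MAGIC As a quick sketch of this, `explain` prints the plan Spark generates for a query. Both versions of our query below compile down to the same physical plan (this assumes the `products` table from the classroom setup).

# COMMAND ----------

sqlQueryDF = spark.sql("SELECT name, price FROM products WHERE price < 200 ORDER BY price")
dataFrameQueryDF = (spark.table("products")
                    .select("name", "price")
                    .where("price < 200")
                    .orderBy("price"))

# Print the physical plan for each version; the two plans match
sqlQueryDF.explain()
dataFrameQueryDF.explain()
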
# COMMAND ----------

# MAGIC %md
# MAGIC ## Spark API Documentation
# MAGIC
# MAGIC To learn how we work with DataFrames in Spark SQL, let's first look at the Spark API documentation.
# MAGIC The main Spark [documentation](https://spark.apache.org/docs/latest/) page includes links to API docs and helpful guides for each version of Spark.
# MAGIC
# MAGIC The [Scala API](https://spark.apache.org/docs/latest/api/scala/org/apache/spark/index.html) and [Python API](https://spark.apache.org/docs/latest/api/python/index.html) are most commonly used, and it's often helpful to reference the documentation for both languages.
# MAGIC Scala docs tend to be more comprehensive, and Python docs tend to have more code examples.
# MAGIC
# MAGIC #### Navigating Docs for the Spark SQL Module
# MAGIC Find the Spark SQL module by navigating to `org.apache.spark.sql` in the Scala API or `pyspark.sql` in the Python API.
# MAGIC The first class we'll explore in this module is the `SparkSession` class. You can find this by entering "SparkSession" in the search bar.

# COMMAND ----------

# MAGIC %md
# MAGIC ## SparkSession
# MAGIC The `SparkSession` class is the single entry point to all functionality in Spark using the DataFrame API.
# MAGIC
# MAGIC In Databricks notebooks, the SparkSession is created for you, stored in a variable called `spark`.

# COMMAND ----------

spark

# COMMAND ----------

# MAGIC %md
# MAGIC The example from the beginning of this lesson used the SparkSession method `table` to create a DataFrame from the `products` table. Let's save this in the variable `productsDF`.

# COMMAND ----------

productsDF = spark.table("products")

# COMMAND ----------

# MAGIC %md
# MAGIC Below are several additional methods we can use to create DataFrames. All of these can be found in the <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html" target="_blank">documentation</a> for `SparkSession`.
# MAGIC
# MAGIC #### `SparkSession` Methods
# MAGIC | Method | Description |
# MAGIC | --- | --- |
# MAGIC | `sql` | Returns a DataFrame representing the result of the given query |
# MAGIC | `table` | Returns the specified table as a DataFrame |
# MAGIC | `read` | Returns a DataFrameReader that can be used to read data in as a DataFrame |
# MAGIC | `range` | Creates a DataFrame with a column containing elements in a range from start to end (exclusive) with a given step value and number of partitions |
# MAGIC | `createDataFrame` | Creates a DataFrame from a list of tuples; primarily used for testing |

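# COMMAND ----------

# MAGIC %md
# MAGIC As a quick sketch of two of these methods (the variable names here are just for illustration):

# COMMAND ----------

# range: a single-column DataFrame of longs named "id" (here 1 through 4)
rangeDF = spark.range(1, 5)
rangeDF.show()

# createDataFrame: build a DataFrame from local data, e.g. a list of tuples
pairsDF = spark.createDataFrame([("a", 1), ("b", 2)], ["letter", "number"])
pairsDF.show()
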
# COMMAND ----------

# MAGIC %md
# MAGIC Let's use a SparkSession method to run SQL.

# COMMAND ----------

resultDF = spark.sql("""
SELECT name, price
FROM products
WHERE price < 200
ORDER BY price
""")

display(resultDF)

# COMMAND ----------

# MAGIC %md
# MAGIC ## DataFrames
# MAGIC Recall that expressing our query using methods in the DataFrame API returns results in a DataFrame. Let's store this in the variable `budgetDF`.
# MAGIC
# MAGIC A **DataFrame** is a distributed collection of data grouped into named columns.

# COMMAND ----------

budgetDF = (spark.table("products")
            .select("name", "price")
            .where("price < 200")
            .orderBy("price"))

# COMMAND ----------

# MAGIC %md
# MAGIC We can use `display()` to output the contents of a DataFrame.

# COMMAND ----------

display(budgetDF)

# COMMAND ----------

# MAGIC %md
# MAGIC The **schema** defines the column names and types of a DataFrame.
# MAGIC
# MAGIC Access a DataFrame's schema using the `schema` attribute.

# COMMAND ----------

budgetDF.schema

# COMMAND ----------

# MAGIC %md
# MAGIC View a nicer output for this schema using the `printSchema()` method.

# COMMAND ----------

budgetDF.printSchema()

# COMMAND ----------

# MAGIC %md
# MAGIC ## Transformations
# MAGIC When we created `budgetDF`, we used a series of DataFrame transformation methods such as `select`, `where`, and `orderBy`.
# MAGIC
# MAGIC ```
# MAGIC productsDF
# MAGIC   .select("name", "price")
# MAGIC   .where("price < 200")
# MAGIC   .orderBy("price")
# MAGIC ```
# MAGIC Transformations operate on and return DataFrames, allowing us to chain transformation methods together to construct new DataFrames.
# MAGIC However, these operations can't execute on their own, because transformation methods are **lazily evaluated**: Spark records them in a query plan and defers execution until an action requests a result.
# MAGIC
# MAGIC Running the following cell does not trigger any computation.

# COMMAND ----------

(productsDF
  .select("name", "price")
  .where("price < 200")
  .orderBy("price"))

# COMMAND ----------

# MAGIC %md
# MAGIC ## Actions
# MAGIC Conversely, DataFrame actions are methods that **trigger computation**.
# MAGIC An action is required to execute any chain of DataFrame transformations.
# MAGIC
# MAGIC The `show` action causes the following cell to execute its transformations.

# COMMAND ----------

(productsDF
  .select("name", "price")
  .where("price < 200")
  .orderBy("price")
  .show())

# COMMAND ----------

# MAGIC %md
# MAGIC Below are several examples of <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#dataframe-apis" target="_blank">DataFrame</a> actions.
# MAGIC
# MAGIC ### DataFrame Action Methods
# MAGIC | Method | Description |
# MAGIC | --- | --- |
# MAGIC | `show` | Displays the top n rows of the DataFrame in tabular form |
# MAGIC | `count` | Returns the number of rows in the DataFrame |
# MAGIC | `describe`, `summary` | Computes basic statistics for numeric and string columns |
# MAGIC | `first`, `head` | Returns the first row |
# MAGIC | `collect` | Returns an array that contains all rows in this DataFrame |
# MAGIC | `take` | Returns an array of the first n rows in the DataFrame |

# COMMAND ----------

# MAGIC %md
# MAGIC `count` returns the number of records in a DataFrame.

# COMMAND ----------

budgetDF.count()

# COMMAND ----------

# MAGIC %md
# MAGIC `collect` returns an array of all rows in a DataFrame. Use it with care: on a large DataFrame, this pulls every row back to the driver.

# COMMAND ----------

budgetDF.collect()

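# COMMAND ----------

# MAGIC %md
# MAGIC `take` returns an array of the first n rows; a quick sketch:

# COMMAND ----------

budgetDF.take(3)
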
# COMMAND ----------

# MAGIC %md
# MAGIC ## Convert between DataFrames and SQL

# COMMAND ----------

# MAGIC %md
# MAGIC `createOrReplaceTempView` creates a temporary view based on the DataFrame. The lifetime of the temporary view is tied to the SparkSession that was used to create the DataFrame.

# COMMAND ----------

budgetDF.createOrReplaceTempView("budget")

# COMMAND ----------

display(spark.sql("SELECT * FROM budget"))

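# COMMAND ----------

# MAGIC %md
# MAGIC The view lives until its SparkSession ends, but we can also drop it explicitly with `spark.catalog.dropTempView`. A minimal sketch (we won't need the `budget` view again in this lesson):

# COMMAND ----------

spark.catalog.dropTempView("budget")
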
# COMMAND ----------

# MAGIC %md
# MAGIC # Spark SQL Lab
# MAGIC
# MAGIC ##### Tasks
# MAGIC 1. Create a DataFrame from the `events` table
# MAGIC 1. Display the DataFrame and inspect its schema
# MAGIC 1. Apply transformations to filter and sort `macOS` events
# MAGIC 1. Count results and take the first 5 rows
# MAGIC 1. Create the same DataFrame using a SQL query
# MAGIC
# MAGIC ##### Methods
# MAGIC - <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html?highlight=sparksession" target="_blank">SparkSession</a>: `sql`, `table`
# MAGIC - <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html" target="_blank">DataFrame</a> transformations: `select`, `where`, `orderBy`
# MAGIC - <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html" target="_blank">DataFrame</a> actions: `count`, `take`
# MAGIC - Other <a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html" target="_blank">DataFrame</a> methods: `printSchema`, `schema`, `createOrReplaceTempView`

# COMMAND ----------

# MAGIC %md
# MAGIC ### 1. Create a DataFrame from the `events` table
# MAGIC - Use SparkSession to create a DataFrame from the `events` table

# COMMAND ----------

# TODO
eventsDF = FILL_IN

# COMMAND ----------

# MAGIC %md
# MAGIC ### 2. Display the DataFrame and inspect its schema
# MAGIC - Use the methods above to inspect the DataFrame's contents and schema

# COMMAND ----------

# TODO

# COMMAND ----------

# MAGIC %md
# MAGIC ### 3. Apply transformations to filter and sort `macOS` events
# MAGIC - Filter for rows where `device` is `macOS`
# MAGIC - Sort rows by `event_timestamp`
# MAGIC
# MAGIC <img src="https://files.training.databricks.com/images/icon_hint_32.png" alt="Hint"> Use single quotes inside your double-quoted filter SQL expression, since SQL string literals take single quotes

# COMMAND ----------

# TODO
macDF = (eventsDF
  .FILL_IN
)

# COMMAND ----------

# MAGIC %md
# MAGIC ### 4. Count results and take first 5 rows
# MAGIC - Use DataFrame actions to count and take rows

# COMMAND ----------

# TODO
numRows = macDF.FILL_IN
rows = macDF.FILL_IN

# COMMAND ----------

# MAGIC %md
# MAGIC **CHECK YOUR WORK**

# COMMAND ----------

from pyspark.sql import Row

assert(numRows == 1938215)
assert(len(rows) == 5)
assert(type(rows[0]) == Row)

# MAGIC %md
# MAGIC ### 5. Create the same DataFrame using a SQL query
# MAGIC - Use SparkSession to run a SQL query on the `events` table
# MAGIC - Use SQL commands to write the same filter and sort query used earlier

# COMMAND ----------

# TODO
macSQLDF = spark.FILL_IN

display(macSQLDF)

# COMMAND ----------

# MAGIC %md
# MAGIC **CHECK YOUR WORK**
# MAGIC - You should only see `macOS` values in the `device` column
# MAGIC - The fifth row should be an event with timestamp `1592539226602157`

# COMMAND ----------

verify_rows = macSQLDF.take(5)
assert (macSQLDF.select("device").distinct().count() == 1
        and len(verify_rows) == 5
        and verify_rows[0]['device'] == "macOS"), "Incorrect filter condition"
assert (verify_rows[4]['event_timestamp'] == 1592539226602157), "Incorrect sorting"
del verify_rows

# COMMAND ----------

# MAGIC %md
# MAGIC ### Classroom Cleanup

# COMMAND ----------

# MAGIC %run ./Includes/Classroom-Cleanup