Pyspark week-9
Few Python Fundamentals
========================
#include (how C/C++ pulls in code)
import sys (how Python imports a module)
A module is a Python file which holds functions, classes, methods and variables.
Functions are not bound to any class name or object.
Methods are bound to classes or their objects.
import time
print(time.time())
import time as t
print(t.time())
#below will import all the things
import datetime
print(datetime.date.today())
#below is to import specific things and not all
from datetime import date
print(date.today())
So when we import a specific function, we can call it directly without the module prefix.
Module
How to import it
How to call the functions within that module
How to import full module, Partial module
__name__ Global Variable
__main__
if we run the python file directly then the global variable __name__ is set to "__main__"
but if the file is imported by another file, __name__ is set to the name of that module (the file name)
file1
======
print("name of this module is ", __name__)
file2
=======
import file1
print("in the second file the module name is ",__name__)
if __name__ == "__main__":
    print("Executed when invoked directly")
else:
    print("Executed when imported")
# python comment
// scala comment
a=5
In Python, unlike statically typed languages like C or Java, there is no need to explicitly declare
the data type of a variable. In a dynamically typed language like Python, the interpreter
infers the datatype at runtime.
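A quick illustration of dynamic typing (plain Python, nothing Spark-specific): the same name can be rebound to values of different types, and type() shows what the interpreter inferred.
a = 5
print(type(a))     # <class 'int'>
a = "hello"
print(type(a))     # <class 'str'>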
Named Function
================
def sum(a,b):
    return a+b
total = sum(3,4)
print(total)
Anonymous functions are referred to as lambda functions in python.
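For example, the named sum function above can be written as an anonymous lambda (a small sketch):
sum_lambda = lambda a, b: a + b
print(sum_lambda(3, 4))    # 7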
Few more differences between scala & python
=============================================
1. Case
var totalCount = 10
//camel case in scala
total_count
# snake case in python
2. in scala single line comment is done using //
and for multiline we use
/*
*/
but in python we use # to comment
# this is a comment
3. in scala we were using foreach
but python lists do not have a foreach method; a for loop is used instead.
in scala
import org.apache.spark.SparkContext
object First extends App {
  val arr = Array(1,2,3,4,5)
  arr.foreach(println)
}
in python
arr = [1,2,3,4,5]
for a in arr:
    print(a)
4. scala
(x => x.split(" ")) //anonymous function
python
(lambda x : x.split(" "))  # lambda function
word count problem that we wrote in scala
==========================================
import org.apache.spark.SparkContext
object First extends App {
  //Common lines
  val sc = new SparkContext("local[*]","wordcount")
  val input = sc.textFile("/Users/trendytech/Desktop/data/input/file.txt")
  //one input row will give multiple output rows
  val words = input.flatMap(x => x.split(" "))
  //one input row will give one output row only
  val wordCounts = words.map(x => (x, 1))
  //take two rows, do aggregation and return one row
  val finalCount = wordCounts.reduceByKey((x,y) => x+y)
  //action
  finalCount.collect.foreach(println)
}
Pyspark code
==============
from pyspark import SparkContext
# common lines
sc = SparkContext("local[*]", "wordcount")
input = sc.textFile("/Users/trendytech/Desktop/data/input/file.txt")
# one input row will give multiple output rows
words = input.flatMap(lambda x: x.split(" "))
# one input row will be giving one output row
word_counts= words.map(lambda x: (x, 1))
final_count = word_counts.reduceByKey(lambda x, y: x + y)
result = final_count.collect()
for a in result:
    print(a)
=================
1. Change the logging level
sc.setLogLevel("ERROR")
2. __name__
__main__
3. Holding the program
from sys import stdin
stdin.readline()
4. DAG
localhost:4040
pyspark goes through an api library,
while scala connects to spark core directly.
hence the scala DAG matches our code, but the pyspark DAG does not.
from pyspark import SparkContext
from sys import stdin
if __name__ == "__main__":
    # common lines
    sc = SparkContext("local[*]", "wordcount")
    # sc.setLogLevel("ERROR")
    input = sc.textFile("/Users/trendytech/Desktop/data/search_data.txt")
    # one input row will give multiple output rows
    words = input.flatMap(lambda x: x.split(" "))
    # one input row will give one output row
    word_counts = words.map(lambda x: (x, 1))
    final_count = word_counts.reduceByKey(lambda x, y: x + y)
    result = final_count.collect()
    for a in result:
        print(a)
else:
    print("Not executed directly")
stdin.readline()
========
1. Lowercase
word_counts= words.map(lambda x: (x.lower(), 1))
2. countByValue
from pyspark import SparkContext
from sys import stdin
if __name__ == "__main__":
    # common lines
    sc = SparkContext("local[*]", "wordcount")
    # sc.setLogLevel("ERROR")
    input = sc.textFile("/Users/trendytech/Desktop/data/search_data.txt")
    # one input row will give multiple output rows
    words = input.flatMap(lambda x: x.split(" "))
    # one input row will give one output row
    word_counts = words.map(lambda x: x.lower())
    final_count = word_counts.countByValue()
    print(final_count)
else:
    print("Not executed directly")
3. sortByKey
from pyspark import SparkContext
from sys import stdin
if __name__ == "__main__":
    # common lines
    sc = SparkContext("local[*]", "wordcount")
    # sc.setLogLevel("ERROR")
    input = sc.textFile("/Users/trendytech/Desktop/data/search_data.txt")
    # one input row will give multiple output rows
    words = input.flatMap(lambda x: x.split(" "))
    # one input row will give one output row
    word_counts = words.map(lambda x: (x.lower(), 1))
    final_count = word_counts.reduceByKey(lambda x, y: x + y).map(lambda x: (x[1], x[0]))
    result = final_count.sortByKey(False).map(lambda x: (x[1], x[0])).collect()
    for a in result:
        print(a)
else:
    print("Not executed directly")
4. sortBy
from pyspark import SparkContext
from sys import stdin
if __name__ == "__main__":
    # common lines
    sc = SparkContext("local[*]", "wordcount")
    # sc.setLogLevel("ERROR")
    input = sc.textFile("/Users/trendytech/Desktop/data/search_data.txt")
    # one input row will give multiple output rows
    words = input.flatMap(lambda x: x.split(" "))
    # one input row will give one output row
    word_counts = words.map(lambda x: (x.lower(), 1))
    final_count = word_counts.reduceByKey(lambda x, y: x + y)
    result = final_count.sortBy(lambda x: x[1], False).collect()
    for a in result:
        print(a)
else:
    print("Not executed directly")
stdin.readline()
1. Customers who have spent the most
=====================================
from pyspark import SparkContext
sc = SparkContext("local[*]","customer-orders")
rdd1 = sc.textFile("/Users/trendytech/Desktop/data/customer-orders.csv")
rdd2 = rdd1.map(lambda x: (x.split(",")[0],float(x.split(",")[2])))
rdd3 = rdd2.reduceByKey(lambda x,y: x+y)
rdd4 = rdd3.sortBy(lambda x: x[1],False)
result = rdd4.collect()
for a in result:
    print(a)
2. Movie Rating
================
from pyspark import SparkContext
sc = SparkContext("local[*]","movie-data")
lines = sc.textFile("/Users/trendytech/Desktop/data/movie-data.data")
ratings = lines.map(lambda x: (x.split("\t")[2],1))
result = ratings.reduceByKey(lambda x,y: x+y).collect()
for a in result:
    print(a)
3. Average number of Friends
=============================
def parseLine(line):
    fields = line.split(",")
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)
from pyspark import SparkContext
sc = SparkContext("local[*]","FriendsByAge")
lines = sc.textFile("/Users/trendytech/Desktop/data/friends-data.csv")
rdd = lines.map(parseLine)
# (33,385) input
#(33,(385,1)) output
#(33,(3000,5))
#in scala we used to access the elements of tuple using x._1 , x._2
#in python we access the elements of tuple using x[0],x[1]
totalsByAge = rdd.mapValues(lambda x: (x,1)).reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))
averagesByAge = totalsByAge.mapValues(lambda x:x[0]/x[1])
result = averagesByAge.collect()
Solution 1 :
val rdd1 = spark.sparkContext.textFile("/Users/trendytech/Desktop/dataset1")
rdd1.collect().foreach(println)
val rdd2 = rdd1.map(line => {
  val fields = line.split(",")
  if (fields(1).toInt > 18)
    (fields(0), fields(1), fields(2), "Y")
  else
    (fields(0), fields(1), fields(2), "N")
})
rdd2.collect().foreach(println)
Solution 2 :
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
import scala.math.min
/** Find the minimum temperature by weather station */
object MinTemperatures {
def parseLine(line:String) = {
  val fields = line.split(",")
  val stationID = fields(0)
  val entryType = fields(2)
  val temperature = fields(3)
  (stationID, entryType, temperature)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
  // Set the log level to only print errors
  Logger.getLogger("org").setLevel(Level.ERROR)
  // Create a SparkContext using every core of the local machine
  val sc = new SparkContext("local[*]", "MinTemperatures")
  // Read each line of input data
  val lines = sc.textFile("/Users/sumitm/Desktop/spark-data/temp-data.csv")
  // Convert to (stationID, entryType, temperature) tuples
  val parsedLines = lines.map(parseLine)
  // Filter out all but TMIN entries
  val minTemps = parsedLines.filter(x => x._2 == "TMIN")
  // Convert to (stationID, temperature)
  val stationTemps = minTemps.map(x => (x._1, x._3.toFloat))
  // Reduce by stationID retaining the minimum temperature found
  val minTempsByStation = stationTemps.reduceByKey((x,y) => min(x,y))
  // Collect, format, and print the results
  val results = minTempsByStation.collect()
  for (result <- results.sorted) {
    val station = result._1
    val temp = result._2
    val formattedTemp = f"$temp%.2f F"
    println(s"$station minimum temperature: $formattedTemp")
  }
}
}
Apache Spark Assignment
Finding Out Popular Courses for a Online Training
Company
Problem Statement:
We have been approached by an Online Training Company which
provides training videos. The company provides different online
courses to its users, and each course comprises multiple
chapters. Courses have been assigned courseIds and chapters in a
course have chapterIds. Note that chapterIds are unique: no two
courses can have the same chapterId. Each course also has a title.
Also, Users viewing the courses are given UserIds.
The company wants us to prepare an Apache Spark based report
which will enable them to figure out which are the most popular
courses on their training portal.
The popularity of a course will depend on the total score it
receives. The score a course gets is based on a few
business rules which are explained later in the document in
detail. Please refer to the Business Rules** section. To give a brief
overview, if a course has 20 chapters and a user views 18
of them, then the user has watched 90% of the course
and the course will get a good score of, say, 10. Each
course can be watched by multiple users, and scores will
accumulate for that course.
Final Ranking Chart Report will look like below, Starting with
most popular course on the top.
Score Title
X1 Y1
X2 Y2
.… .…
.… .…
Dataset Details:
Dataset1:
We have three csv files views-1.csv, views-2.csv and views-3.csv
Also we have a viewsHeader.csv file which has the header information:
userId, chapterId, dateAndTime
This will be a log of viewings, showing which user has viewed which
chapter. These chapterIds belong to courses. Also chapterIds are
unique.
Here is the sample raw data:
But as we can see from this raw data, there is no reference to courses in
this raw data.
Dataset 2: So we have a much smaller dataset, chapters.csv which is a
mapping of chapterIds to courseId.
Sample Raw Data:
Dataset 3:
There is another dump file (titles.csv) containing courseIds against their
titles - we will use this at the end to print the results, so that we also get
the title of each course when we get the popular courses result in the
end.
Note: For every exercise, the expected output of the sample raw data is
provided. But this is not the actual output; it has been provided for
understanding purposes only.
You will work on actual data and output will be based on the csv files
provided to you.
Solve the following Exercises:
Exercise 1:
Find Out how many Chapters are there per course.
Expected Output of Sample Raw Data:
Exercise 2: Your job is to produce a ranking chart detailing
which are the most popular courses by score.
**Business Rules:
The score which each courseId gets will be based on a few rules.
For example, if a user sticks through most of the course, that's more
deserving of "points" than if someone bails out just a quarter of the
way through the course. The following scoring system
is used:
If a user watches more than 90% of the course, the course
gets 10 points
If a user watches > 50% but < 90%, it scores 4
If a user watches > 25% but < 50%, it scores 2
Less than 25% is no score
Expected Output of Sample Raw Data:
Exercise 3: As a final touch, get the titles using a final (small
data) join. You can then sort by total score (descending) to get
the results in a pleasant format with most popular score on top.
**Hints
Exercise 1:
Build an RDD containing a key of courseId together with the
number of chapters on that course. You can use a map
transformation here.
courseId count
1 1
1 1
2 1
3 1
3 1
Exercise 2:
Step 1: Removing Duplicate Views.
The same user watching the same chapter doesn't count, so we
can call distinct to remove duplications.
Step 2: Joining to get Course Id in the RDD
This isn't very meaningful as it is - we know for example that
user 14 has watched 96 and 97, but are they from the same
course, or are they different courses? We need to join the
data together. As the common column in both RDDs is the
chapterId, we need both RDDs to be keyed on that.
You need to do a map to switch the key and the value. Now
that you have two RDDs, each keyed on chapterId, you can join
them together.
And now after joining you will have:
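A rough pyspark sketch of this keying-and-joining step (the sample pairs below are illustrative only; in the assignment they come from the views and chapters csv files):
from pyspark import SparkContext
sc = SparkContext("local[*]", "join-sketch")
views_rdd = sc.parallelize([(14, 96), (14, 97), (13, 96)])    # (userId, chapterId) sample pairs
chapters_rdd = sc.parallelize([(96, 1), (97, 1), (98, 2)])    # (chapterId, courseId) sample pairs
views_by_chapter = views_rdd.map(lambda x: (x[1], x[0]))      # switch key and value -> (chapterId, userId)
joined = views_by_chapter.join(chapters_rdd)                  # (chapterId, (userId, courseId))
print(joined.collect())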
Step 3: Drop the Chapter Id
As each "row" in the RDD now represents "a chapter from this
course has been viewed", the chapterIds are no longer relevant.
You can get rid of them at this point - this will avoid dealing
with tricky nested tuples later on. At the same time, given
we're counting how many chapters of a course each user
watched, we will be counting shortly, so we can drop a "1" in
for each user/course combination.
Expected RDD after this step:
Step 4 - Count Views for User/Course
We can now run a reduce as usual..
We can read these results as "user 14 watched 2 chapters of course 1;
user 13 watched 1 chapter of course 1; user 14 watched 2
chapters of course 2, etc".
Step 5 - Drop the userId
The user id is no longer relevant. We've needed it up until now
to ensure that for example, 10 users watching 1 chapter
doesn't count the same as 1 user watching 10 chapters!
Another map transformation should get you...
To rephrase the previous step, we now read this as "someone
watched 2 chapters of course 1. Somebody different watched 1
chapter of course 1; someone watched 1 chapter of course 2,
etc".
Step 6: of how many chapters?
The scoring is based on what percentage of the total course
they watched. So we will need to get the total number of
chapters for each course into our RDD:
"Someone watched 2 chapters of 3. Somebody different
watched 1 chapter of 3".
Step 7: Convert to percentages
This should be an easy mapping.
Step 8: Convert to scores
As described earlier, percentages are converted to scores:
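One possible reading of those rules as a small pyspark sketch (assuming the RDD at this point holds (courseId, fractionWatched) pairs with the fraction between 0 and 1; names and sample data are illustrative):
from pyspark import SparkContext
sc = SparkContext("local[*]", "score-sketch")

def to_score(fraction):
    # >90% -> 10, >50% -> 4, >25% -> 2, otherwise no score
    if fraction > 0.9:
        return 10
    elif fraction > 0.5:
        return 4
    elif fraction > 0.25:
        return 2
    else:
        return 0

percentages_rdd = sc.parallelize([(1, 0.67), (1, 0.33), (2, 1.0)])   # illustrative (courseId, fraction) pairs
print(percentages_rdd.mapValues(to_score).collect())                 # [(1, 4), (1, 2), (2, 10)]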
Step 9: Add up the total scores
Exercise 3:
You have to associate titles with the course. Use an inner join. Then,
as a finishing touch, you will not require courseIds any more
as the titles are available. You can sort the data by most popular
course to display the data in the format - Score, Title.
***********************************************************************************
Pyspark week-10
================
BigData campaign data (the example done earlier in spark with scala), now in pyspark
from pyspark import SparkContext
sc = SparkContext("local[*]","KeywordAmount")
initial_rdd = sc.textFile("/Users/trendytech/Desktop/data/bigdata-campaign-data.csv")
mapped_input = initial_rdd.map(lambda x: (float(x.split(",")[10]),x.split(",")[0]))
words = mapped_input.flatMapValues(lambda x: x.split(" "))
final_mapped = words.map(lambda x: (x[1].lower(),x[0]))
total = final_mapped.reduceByKey(lambda x,y: x+y)
sorted = total.sortBy(lambda x: x[1],False)
result = sorted.take(20)
for x in result:
    print(x)
=========
from pyspark import SparkContext
def loadBoringWords():
    boring_words = set(line.strip() for line in open("/Users/trendytech/Desktop/data/boringwords.txt"))
    return boring_words
sc = SparkContext("local[*]","KeywordAmount")
name_set = sc.broadcast(loadBoringWords())
initial_rdd = sc.textFile("/Users/trendytech/Desktop/data/bigdata-campaign-data.csv")
mapped_input = initial_rdd.map(lambda x: (float(x.split(",")[10]),x.split(",")[0]))
words = mapped_input.flatMapValues(lambda x: x.split(" "))
final_mapped = words.map(lambda x: (x[1].lower(),x[0]))
filtered_rdd = final_mapped.filter(lambda x: x[0] not in name_set.value)
total = filtered_rdd.reduceByKey(lambda x,y: x+y)
sorted = total.sortBy(lambda x: x[1],False)
result = sorted.take(20)
for x in result:
    print(x)
Accumulator example
=====================
from pyspark import SparkContext
def blankLineChecker(line):
    if(len(line) == 0):
        myaccum.add(1)
sc = SparkContext("local[*]","AccumulatorExample")
myrdd = sc.textFile("/Users/trendytech/Desktop/data/samplefile.txt")
myaccum = sc.accumulator(0.0)
myrdd.foreach(blankLineChecker)
print(myaccum.value)
=======
you can use foreach on an RDD, but not on a local variable such as a Python list.
a = rdd.collect() gives you such a local list on the driver.
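A small sketch of the difference (local mode, names are illustrative):
from pyspark import SparkContext
sc = SparkContext("local[*]", "collect-sketch")
rdd = sc.parallelize([1, 2, 3])
rdd.foreach(print)      # foreach works on the rdd itself
a = rdd.collect()       # collect() returns a normal python list on the driver
for x in a:             # a python list has no foreach, so we use a for loop
    print(x)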
======
from pyspark import SparkContext
sc = SparkContext("local[*]", "logLevelCount")
sc.setLogLevel("INFO")
if __name__ == "__main__":
    my_list = ["WARN: Tuesday 4 September 0405",
               "ERROR: Tuesday 4 September 0408",
               "ERROR: Tuesday 4 September 0408",
               "ERROR: Tuesday 4 September 0408",
               "ERROR: Tuesday 4 September 0408",
               "ERROR: Tuesday 4 September 0408"]
    original_logs_rdd = sc.parallelize(my_list)
else:
    original_logs_rdd = sc.textFile("/Users/trendytech/Desktop/data/logsample.txt")
    print("inside the else part")
new_pair_rdd = original_logs_rdd.map(lambda x:(x.split(":")[0],1))
resultant_rdd = new_pair_rdd.reduceByKey(lambda x,y: x+y)
result = resultant_rdd.collect()
for x in result:
    print(x)
=========
bigLog.txt 10 million log level entries
groupByKey
reduceByKey
from pyspark import SparkContext
# Set the log level to only print errors
sc = SparkContext("local[*]", "LogLevelCount")
sc.setLogLevel("INFO")
# Create a SparkContext using every core of the local machine
base_rdd = sc.textFile("/Users/trendytech/Desktop/data/bigLog.txt")
mapped_rdd = base_rdd.map(lambda x: (x.split(":")[0], x.split(":")[1]))
grouped_rdd = mapped_rdd.groupByKey()
final_rdd = grouped_rdd.map(lambda x: (x[0], len(x[1])))
result = final_rdd.collect()
for x in result:
    print(x)
============
from pyspark import SparkContext
sc = SparkContext("local[*]", "LogLevelCount")
sc.setLogLevel("INFO")
base_rdd = sc.textFile("/Users/trendytech/Desktop/data/bigLog.txt")
mapped_rdd = base_rdd.map(lambda x: (x.split(":")[0], 1))
reduced_rdd = mapped_rdd.reduceByKey(lambda x,y: x+y)
result = reduced_rdd.collect()
for x in result:
    print(x)
==========
Miscellaneous things
=====================
1)
scala
=======
val a = 1 to 100
val base = sc.parallelize(a)
base.reduce((x,y) => x+y)
pyspark
=========
a = range(1,101)
base = sc.parallelize(a)
base.reduce(lambda x,y: x+y)
2)
input = sc.textFile("/Users/trendytech/Desktop/data/customer-orders.csv")
input.saveAsTextFile("/Users/trendytech/Desktop/data/output10")
3. Count - this is an action and works the same way as we saw in scala codes.
4. sc.defaultParallelism
5. get the num of partitions in an rdd
rdd.getNumPartitions()
6. my_list = ("WARN: Tuesday 4 September 0405",
"ERROR: Tuesday 4 September 0408",
"ERROR: Tuesday 4 September 0408",
"ERROR: Tuesday 4 September 0408",
"ERROR: Tuesday 4 September 0408",
"ERROR: Tuesday 4 September 0408")
original_logs_rdd = sc.parallelize(my_list)
original_logs_rdd.getNumPartitions()
7) sc.defaultMinPartitions - 2
8) repartition
9) coalesce
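A minimal sketch of items 8 and 9 (illustrative values): repartition does a full shuffle, while coalesce is the cheaper way to reduce the partition count.
from pyspark import SparkContext
sc = SparkContext("local[*]", "partition-sketch")
rdd = sc.parallelize(range(1, 101), 8)     # start with 8 partitions
print(rdd.getNumPartitions())              # 8
more = rdd.repartition(16)                 # full shuffle; can increase or decrease the count
fewer = rdd.coalesce(4)                    # avoids a full shuffle; used to decrease the count
print(more.getNumPartitions(), fewer.getNumPartitions())   # 16 4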
Solution 1:
//Assignment - Problem 1
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
object Spark_Assignment_windowdata extends App {
  //creating sparkConf object
  val sparkConf = new SparkConf()
  sparkConf.set("spark.app.name", "Assignment windowdata")
  sparkConf.set("spark.master", "local[2]")
  //Step 1 - creating a spark session
  val spark = SparkSession.builder()
    .config(sparkConf)
    .getOrCreate()
  //Step 2 - Setting the logging level to Error
  Logger.getLogger("org").setLevel(Level.ERROR)
  // Step 3 - Explicit schema definition programmatically using StructType
  val windowdataSchema = StructType(List(
    StructField("Country", StringType),
    StructField("weeknum", IntegerType),
    StructField("numinvoices", IntegerType),
    StructField("totalquantity", IntegerType),
    StructField("invoicevalue", DoubleType)
  ))
  // Step 3 contd.. Loading the file and creating the dataframe using the dataframe reader API,
  // using the explicitly specified schema
  val windowdataDF = spark.read
    .format("csv")
    .schema(windowdataSchema)
    .option("path", "C:/xyz/TrendyTech/Spark_data/structuredAPI/windowdata.csv")
    .load()
  //print first 20 records of the dataframe
  windowdataDF.show()
  //Step 4: Saving the data in Parquet format using the Dataframe Writer API
  //Data is two-level partitioned on the Country and weeknum columns; these columns have low cardinality
  //Default output format is parquet
  /* windowdataDF.write
    .partitionBy("Country", "weeknum")
    .mode(SaveMode.Overwrite)
    .option("path", "C:/xyz/TrendyTech/Spark_data/structuredAPI/Output/windowdata_output")
    .save()
  */
  //Step 5: Save the Dataframe to Avro format, partitioning the data by the Country column
  windowdataDF.write
    .format("avro")
    .partitionBy("Country")
    .mode(SaveMode.Overwrite)
    .option("path", "C:/xyz/TrendyTech/Spark_data/structuredAPI/Output/windowdata_avrooutput")
    .save()
}
Solution 2:
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
object WEEK11_SOLUTION_2_WINDOWDATA extends App {
// Setting the Logging~Level To ERROR
Logger.getLogger("org").setLevel(Level.ERROR)
// define a schema for employee record data using a case class
case class windowData(Country: String, Weeknum: Int, NumInvoices: Int, TotalQuantity: Int,
InvoiceValue: String)
//Create Spark Config Object
val sparkConf = new SparkConf()
sparkConf.set("spark.app.name", "WEEK11_SOLUTION_2_WINDOWDATA")
sparkConf.set("spark.master", "local[2]")
//Create Spark Session
val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
import spark.implicits._
val windowDataDF = spark.sparkContext.textFile("G:/TRENDY~TECH/WEEK-11/Assignment_Dataset/windowdata-201021-002706.csv") //.toDF
.map(_.split(","))
.map(e => windowData(e(0), e(1).trim.toInt, e(2).trim.toInt, e(3).trim.toInt, e(4)))
.toDF()
.repartition(8)
windowDataDF.write
.format("json")
.mode(SaveMode.Overwrite)
.option("path", "G:/TRENDY~TECH/WEEK-
11/Assignment_Dataset/OutPut_Prb2/windowData_jsonoutput")
.save()
//windowDataDF.show()
spark.stop()
scala.io.StdIn.readLine()
}
Pyspark week 11
================
1. cache & persist
2. spark-submit
from pyspark import SparkContext, StorageLevel
from sys import stdin
sc = SparkContext("local[*]","PremiumCustomers")
base_rdd = sc.textFile("/Users/trendytech/Desktop/data/customer-orders.csv")
mapped_input = base_rdd.map(lambda x: (x.split(",")[0], float(x.split(",")[2])))
total_by_customer = mapped_input.reduceByKey(lambda x,y: x+y)
premium_customers = total_by_customer.filter(lambda x: x[1] > 5000)
doubled_amount = premium_customers.map(lambda x: (x[0], x[1]*2)).persist(StorageLevel.MEMORY_ONLY)
result = doubled_amount.collect()
for x in result:
    print(x)
print(doubled_amount.count())
stdin.readline()
spark-submit /Users/trendytech/PycharmProjects/pysparklearning/module1.py
Ratings.dat
Movies.dat
find the top rated movies
1. at least 1000 people have rated
2. rating should be > 4.5
finding top movies (scala code)
==================================
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkContext
object JoinDemo extends App {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]","joindemo")
val ratingsRdd= sc.textFile("/Users/trendytech/Desktop/data/ratings.dat")
val mappedRdd = ratingsRdd.map(x => {
val fields = x.split("::")
(fields(1),fields(2))
})
val newmappedRdd= mappedRdd.mapValues(x=>(x.toFloat,1.0))
val reduceRdd = newmappedRdd.reduceByKey((x,y) => (x._1+y._1, x._2+y._2))
val filteredRdd = reduceRdd.filter(x=>x._2._1 > 1000)
val finalRdd = filteredRdd.mapValues(x => x._1/x._2).filter(x => x._2 > 4.5)
val moviesRdd= sc.textFile("/Users/trendytech/Desktop/data/movies.dat")
val moviesmappedRdd = moviesRdd.map(x => {
val fields = x.split("::")
(fields(0),(fields(1),fields(2)))
})
val joinedRdd = moviesmappedRdd.join(finalRdd)
val topMoviesRdd = joinedRdd.map(x=>x._2._1)
topMoviesRdd.collect.foreach(println)
}
=========================
equivalent pyspark code
=========================
from pyspark import SparkContext
sc = SparkContext("local[*]","joindemo")
ratings_rdd = sc.textFile("/Users/trendytech/Desktop/data/ratings.dat")
mapped_rdd = ratings_rdd.map(lambda x: (x.split("::")[1], x.split("::")[2]))
new_mapped_rdd = mapped_rdd.mapValues(lambda x: (float(x),1.0))
reduce_rdd = new_mapped_rdd.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))
filtered_rdd = reduce_rdd.filter(lambda x: x[1][0] > 1000)
final_rdd = filtered_rdd.mapValues(lambda x: x[0]/x[1]).filter(lambda x: x[1] > 4.5)
movies_rdd= sc.textFile("/Users/trendytech/Desktop/data/movies.dat")
movies_mapped_rdd = movies_rdd.map(lambda x: (x.split("::")[0],(x.split("::")[1],x.split("::")[2])))
joined_rdd = movies_mapped_rdd.join(final_rdd)
top_movies_rdd = joined_rdd.map(lambda x: x[1][0])
result = top_movies_rdd.collect()
for x in result:
    print(x)
Structured API's
==================
DataFrame, DataSets, Spark SQL
Datasets are not supported in pyspark (only DataFrames and Spark SQL are available).
SparkSession
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.csv("/Users/trendytech/Desktop/data/orders.csv")
orderDf.show()
spark.stop()
=======
find the total orders placed by each customer where customer id > 10000
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.option("header",True).option("inferSchema",True).csv("/Users/trendytech/Desktop/data/orders.csv")
groupedDf = orderDf.repartition(4) \
.where("order_customer_id > 10000") \
.select("order_id","order_customer_id") \
.groupBy("order_customer_id") \
.count()
groupedDf.show()
===============
1. wrong column name
when we give a column name which does not exist, the error is shown at runtime and
not at compile time.
2. standard way
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
orderDf = spark.read.format("json")\
.option("path","/Users/trendytech/Desktop/data/orders.json")\
.load()
Spark StructuredAPIs -Assignment Solutions
Assignment 1 :
Given 2 Datasets employee.json and dept.json
We need to calculate the count of employees against each department. Use Structured
APIs.
Code:
//Find the count of employees against each department
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.functions._
object Assignment1_Week12 extends App{
//Setting the Log Level
Logger.getLogger("org").setLevel(Level.ERROR)
//Setting the spark conf
val sparkConf = new SparkConf()
sparkConf.set("spark.app.name","Assignment1_Week12")
sparkConf.set("spark.master","local[2]")
//Creating Spark Session
val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
//Load the department data into a Dataframe using dataframe reader API
val deptDf = spark.read
.format("json")
.option("path","C:/TrendyTech/SparkExamples/dept.json")
.load()
// deptDf.show()
// deptDf.printSchema()
//Load the employee data into a Dataframe using dataframe reader API
val employeeDf = spark.read
.format("json")
.option("path","C:/TrendyTech/SparkExamples/employee.json")
.load()
// employeeDf.show()
// employeeDf.printSchema()
//Joining of two dataframes using left outer join, with department dataframe on the left side
val joinCondition = deptDf.col("deptid") === employeeDf.col("deptid") //join condition
val joinType = "left" //joinType
val joinedDf = deptDf.join(employeeDf, joinCondition, joinType) //Joining of the two dataframes
//drop the ambiguous column deptid of the employee dataframe, from the joined Dataframe
val joinedDfNew = joinedDf.drop(employeeDf.col("deptid"))
//Use first function so as to get other columns also along with aggregated columns
joinedDfNew.groupBy("deptid").agg(count("empname").as("empcount"), first("deptName").as("deptName")).dropDuplicates("deptName").show()
spark.stop()
}
Output:
Assignment 2
Find the top movies as shown in spark practical 18 using broadcast join. Use
Dataframes or Datasets to solve it this time.
Code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.functions._
object Assignment2_Week12 extends App {
//Setting the Log Level
Logger.getLogger("org").setLevel(Level.ERROR)
//Setting the spark conf
val sparkConf = new SparkConf()
sparkConf.set("spark.app.name","Assignment2_Week12")
sparkConf.set("spark.master","local[2]")
//Creating Spark Session
val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
//Creation of a ratings dataframe using a case class approach
case class Ratings(userid:Int, movieid:Int, rating:Int, timestamp:String) //create a case class that represents the schema
//Creation of base RDD for ratings data
val ratingsRDD = spark.sparkContext.textFile("C:/TrendyTech/SparkExamples/ratings.dat") //ratings data does not have a schema, so first loading into an RDD
// map the RDD elements into instances of the case class
val caseClassSchemaRDD = ratingsRDD.map(x => x.split("::")).map(x =>
Ratings(x(0).toInt,x(1).toInt,x(2).toInt,x(3)) )
//Transform to a Dataframe:
import spark.implicits._
val ratingsDf = caseClassSchemaRDD.toDF()
// ratingsDf.show()
// ratingsDf.printSchema()
//Creation of base RDD for movies data
val moviesRDD =
spark.sparkContext.textFile("C:/TrendyTech/SparkExamples/movies.dat")
//defining the schema using case class
case class Movies(movieid:Int,moviename:String,genre:String)
val moviestransformedRDD = moviesRDD.map(line => line.split("::")).map(fields =>
Movies(fields(0).toInt,fields(1),fields(2)) )
val moviesNewDf =
moviestransformedRDD.toDF().select("movieid","moviename")
// moviesNewDf.show()
//moviesNewDf.printSchema()
val transformedmovieDf = ratingsDf.groupBy("movieid")
.agg(count("rating").as("movieViewCount"),avg("rating").as("avgMovieRating"))
.orderBy(desc("movieViewCount"))
//transformedmovieDf.show()
val popularMoviesDf = transformedmovieDf.filter("movieViewCount > 1000 AND avgMovieRating > 4.5")
// popularMoviesDf.show()
//Now we want to associate the Movie names also, so we use a broadcast join
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
val joinCondition = popularMoviesDf.col("movieid") === moviesNewDf.col("movieid") //join condition
val joinType = "inner" //type of join
val finalPopularMoviesDf = popularMoviesDf.join(broadcast(moviesNewDf), joinCondition, joinType).drop(popularMoviesDf.col("movieid")).sort(desc("avgMovieRating")) //joining the 2 dataframes using a broadcast join where the movies data is the smaller dataset
finalPopularMoviesDf.drop("movieViewCount","movieid","avgMovieRating").show(false)
spark.stop()
}
Output:
Assignment 3
File A is a text file of size 1.2 GB in HDFS at location /loc/x. It contains match by match
statistics of runs scored by all the batsman in the history of cricket.
File B is a text file of size 1.2 MB present in local dir /loc/y. It contains list of batsman
playing in cricket world cup 2019.
File A:
1 Rohit_Sharma India 200 100.2
1 Virat_Kohli India 100 98.02
1 Steven_Smith Aus 77 79.23
35 Clive_Lloyd WI 29 37.00
243 Rohit_Sharma India 23 150.00
243 Faf_du_Plesis SA 17 35.06
File B:
Rohit_Sharma India
Steven_Smith Aus
Virat_Kohli India
Find the batsman participating in 2019 who has the best average of scoring runs in his
career. Solve this using Dataframes or Datasets.
** The files are tab separated. Headers are not part of the files.
Code:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Level
import org.apache.log4j.Logger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
object Assignment3_Week12 extends App {
//Setting the Log Level
Logger.getLogger("org").setLevel(Level.ERROR)
//Setting the spark conf
val sparkConf = new SparkConf()
sparkConf.set("spark.app.name","Assignment3_Week12")
sparkConf.set("spark.master","local[2]")
//Creating Spark Session
val spark = SparkSession.builder()
.config(sparkConf)
.getOrCreate()
//Case class creation
case class BatsmenHistory(MatchNumber:Int,Batsman:String,Team:String,
RunsScored:Int,StrikeRate:Double)
//Creation of base RDD for historical data
val batsmenHistoryRDD = spark.sparkContext.textFile("C:/TrendyTech/SparkExamples/FileA_BatsmenDetails_History.txt")
val batsmenHistorySchemaRDD = batsmenHistoryRDD.map(line =>
line.split("\t")).map(fields =>
BatsmenHistory(fields(0).toInt,fields(1),fields(2),fields(3).toInt,fields(4).toDouble) )
// Dataframe creation
import spark.implicits._
val batsmenHistoryDf = batsmenHistorySchemaRDD.toDF()
//batsmenHistoryDf.show()
//batsmenHistoryDf.printSchema()
//Calculating Average runs scored by a batsman in history, with highest average at top
val batsmenBestRunsAvgHistoryDf =
batsmenHistoryDf.groupBy("Batsman").agg(avg("RunsScored").as("AverageRunsScored"))
.select("Batsman","AverageRunsScored")
//batsmenBestRunsAvgHistoryDf.sort(col("AverageRunsScored").desc).show()
//create a base RDD from input data of worldcup
val batsmenWorldCupRDD = spark.sparkContext.textFile("C:/TrendyTech/SparkExamples/FileB_BatsemenDetails_Worldcup2019.txt")
//Alternative approach: instead of using a case class (though a case class could also be used) -
//Programmatically create an explicit schema of the worldcup 2019 file:
val batsmenworldcupSchema = StructType(List(
StructField("batsman",StringType,false),
StructField("team",StringType)
))
//Convert RDD[Array(String)] to RDD[Row].
val batsmenWorldCupRowRDD = batsmenWorldCupRDD.map(line =>
line.split("\t")).map( fields => Row(fields(0),fields(1)))
//Apply the explicitly defined Struct Type schema to the RDD[Row]
val batsmenWorldCupDf = spark.createDataFrame(batsmenWorldCupRowRDD,
batsmenworldcupSchema)
batsmenWorldCupDf.show()
batsmenWorldCupDf.printSchema()
//autoBroadcast Join is turned off
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = -1")
val joinCondition = batsmenBestRunsAvgHistoryDf.col("Batsman") ===
batsmenWorldCupDf.col("batsman")
val joinType = "inner"
//Using broadcast join
val finalBestBatsmenPlayingWorldCupDf = batsmenBestRunsAvgHistoryDf.join(broadcast(batsmenWorldCupDf), joinCondition, joinType).drop(batsmenBestRunsAvgHistoryDf.col("Batsman"))
finalBestBatsmenPlayingWorldCupDf.orderBy(desc("AverageRunsScored")).show()
spark.stop()
}
Output:
+-----------------+------------+
|AverageRunsScored| batsman|
+-----------------+------------+
| 111.5|Rohit_Sharma|
| 100.0| Virat_Kohli|
| 77.0|Steven_Smith|
+-----------------+------------+
**********************************************************************
Problem 1:
Given 2 Datasets employee.json and dept.json
We need to calculate the count of employees against each department. Use
Structured API’s.
Sample output
depName,deptid,empcount
IT,11,1
HR,21,1
Marketing,31,1
Fin,41,2
Admin,51,0
Problem 2:
Find the top movies as shown in spark practical 18 using broadcast join. Use
Dataframes or Datasets to solve it this time.
Problem 3:
File A is a text file of size 1.2 GB in HDFS at location /loc/x. It contains match by
match statistics of runs scored by all the batsman in the history of cricket.
File B is a text file of size 1.2 MB present in local dir /loc/y. It contains list of
batsman playing in cricket world cup 2019.
File A:
MatchNumber Batsman Team RunsScored StrikeRate
1 Rohit Sharma India 200 100.2
1 Virat Kohli India 100 98.02
1 Steven Smith Aus 77 79.23
35 Clive Lloyd WI 29 37.00
243 Rohit Sharma India 23 150.00
243 Faf du Plesis SA 17 35.06
File B:
Batsman Team
Rohit_Sharma India
Steven_Smith Aus
Virat_Kohli India
Question: Find the batsman participating in 2019 who has the best average of
scoring runs in his career. Solve using Dataframes or Datasets.
1. reading the data - Reader API
2. crunching of data - transformations
3. write the data back - Writer API
Scala
======
orderDf.write.format("csv")
.mode(SaveMode.Overwrite)
.option("path","/Users/trendytech/Desktop/newfolder1")
.save()
pyspark
======
orderDf.write.format("csv")\
.mode("overwrite")\
.option("path","/Users/trendytech/Desktop/newfolder1")\
.save()
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
print("number of partitions are ", orderDf.rdd.getNumPartitions())
ordersRep = orderDf.repartition(4)
ordersRep.write.format("csv")\
.mode("overwrite")\
.option("path","/Users/trendytech/Desktop/newfolder1")\
.save()
====
overwrite
append
errorIfExists
ignore
=====
Parquet is the default file format in apache spark when we talk about structured api's
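A small sketch of that default, reusing the orders.csv reader from the snippets above (the output folder name is illustrative): since no .format() is given to the writer, the files come out as parquet.
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "parquet default")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .option("path","/Users/trendytech/Desktop/data/orders.csv")\
    .load()
orderDf.write\
    .mode("overwrite")\
    .option("path","/Users/trendytech/Desktop/newfolder_parquet")\
    .save()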
=====
Spark File Layout
Number of files is equal to number of partitions.
1. simple repartition - repartition
2. partitioning - partitionBy (allows partition pruning)
3. bucketBy
4. maxRecordsPerFile
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
orderDf.write.format("csv").partitionBy("order_status")\
.mode("overwrite")\
.option("path","/Users/trendytech/Desktop/newfolder4")\
.save()
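Item 4 above (maxRecordsPerFile) can be sketched with the same writer, assuming orderDf is loaded as in the block above (the record limit and output folder are illustrative): each output file is then capped at that many records.
orderDf.write.format("csv")\
    .mode("overwrite")\
    .option("maxRecordsPerFile", 2000)\
    .option("path","/Users/trendytech/Desktop/newfolder5")\
    .save()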
=======
Avro
pyspark 3.1.2 (needs the matching spark-avro jar, added via spark.jars below)
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
my_conf.set("spark.jars","/Users/trendytech/Downloads/spark-avro_2.12-3.1.2.jar")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
orderDf.write.format("avro")\
.mode("overwrite")\
.option("path","/Users/trendytech/Desktop/newfolder4")\
.save()
====
Spark SQL
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
orderDf.createOrReplaceTempView("orders")
resultDf = spark.sql("select order_status, count(*) as total_orders from orders group by
order_status")
resultDf.show()
====
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
orderDf.createOrReplaceTempView("orders")
resultDf = spark.sql("select order_customer_id, count(*) as total_orders from orders where
order_status = 'CLOSED' group by order_customer_id order by total_orders desc")
resultDf.show()
====
Table has 2 parts
1. data - stored under the warehouse directory (spark.sql.warehouse.dir)
2. metadata - stored in the catalog metastore (in-memory by default)
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
orderDf.write.format("csv")\
.mode("overwrite")\
.saveAsTable("orders1")
========
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
spark.sql("create database if not exists retail")
orderDf.write.format("csv")\
.mode("overwrite")\
.saveAsTable("retail.orders2")
==========
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).enableHiveSupport().getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
spark.sql("create database if not exists retail")
orderDf.write.format("csv")\
.mode("overwrite")\
.saveAsTable("retail.orders3")
============
from pyspark import SparkConf
from pyspark.sql import SparkSession
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).enableHiveSupport().getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
spark.sql("create database if not exists retail")
orderDf.write.format("csv")\
.mode("overwrite")\
.bucketBy(4,"order_customer_id")\
.sortBy("order_customer_id")\
.saveAsTable("retail.orders4")
============
Spark DF session 12
===================
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_extract
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
myregex = r'^(\S+) (\S+)\t(\S+)\,(\S+)'
lines_df = spark.read.text("/Users/trendytech/Desktop/data/orders_new.csv")
#lines_df.printSchema()
#lines_df.show()
final_df = lines_df.select(regexp_extract('value', myregex, 1).alias("order_id"),
                           regexp_extract('value', myregex, 2).alias("date"),
                           regexp_extract('value', myregex, 3).alias("customer_id"),
                           regexp_extract('value', myregex, 4).alias("status"))
final_df.printSchema()
final_df.show()
final_df.select("order_id").show()
final_df.groupby("status").count().show()
===============
spark df session 13
==============
Column String
Column object
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
orderDf = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/orders.csv")\
.load()
orderDf.select("order_id","order_date").show()
orderDf.select(col("order_id")).show()
==========
Spark DF Session 14
=====================
Creating our own user defined function in spark.
1. Column object expression -- the function won't be registered in catalog
2. SQL expression -- the function will be registered in catalog.
So in this case we can even use it with spark SQL.
if the age is greater than 18 we have to populate the 4th column named Adult with "Y"
else we need to populate the column with "N"
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
df = spark.read.format("csv")\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/dataset1")\
.load()
df1 = df.toDF("name","age","city")
def ageCheck(age):
    if(age > 18):
        return "Y"
    else:
        return "N"
parseAgeFunction = udf(ageCheck,StringType())
df2 = df1.withColumn("adult",parseAgeFunction("age"))
df2.printSchema()
df2.show()
===========
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
df = spark.read.format("csv")\
.option("inferSchema",True)\
.option("path","/Users/trendytech/Desktop/data/dataset1")\
.load()
df1 = df.toDF("name","age","city")
def ageCheck(age):
    if(age > 18):
        return "Y"
    else:
        return "N"
spark.udf.register("parseAgeFunction",ageCheck,StringType())
for x in spark.catalog.listFunctions():
    print(x)
df2 = df1.withColumn("adult",expr("parseAgeFunction(age)"))
df2.show()
============
Spark DF session 15
=====================
create the spark session
create a local list
create a dataframe from this local list and give column names
add a new column date1 with unix timestamp
add one more column with monotonically increasing id
drop the duplicates based on combination of 2 columns
drop the orderid column
sort based on order date
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
myList = [(1,"2013-07-25",11599,"CLOSED"),
(2,"2014-07-25",256,"PENDING_PAYMENT"),
(3,"2013-07-25",11599,"COMPLETE"),
(4,"2019-07-25",8827,"CLOSED")]
ordersDf = spark.createDataFrame(myList)\
.toDF("orderid","orderdate","customerid","status")
newDf = ordersDf\
.withColumn("date1",unix_timestamp(col("orderdate"))) \
.withColumn("newid", monotonically_increasing_id()) \
.dropDuplicates(["orderdate","customerid"])\
.drop("orderid")\
.sort("orderdate")
ordersDf.printSchema()
ordersDf.show()
newDf.show()
===========
Spark DF session 16
=====================
Aggregate transformations
1. Simple aggregations
2. Grouping aggregations
3. window aggregates
# simple aggregates
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
invoiceDF = spark.read\
    .format("csv")\
    .option("header",True)\
    .option("inferSchema",True)\
    .option("path","/Users/trendytech/Desktop/order_data.csv")\
    .load()
# column object expression
invoiceDF.select(
    count("*").alias("RowCount"),
    sum("Quantity").alias("TotalQuantity"),
    avg("UnitPrice").alias("AvgPrice"),
    countDistinct("InvoiceNo").alias("CountDistinct")).show()
# column string expression
invoiceDF.selectExpr(
    "count(*) as RowCount",
    "sum(Quantity) as TotalQuantity",
    "avg(UnitPrice) as AvgPrice",
    "count(Distinct(InvoiceNo)) as CountDistinct").show()
# spark SQL
invoiceDF.createOrReplaceTempView("sales")
spark.sql("select count(*), sum(Quantity), avg(UnitPrice), count(distinct(InvoiceNo)) from sales").show()
spark.stop()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
invoiceDF = spark.read\
.format("csv")\
.option("header",True)\
.option("inferSchema",True) \
.option("path","/Users/trendytech/Desktop/data/order_data.csv") \
.load()
invoiceDF.select(
count("*").alias("RowCount"),
sum("Quantity").alias("TotalQuantity"),
avg("UnitPrice").alias("AvgPrice"),
countDistinct("InvoiceNo").alias("CountDistinct")).show()
invoiceDF.selectExpr(
"count(*) as RowCount",
"sum(Quantity) as TotalQuantity",
"avg(UnitPrice) as AvgPrice",
"count(Distinct(InvoiceNo)) as CountDistinct").show()
invoiceDF.createOrReplaceTempView("sales")
spark.sql("select count(*),sum(Quantity),avg(UnitPrice),count(distinct(InvoiceNo)) from
sales").show()
=============
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
invoiceDF = spark.read\
.format("csv")\
.option("header",True)\
.option("inferSchema",True) \
.option("path","/Users/trendytech/Desktop/data/order_data.csv") \
.load()
#column object expression
summaryDF = invoiceDF.groupBy("Country","InvoiceNo")\
    .agg(sum("Quantity").alias("TotalQuantity"),
         sum(expr("Quantity * UnitPrice")).alias("InvoiceValue"))
summaryDF.show()
#string expression
summaryDf1 = invoiceDF.groupBy("Country","InvoiceNo")\
    .agg(expr("sum(Quantity) as TotalQuantity"),
         expr("sum(Quantity * UnitPrice) as InvoiceValue"))
summaryDf1.show()
#spark SQL
invoiceDF.createOrReplaceTempView("sales")
spark.sql("""select country,InvoiceNo,sum(Quantity) as totQty,sum(Quantity * UnitPrice) as
InvoiceValue from sales group by country,InvoiceNo""").show()
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master","local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
invoiceDF = spark.read\
.format("csv")\
.option("header",True)\
.option("inferSchema",True) \
.option("path","/Users/trendytech/Desktop/data/order_data.csv") \
.load()
#column object expression
summaryDF = invoiceDF.groupBy("Country","InvoiceNo")\
.agg(sum("Quantity").alias("TotalQuantity"),
sum(expr("Quantity * UnitPrice")).alias("InvoiceValue"))
summaryDF.show()
#string expression
summaryDf1 = invoiceDF.groupBy("Country","InvoiceNo")\
.agg(expr("sum(Quantity) as TotalQunatity"),
expr("sum(Quantity * UnitPrice) as InvoiceValue"))
summaryDf1.show()
#spark SQL
invoiceDF.createOrReplaceTempView("sales")
spark.sql("""select country,InvoiceNo,sum(Quantity) as totQty,sum(Quantity * UnitPrice) as
InvoiceValue from sales group by country,InvoiceNo""").show()
==============
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
my_conf = SparkConf()
my_conf.set("spark.app.name", "my first application")
my_conf.set("spark.master", "local[*]")
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()
invoiceDF = spark.read \
.format("csv") \
.option("header", True) \
.option("inferSchema", True) \
.option("path", "/Users/trendytech/Desktop/data/windowdata.csv") \
.load()
myWindow = Window.partitionBy("country")\
.orderBy("weeknum")\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
mydf = invoiceDF.withColumn("RunningTotal",sum("invoicevalue").over(myWindow))
mydf.show()
==========
spark2-shell --master yarn
rdd1 = sc.textFile("bigLogNew.txt")
rdd2 = rdd1.map(lambda x: (x.split(":")[0], x.split(":")[1]))
rdd3 = rdd2.groupByKey()
rdd4 = rdd3.map(lambda x: (x[0], len(x[1])))
rdd4.collect()
=========
import random
def randomGenerator():
    return random.randint(1,60)

def myFunction(x):
    if(x[0][0:4]=="WARN"):
        return ("WARN", x[1])
    else:
        return ("ERROR", x[1])
rdd1 = sc.textFile("bigLogNew.txt")
rdd2 = rdd1.map(lambda x:(x.split(":")[0] + str(randomGenerator()),x.split(":")[1]))
rdd3 = rdd2.groupByKey()
rdd4 = rdd3.map(lambda x: (x[0] , len(x[1])))
rdd4.cache()
rdd5 = rdd4.map(lambda x: myFunction(x))
rdd6 = rdd5.reduceByKey(lambda x,y: x+y)
rdd6.collect()
Week14: Apache Spark - Optimization Part2
Spark Optimization Session - 12
================================
Broadcast join can be used when we have 1 large table and 1 small table.
and we want to join these.
RDD (lower level API's)
=========================
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 6
--executor-cores 2 --executor-memory 3G --conf spark.ui.port=4063
we will have 2 rdd's
one of them will be large
one of them will be smaller.
ERROR: Thu Jun 04 10:37:51 BST 2015
WARN: Sun Nov 06 10:37:51 GMT 2016
WARN: Mon Aug 29 10:37:51 BST 2016
ERROR: Thu Dec 10 10:37:51 GMT 2015
ERROR: Fri Dec 26 10:37:51 GMT 2014
ERROR: Thu Feb 02 10:37:51 GMT 2017
WARN: Fri Oct 17 10:37:51 BST 2014
ERROR: Wed Jul 01 10:37:51 BST 2015
WARN: Thu Jul 27 10:37:51 BST 2017
WARN: Thu Oct 19 10:37:51 BST 2017
the size of this data is 1.4 GB (Large dataset)
val rdd1 = sc.textFile("bigLogNew.txt")
Input:
ERROR: Thu Jun 04 10:37:51 BST 2015
WARN: Sun Nov 06 10:37:51 GMT 2016
WARN: Mon Aug 29 10:37:51 BST 2016
ERROR: Thu Dec 10 10:37:51 GMT 2015
ERROR: Fri Dec 26 10:37:51 GMT 2014
ERROR: Thu Feb 02 10:37:51 GMT 2017
WARN: Fri Oct 17 10:37:51 BST 2014
ERROR: Wed Jul 01 10:37:51 BST 2015
WARN: Thu Jul 27 10:37:51 BST 2017
WARN: Thu Oct 19 10:37:51 BST 2017
output:
(ERROR,Thu Jun 04 10:37:51 BST 2015)
(WARN, Sun Nov 06 10:37:51 GMT 2016)
MAP TRANSFORMATION
val rdd2 = rdd1.map(x => (x.split(":")(0), x.split(":")(1)))
rdd2 will have something like the below:
RDD2 is Large
(ERROR,Thu Jun 04 10:37:51 BST 2015)
(WARN, Sun Nov 06 10:37:51 GMT 2016)
RDD3 IS SMALL
("ERROR",0)
("WARN",1)
val rdd4 = rdd2.join(rdd3)
(ERROR,(Thu Jun 04 10:37:51 BST 2015,0))
(WARN, (Sun Nov 06 10:37:51 GMT 2016,1))
val a = Array(
("ERROR",0),
("WARN",1)
)
val rdd3 = sc.parallelize(a)
val a = Array(
("ERROR",0),
("WARN",1)
)
val keyMap = a.toMap
val bcast = sc.broadcast(keyMap)
val rdd1 = sc.textFile("bigLogNew.txt")
val rdd2 = rdd1.map(x => (x.split(":")(0),x.split(":")(1)))
val rdd3 = rdd2.map(x => (x._1,x._2,bcast.value(x._1)))
rdd3.saveAsTextFile("joinresults2")
Dataframes (Structured API's)
Spark Optimization Session - 13
================================
we will have 2 dataframes.
one of them will be large and other will be small.
orders is 2.6 GB
customers is around 900 kb
customers - customer_id (small)
orders - order_customer_id (large)
we will try to create dataframe for each of these.
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 21
val customerDF = spark.read.format("csv").option("header",true).option("inferSchema",true).option("path","customers.csv").load
val orderDF = spark.read.format("csv").option("header",true).option("inferSchema",true).option("path","orders.csv").load
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
val joinedDF = customerDF.join(orderDF,customerDF("customer_id") ===
orderDF("order_customer_id"))
how many partitions are there in this big DF - 21
and for the small dataframe we have just a single partition.
whenever we do shuffling in case of structured API's we get 200 partitions by default.
if I do a groupBy on a Dataframe then we will get 200 partitions after the shuffling is done.
Contrast with an RDD: for a 500 mb file, before a groupBy we had 4 partitions,
and after the groupBy the number of partitions remains the same, that means 4 partitions.
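A quick way to see and change that DataFrame default is the spark.sql.shuffle.partitions setting. The snippet below is a hedged sketch, assuming an existing SparkSession named spark and SparkContext named sc, and a made-up file name:

# Sketch (assumed `spark` session): the shuffle-partition default that
# controls how many partitions a DataFrame groupBy/join produces.
print(spark.conf.get("spark.sql.shuffle.partitions"))   # typically "200"

# Lowering it can help when the data or cluster is small.
spark.conf.set("spark.sql.shuffle.partitions", 50)

# RDD-side contrast: groupByKey normally keeps the parent's partition count
# (unless spark.default.parallelism or numPartitions is given).
rdd = sc.textFile("somefile.txt")                     # hypothetical path, e.g. 4 partitions
grouped = rdd.map(lambda x: (x, 1)).groupByKey()
print(grouped.getNumPartitions())                     # still 4, not 200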
Spark Optimization Session - 14
================================
2 things:
1. let's not infer the schema, to save time.
2. use a broadcast join (a PySpark broadcast-hint sketch follows below).
import org.apache.spark.sql.types._
val ordersSchema = StructType(
  List(
    StructField("order_id",IntegerType,true),
    StructField("order_date",TimestampType,true),
    StructField("order_customer_id",IntegerType,true),
    StructField("order_status",StringType,true)
  )
)
val customerDF = spark.read.format("csv").option("header",true).option("inferSchema",true).option("path","customers.csv").load
val orderDF = spark.read.format("csv").schema(ordersSchema).option("header",true).option("path","orders.csv").load
val joinedDF = customerDF.join(orderDF,customerDF("customer_id") === orderDF("order_customer_id"))
joinedDF.take(1000000)
increase the --driver-memory when you are collecting more data on driver machine.
otherwise you will get out of memory error.
--num-executors
--driver-memory
--executor-memory
--executor-cores
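On the DataFrame side, the broadcast can also be requested explicitly with a hint. This is a hedged sketch (not the exact course code), reusing the customerDF/orderDF defined above:

from pyspark.sql.functions import broadcast

# Explicitly mark the small side so Spark ships it to every executor
# instead of shuffling both sides of the join.
joinedDF = orderDF.join(broadcast(customerDF),
                        customerDF["customer_id"] == orderDF["order_customer_id"])
joinedDF.explain()   # the physical plan should show a BroadcastHashJoin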
===============================
Spark Optimization Session - 15
===============================
1. demo on repartition vs coalesce when we want to decrease the number of partitions - use coalesce.
2. using spark-submit we will try to submit a job.
while submitting the jar we will see two deploy modes:
a) client
b) cluster
you have a 1 gb file
and you create a rdd on top of it.
8 partitions..
a cluster of 4 nodes..
on each machine there might be 2 partitions..
rdd.coalesce(4)
when we are using coalesce then the resultant partitions can be of unequal sizes..
when we are using repartition then full shuffling is involved which is time consuming but we
know that the resultant partitions will be of similar size..
1. write your code in IDE like eclipse/intelliJ
2. bundle your code as a java jar file and export the jar.
3. move this jar file to your edge node/gateway machine.
scp wordcount.jar bigdatabysumit@gw02.itversity.com:/home/bigdatabysumit
wordcount.jar
I want to run LogLevelGrouping
bigLogNew.txt - 1.4 GB (11 partitions)
spark2-submit \
--class LogLevelGrouping \
--master yarn \
--deploy-mode cluster \
--executor-memory 3G \
--num-executors 4 \
wordcount.jar bigLogNew.txt
since the deploy mode is cluster mode that means our driver is running on one of the executors
residing in the cluster.
driver on wn02.itversity.com:37973 (cluster mode) => recommended
driver on gw02.itversity.com:52754 (client mode)
in cluster mode the driver is running on one of the worker nodes.
spark2-submit \
--class LogLevelGrouping \
--master yarn \
--executor-memory 3G \
--num-executors 4 \
wordcount.jar bigLogNew.txt
Spark Optimization Session - 16
===============================
Join optimizations
==================
there are 2 things we work toward:
1. we always want to avoid or minimize the shuffling.
2. we want to increase the parallelism.
1 large table and 1 small table - broadcast join
no shuffling required.
2 large tables - there is no way to completely avoid the shuffling.
but we can think of minimizing it.
To make sure we shuffle less data
===================================
try to filter the data as early as possible, before the shuffle: cut down the size before the shuffle
phase, and do aggregation early as well.
filter & aggregation should be done before the shuffle (see the sketch below).
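A minimal illustration of "filter before the shuffle", as a hedged sketch. It assumes an orders DataFrame with the order_status and order_customer_id columns used elsewhere in these notes:

# Narrow filter first: only COMPLETE orders survive, so far less data
# crosses the network during the wide (shuffle-producing) groupBy.
completed = orderDF.filter(orderDF["order_status"] == "COMPLETE")
perCustomer = completed.groupBy("order_customer_id").count()
perCustomer.show()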
Increase the Parallelism
=========================
100 node cluster each worker node having 64 GB RAM & 16 CPU Cores.
1. how many cpu cores we are grabbing.
50 executors with 5 cores each - 250 total cpu cores (at the max 250 tasks)
2. whenever shuffling happens in case of structured API's
we get 200 partitions after the shuffle.
spark.sql.shuffle.partitions
200 partitions - (at the max 200 tasks running in parallel)
3. in your data you have 100 distinct keys..
after the shuffle, at max 100 partitions will be full and the others will be empty.
whenever the cardinality of data is low then some of the partitions will be empty.
(at the max 100 tasks will be running in parallel)
we grabbed 250 cpu cores (50 executors X 5)
shuffle partitions - 200
we have just 100 distinct keys..
min(Total CPU Cores , Number of Shuffle partitions, Number of Distinct Keys)
min(250, 200, 100)
100
you can try increasing the cardinality - salting.
we can increase the Number of shuffle partitions if required.
we can try grabbing more resources if feasible
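Salting itself is not shown in the course code; the following is a hedged sketch of the idea (the RDD/DataFrame names, the key column and the salt range of 10 are made up):

import random
from pyspark.sql import functions as F

# RDD-style salting: append a random suffix to the key so a hot key's rows
# spread across several partitions, aggregate, then strip the salt and
# aggregate once more.
salted = rdd.map(lambda kv: (kv[0] + "_" + str(random.randint(0, 9)), kv[1]))
partial = salted.reduceByKey(lambda x, y: x + y)
final = partial.map(lambda kv: (kv[0].rsplit("_", 1)[0], kv[1])) \
               .reduceByKey(lambda x, y: x + y)

# DataFrame-style: add a salt column and include it in the first groupBy.
saltedDF = df.withColumn("salt", (F.rand() * 10).cast("int"))
partialDF = saltedDF.groupBy("key", "salt").count()
finalDF = partialDF.groupBy("key").sum("count")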
Skew Partitions
================
customers table and orders table..
customers table - customer_id
orders table - order_customer_id
one of the Customer (APPLE) has placed a lot of orders..
10000000 - 10 million total orders
9000000 - 9 million orders are just for one customer(APPLE)
same key always goes to same partition..
200 shuffle partitions..
1 of the partition will be holding majority of data..
whichever task is working on this heavily loaded partition will be very slow.
and other tasks will complete quickly..
your job is dependent on the slowest performing task..
there should not be partition skew, else the job will be delayed.
orders - order_customer_id
customers - customer_id
bucketing & sorting the datasets on the join column
orders into 32 buckets - 32 files
customers into 32 buckets - 32 files
15 minutes to get the bucketing and sorting done.
when you try to join these 2 tables it will be very quick.
SMB join vs a plain (shuffle) join
if you are not doing bucketing and sorting and you are doing a plain join:
30 minutes per join * 1000 runs = 30000 minutes
if you bucket both the tables on the join column and sort them:
60 minutes to do the bucketing and sorting once, then each join takes about 5 minutes:
5 minutes * 1000 runs = 5000 minutes
this is the SMB (sort merge bucket) join.
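A hedged sketch of what the one-time bucketing and sorting step might look like in PySpark. The DataFrame names, the bucket count of 32 and the table names follow the example above but are otherwise assumptions; this writes managed tables via saveAsTable:

from pyspark.sql import functions as F

# One-time cost: write both tables bucketed and sorted on the join key.
ordersDF.write.bucketBy(32, "order_customer_id") \
    .sortBy("order_customer_id") \
    .mode("overwrite") \
    .saveAsTable("orders_bucketed")

customersDF.write.bucketBy(32, "customer_id") \
    .sortBy("customer_id") \
    .mode("overwrite") \
    .saveAsTable("customers_bucketed")

# Later joins between the bucketed tables can skip the expensive shuffle/sort.
joined = spark.table("orders_bucketed").join(
    spark.table("customers_bucketed"),
    F.col("order_customer_id") == F.col("customer_id"))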
Connecting to External Data Source
===================================
in mysql we will have a table and we will try to create a dataframe by directly connecting from
that.
mysql-connector-java.jar
spark-shell --driver-class-path /usr/share/java/mysql-connector-java.jar
val connection_url ="jdbc:mysql://cxln2.c.thelab-240901.internal/retail_db"
val mysql_props = new java.util.Properties
mysql_props.setProperty("user","sqoopuser")
mysql_props.setProperty("password","NHkkP876rp")
val orderDF = spark.read.jdbc(connection_url,"orders",mysql_props)
orderDF.show()
Spark Optimization Session - 17
================================
Sort Aggregate vs Hash Aggregate
orders.csv - 2.6 GB
order_id,order_date,order_customer_id,order_status
1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
we want to find out the number of orders which are placed by each customer in each month.
grouping based on order_customer_id and Month - count of number of orders.
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 11
--conf spark.ui.port=4063
val orderDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("path","orders.csv").load
orderDF.createOrReplaceTempView("orders")
spark.sql("select * from orders").show
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt,
first(date_format(order_date,'M')) monthnum from orders group by order_customer_id, orderdt
order by cast(monthnum as int)").show
It took 3.9 minutes to complete this query
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt,
first(cast(date_format(order_date,'M') as int)) monthnum from orders group by
order_customer_id, orderdt order by monthnum").show
It took 1.2 minutes to complete this query
Spark Optimization Session - 18
================================
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt,
first(date_format(order_date,'M')) monthnum from orders group by order_customer_id, orderdt
order by cast(monthnum as int)").explain
It took 3.9 minutes to complete this query - sort aggregate
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt,
first(cast(date_format(order_date,'M') as int)) monthnum from orders group by
order_customer_id, orderdt order by monthnum").explain
It took 1.2 minutes to complete this query - hash aggregate
2 questions
============
1. difference between sort aggregate and hash aggregate
2. why in query1 it used sort aggregate
and why in query2 it used hash aggregate
sort aggregate
===============
customer_id:month value
1024:january 1 "1"
1024:january 1 "1"
1024:january 1 "1"
1024:january 1 "1"
1025:january 1 "1"
1025:january 1 "1"
first the data is sorted based on the grouping columns.
1024:january ,{1,1,1,1,1}
sorting of data takes time: sorting is O(nlogn).
1000 rows: 1000 * log(1000) ≈ 1000 * 10 = 10000 operations
2000 rows: 2000 * 11 = 22000 operations
sort aggregate takes a lot of time as the data grows.
1024:january 1 "1"
1024:january 1 "1"
1024:january 1 "1"
1025:january 1 "1"
1025:january 1 "1"
Hash Aggregate
===============
hash table
============
customer_id:month value
1024:january 3 "1"
1025:january 2 "1"
no sorting is required..
additional memory is required to have the hashtable kind of structure.
1000 rows...
sort aggregate = O(nlogn) = 1000 * 10 = 10000 operations
hash aggregate = O(n) = 1000
this hash table kind of structure is not a part of container..
rather this additional memory is grabbed as part of off heap memory..
this memory is not the part of your jvm..
question 2: why in the first query it used sort aggregate and why in second query it used hash
aggregate..
slower - sort aggregate
======
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt,
first(date_format(order_date,'M')) monthnum from orders group by order_customer_id,
orderdt").explain
faster - hash aggregate
=======
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt,
first(cast(date_format(order_date,'M') as int)) monthnum from orders group by
order_customer_id, orderdt").explain
customer_id:month value
1024:january 3 1
the month number was a string.
strings are immutable.
when we are using hash aggregate we should have mutable types in the values, which is why casting the month number to int enabled the hash aggregate.
Spark Optimization Session - 19
================================
Catalyst optimizer
Structured API's (DF, DS, Spark SQL) perform better than Raw RDD's
catalyst optimizer will optimize the execution plan for Structured API's
Rule Based Optimization.
Many rules are already available. Also if we want we can add our own optimization rules.
Students.csv - 60 mb
student_id,exam_center_id,subject,year,quarter,score,grade
1,1,Math,2005,1,41,D
1,1,Spanish,2005,1,51,C
1,1,German,2005,1,39,D
1,1,Physics,2005,1,35,D
1,1,Biology,2005,1,53,C
1,1,Philosophy,2005,1,73,B
1,1,Modern Art,2005,1,32,E
1,1,History,2005,1,43,D
1,1,Geography,2005,1,54,C
val df1 = spark.read.format("csv").option("header",true).option("inferSchema",true).option("path","/Users/trendytech/Desktop/students.csv").load
df1.createOrReplaceTempView("students")
spark.sql("select * from students").show
1. Parsed Logical Plan - un-resolved
our query is parsed and we get a parsed logical plan - unresolved
it checks for any of the syntax errors.
syntax is correct
2. Resolved/Analysed Logical Plan
it will try to resolve the table name, the column names etc.
if the column name or table name is not available then we will get an analysis exception.
if we have referred to the correct column names and table name, we get the resolved logical plan.
3. Optimized Logical Plan - catalyst optimizer.
filter push down
combining of filters..
combining of projections
There are many such rules which are already in place.
If we want we can add our own rules in the catalyst optimizer.
consider you are doing an Aggregate.
as per the logical plan, let's say it says we have to do an Aggregate.
in the physical plan:
physical plan1
===============
sortAggregate
physical plan2
===============
HashAggregate
It will select the physical plan which is the most optimized one with minimum cost.
This selected physical plan is converted to Lower Level API's
RDD code.
Spark Optimization Session - 20
================================
student_id,exam_center_id,subject,year,quarter,score,grade
1,1,Math,2005,1,41,D
1,1,Spanish,2005,1,51,C
1,1,German,2005,1,39,D
1,1,Physics,2005,1,35,D
1,1,Biology,2005,1,53,C
1,1,Philosophy,2005,1,73,B
1,1,Modern Art,2005,1,32,E
1,1,History,2005,1,43,D
1,1,Geography,2005,1,54,C
a*b
if b is 1 then return a
Catalyst optimizer
RDD vs Structured API's
Parsed Logical Plan - Unresolved
Analysed/Resolved Logical Plan
Optimized Logical Plan
Physical Plan
val df1 = spark.read.format("csv").option("header",true).option("inferSchema",true).option("path","/Users/trendytech/Desktop/students.csv").load
df1.createOrReplaceTempView("students")
spark.sql("select student_id from (select student_id, exam_center_id from students) where
student_id <5").explain(true)
spark.sql("select student_id,sum(score) from (select student_id, exam_center_id,score from
students where exam_center_id=5) where student_id < 5 group by student_id ").explain(true)
http://localhost:4040/
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions.Multiply
import org.apache.spark.sql.catalyst.expressions.Literal
object MultiplyOptimizationRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Multiply(left, right) if right.isInstanceOf[Literal] &&
        right.asInstanceOf[Literal].value.asInstanceOf[Integer] == 1 =>
      println("optimization of one applied")
      left
  }
}
spark.experimental.extraOptimizations = Seq(MultiplyOptimizationRule)
Week 14 Scala – PySpark equivalent programs
============================================
Week 14 is based on optimization where we need to allocate resources, and hence we are using the shell prompt on the cluster to write programs.

Generalized changes that are required in every program
1. To start the PySpark shell, we run pyspark instead of spark-shell.
2. Remove all val, var keywords as python does not have val and var types.
3. Anonymous functions are replaced with lambda in python.
4. Comments are given using # in python instead of // in scala.

Note
1. Best practice is to use your own itversity hdfs location in the program for input and output files. You can also use the Linux root as shown in the video.
2. There are many ways to get the output for a particular problem; we are showcasing one way.
3. Changes are highlighted in yellow.
Problem Statement: We are creating an array as one source and the second source is a file. We are going to do a join on both sources.
Solution:

Scala Spark Program
===================
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 6 --executor-cores 2 --executor-memory 3G --conf spark.ui.port=4063

val rdd1 = sc.textFile("bigLogNew.txt")
rdd1.getNumPartitions
val rdd2 = rdd1.map(x => (x.split(":")(0),x.split(":")(1)))
val a = Array(("ERROR",0),("WARN",1))
val rdd3 = sc.parallelize(a)
val rdd4 = rdd2.join(rdd3)
rdd4.saveAsTextFile("joinResults1")

PySpark Program
===============
pyspark --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 6 --executor-cores 2 --executor-memory 3G --conf spark.ui.port=4063

rdd1 = sc.textFile("/user/itv000001/bigLog.txt")
rdd1.getNumPartitions()
rdd2 = rdd1.map(lambda x: (x.split(":")[0], x.split(":")[1]))
a = {"ERROR": 0, "WARN": 1}
rdd3 = sc.parallelize(list(a.items()))   # parallelize the (key, value) pairs of the dict so the join works
rdd4 = rdd2.join(rdd3)
rdd4.saveAsTextFile("/user/itv000001/joinResult1")

Specific changes that are required in the above program
1. Replace () with [] in python for accessing array elements.
2. Array in scala is changed to a dict (dictionary) in python, and its (key, value) items are parallelized. Variations are possible; we have shown one way.
Problem Statement: We are creating an array as one source and the second source is a file. We are going to do a join on both sources. In the above program we did a normal join; in this program we will use a broadcast join.
Solution:

Scala Spark Program
===================
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 6 --executor-cores 2 --executor-memory 3G --conf spark.ui.port=4063

val a = Array(("ERROR",0),("WARN",1))
val rdd3 = sc.parallelize(a)
val keyMap = a.toMap
val bcast = sc.broadcast(keyMap)
val rdd1 = sc.textFile("bigLogNew.txt")
val rdd2 = rdd1.map(x => (x.split(":")(0),x.split(":")(1)))
val rdd4 = rdd2.map(x => (x._1,x._2,bcast.value(x._1)))
rdd4.saveAsTextFile("joinresults2")

PySpark Program
===============
pyspark --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 6 --executor-cores 2 --executor-memory 3G --conf spark.ui.port=4063

a = {"ERROR": 0, "WARN": 1}
# rdd3 and keyMap are not required in python
bcast = sc.broadcast(a)
rdd1 = sc.textFile("/user/itv000001/bigLog.txt")
rdd2 = rdd1.map(lambda x: (x.split(":")[0], x.split(":")[1]))
rdd4 = rdd2.map(lambda x: (x[0], x[1], bcast.value[x[0]]))
rdd4.saveAsTextFile("/user/itv000001/joinResult2")

Specific changes that are required in the above program
1. Array in scala is changed to a dict (dictionary) in python. Variations are possible; we have shown one way.
2. The second and third lines of the scala program are not required; the broadcast method accepts the dict a as input directly.
3. Replace () with [] in python for accessing array elements.
4. Tuples in python are 0-indexed and accessed with [].
Problem Statement: Join using two data frames.
Solution:

Scala Spark Program
===================
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 21

val customerDF = spark.read.format("csv").option("header",true).option("inferSchema",true).option("path","customers.csv").load
val orderDF = spark.read.format("csv").option("header",true).option("inferSchema",true).option("path","orders.csv").load
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",-1)
val joinedDF = customerDF.join(orderDF,customerDF("customer_id") === orderDF("order_customer_id"))
joinedDF.write.csv("output1")

PySpark Program
===============
pyspark --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 21

customerDF = spark.read.format("csv").option("header", True) \
    .option("inferSchema", True) \
    .option("path", "/user/itv000173/customers.csv").load()
orderDF = spark.read.format("csv").option("header", True) \
    .option("inferSchema", True) \
    .option("path", "/user/itv000173/orders.csv").load()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
joinedDF = customerDF.join(orderDF, customerDF["customer_id"] == orderDF["order_customer_id"])
joinedDF.write.csv("/user/itv000173/output1")

Specific changes that are required in the above program
1. Replace true with True
2. Replace load with load()
3. Replace === with ==
Problem Statement: Join using two data frames and providing the orders schema.
Solution:

Scala Spark Program
===================
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 21

import org.apache.spark.sql.types._

val ordersSchema = StructType(
  List(
    StructField("order_id",IntegerType,true),
    StructField("order_date",TimestampType,true),
    StructField("order_customer_id",IntegerType,true),
    StructField("order_status",StringType,true)
  )
)

val customerDF = spark.read.format("csv").option("header",true).option("inferSchema",true).option("path","customers.csv").load
val orderDF = spark.read.format("csv").schema(ordersSchema).option("header",true).option("path","orders.csv").load
val joinedDF = customerDF.join(orderDF,customerDF("customer_id") === orderDF("order_customer_id"))
joinedDF.write.csv("output21")

//----------call take so that it will bring data to the driver and may raise OOM
joinedDF.take(1000000)

//----------increase driver memory and call take again, now no OOM error
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 21 --driver-memory 4G
joinedDF.take(1000000)

PySpark Program
===============
pyspark --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 21

from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType

ordersSchema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("order_date", TimestampType(), True),
    StructField("order_customer_id", IntegerType(), True),
    StructField("order_status", StringType(), True)
])

customerDF = spark.read.format("csv").option("header", True).option("inferSchema", True) \
    .option("path", "/user/itv000173/customers.csv").load()
orderDF = spark.read.format("csv").schema(ordersSchema).option("header", True) \
    .option("path", "/user/itv000173/orders.csv").load()
joinedDF = customerDF.join(orderDF, customerDF.customer_id == orderDF.order_customer_id)
joinedDF.write.csv("/user/itv000173/output21")

#----------call take so that it will bring data to the driver and may raise OOM
joinedDF.take(1000000)

#----------increase driver memory and call take again, now no OOM error
pyspark --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 21 --driver-memory 4G
joinedDF.take(1000000)

Specific changes that are required in the above program
1. Use the appropriate imports in python.
2. Replace List() with []
3. Replace true with True
4. Replace load with load(), same for IntegerType(), TimestampType(), StringType()
5. Replace === with ==
6. Replace customerDF("customer_id") === orderDF("order_customer_id") with customerDF.customer_id == orderDF.order_customer_id
Problem Statement: try repartition and coalesce
Solution:

Scala Spark Program
===================
val rdd1 = sc.textFile("bigLogFinal.txt")
rdd1.getNumPartitions
val rdd2 = rdd1.repartition(6)
rdd2.count
val rdd2 = rdd1.coalesce(6)
rdd2.count

PySpark Program
===============
rdd1 = sc.textFile("bigLogFinal.txt")
rdd1.getNumPartitions()
rdd2 = rdd1.repartition(6)
rdd2.count()
rdd2 = rdd1.coalesce(6)
rdd2.count()

Specific changes that are required in the above program
1. Remove all val keywords.
2. In python, count() and getNumPartitions() are called with parentheses.
Problem Statement: Execute code in production. Create a jar and execute using spark-submit in cluster mode. The program is the same as week 13 except for a few changes mentioned in the video.
Solution:

Scala Spark Program
===================
spark2-submit \
--class LogLevelGrouping \
--master yarn \
--deploy-mode cluster \
--executor-memory 3G \
--num-executors 4 \
wordcount.jar bigLogNew.txt

PySpark Program
===============
spark2-submit \
--master yarn \
--deploy-mode cluster \
--executor-memory 3G \
--num-executors 4 \
LogLevelGrouping.py bigLogNew.txt

Specific changes that are required in the above program
1. --class is not required; remove it.
2. In python we execute the python file directly.
Problem Statement: Execute code in production. Create a jar and execute using spark-submit in local mode. The program is the same as week 13 except for a few changes mentioned in the video.
Solution:

Scala Spark Program
===================
spark2-submit \
--class LogLevelGrouping \
--master yarn \
--executor-memory 3G \
--num-executors 4 \
wordcount.jar bigLogNew.txt

PySpark Program
===============
spark2-submit \
--master yarn \
--executor-memory 3G \
--num-executors 4 \
LogLevelGrouping.py bigLogNew.txt

Specific changes that are required in the above program
1. --class is not required; remove it.
2. In python we execute the python file directly.
Problem Statement: spark sql
Solution:

Scala Spark Program
===================
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 11 --conf spark.ui.port=4063

val orderDF = spark.read.format("csv").option("inferSchema",true).option("header",true).option("path","orders.csv").load
orderDF.createOrReplaceTempView("orders")
spark.sql("select * from orders").show
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt, first(date_format(order_date,'M')) monthnum from orders group by order_customer_id, orderdt order by cast(monthnum as int)").show

//----------------------------change the cast from order by
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt, first(cast(date_format(order_date,'M') as int)) monthnum from orders group by order_customer_id, orderdt order by monthnum").show

PySpark Program
===============
pyspark --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 11 --conf spark.ui.port=4063

orderDF = spark.read.format("csv").option("inferSchema", True) \
    .option("header", True).option("path", "/user/itv000001/orders.csv").load()
orderDF.createOrReplaceTempView("orders")
spark.sql("select * from orders").show()
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt, first(date_format(order_date,'M')) monthnum from orders group by order_customer_id, orderdt order by cast(monthnum as int)").show()

#----------------------------change the cast from order by
spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt, first(cast(date_format(order_date,'M') as int)) monthnum from orders group by order_customer_id, orderdt order by monthnum").show()

Specific changes that are required in the above program
1. Replace true with True
2. Replace load with load()
3. Replace show with show()
4. Spark sql is the same in scala and python
Problem Statement: just add .explain to the spark sql from the above program
Solution:

Scala Spark Program
===================
spark2-shell --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 11 --conf spark.ui.port=4063

spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt, first(date_format(order_date,'M')) monthnum from orders group by order_customer_id, orderdt order by cast(monthnum as int)").explain
// It took 3.9 minutes to complete this query - sort aggregate

spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt, first(cast(date_format(order_date,'M') as int)) monthnum from orders group by order_customer_id, orderdt order by monthnum").explain
// It took 1.2 minutes to complete this query - hash aggregate

PySpark Program
===============
pyspark --conf spark.dynamicAllocation.enabled=false --master yarn --num-executors 11 --conf spark.ui.port=4063

spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt, first(date_format(order_date,'M')) monthnum from orders group by order_customer_id, orderdt order by cast(monthnum as int)").explain()
# It took 3.9 minutes to complete this query - sort aggregate

spark.sql("select order_customer_id, date_format(order_date, 'MMMM') orderdt, count(1) cnt, first(cast(date_format(order_date,'M') as int)) monthnum from orders group by order_customer_id, orderdt order by monthnum").explain()
# It took 1.2 minutes to complete this query - hash aggregate

Specific changes that are required in the above program
1. Replace explain with explain()
2. Spark sql is the same in scala and python
Problem Statement: Connecting to external resources
Solution:

Scala Spark Program
===================
spark-shell --driver-class-path /usr/share/java/mysql-connector-java.jar

val connection_url = "jdbc:mysql://cxln2.c.thelab-240901.internal/retail_db"
val mysql_props = new java.util.Properties
mysql_props.setProperty("user","sqoopuser")
mysql_props.setProperty("password","NHkkP876rp")
val orderDF = spark.read.jdbc(connection_url,"orders",mysql_props)
orderDF.show()

PySpark Program
===============
pyspark --jars /usr/share/java/mysql-connector-java.jar

connection_url = "jdbc:mysql://cxln2.c.thelab-240901.internal/retail_db"
orderDF = spark.read \
    .jdbc(connection_url, "orders", properties={"user": "sqoopuser", "password": "NHkkP876rp"})
orderDF.show()

Specific changes that are required in the above program
1. Giving the connection properties is different: in python they are passed as a dict to the jdbc() call.
Spark Streaming Session - 1
Real Time Processing
====================
whatever we have done till now is nothing but batch processing.
Batch Processing
=================
doing analysis on top of files.
your processing might take few minutes, few hours or few days.
Real time Processing
======================
we will have continuously flowing data and we have to calculate the results instantly.
credit card fraud detection
finding trending hashtag
track website failures using server logs..
when we talk about hadoop
HDFS + MR + YARN
MR (mapreduce) Processing.
Mapreduce can only handle batch jobs.
mapreduce cannot handle streaming or real time data.
How do we process real time streaming data?
Apache Spark.
Spark is a general purpose compute engine which can handle.
Batch + Streaming Data
In spark we have a module called Spark Streaming.
Spark Streaming Session - 2
============================
batch vs real time stream processing
mapreduce can handle only batch processing.
spark can handle streaming data as well.
Spark Streaming.
how is the data stored in spark?
RDD (resilient distributed dataset)
when we load the file
val baseRdd = sc.textFile("file path in hdfs")
if the file is of 500 mb & if the default block size in hdfs is 128 mb.
then our baseRDD will have 4 partitions.
when there is no concept of a static file then how do you visualize your rdd.
consider a water tap which is continuously flowing.
tap is running for 1 hour
size of my stream is 1 hour
every 2 minutes you are filling one water balloon.
30 water balloons.
a new rdd is framed every 2 minutes.
we will have 30 rdds which are created.
some balloons might have more data than other balloons based on flow of water.
batch interval - 5 seconds
consider whatever 30 balloons that we have filled we are putting them in a tub.
(Dstream)
so basically
Dstream -> Rdd's -> Messages (entities)
1 -> 30 rdd's -> each rdd can have lot of messages.
we need to operate at Dstream level
underneath spark still works in batch style.
the batch size is small like 1 second.
spark's compute engine is a batch engine and not a streaming engine.
whenever batch size is very small example 1 second.
then we get a feeling that it's real time streaming.
Spark Streaming Session - 3
============================
In batch processing in Spark
1. Lower level Constructs (RDD)
2. Higher Level Constructs (Structured API's Dataframes, dataset and spark sql)
Stream Processing in Spark
we have both the lower level as well as higher level constructs.
1. Spark Streaming (RDD's) traditional way
2. Structured Streaming - newer thing - Dataframes
Lets talk about example of normal spark streaming
Producer will be an application which is giving you continuous data.
consumer will be our spark streaming application.
twitter will be producer.
spark streaming will be the consumer.
we will try to simulate a dummy producer.
there will be a terminal and whatever you type will be produced.
producer will write to a socket
and consumer will read from the same socket.
socket
=======
IP Address + Port number (localhost + 9998)
step 1: start a producer
we want to create a dummy producer where we continously type things.
nc -lk 9998
step 2: we have to start a consumer
spark streaming code which reads the data from the socket
spark-shell --master local[2]
sc is already available
we have spark context which is available
when we talk about spark streaming applications we require a spark streaming
context
//creating spark streaming context
val ssc = new StreamingContext(sc, Seconds(5))
//lines is a dstream
val lines = ssc.socketTextStream("localhost",9998)
//words is a transformed dstream
val words = lines.flatMap(x => x.split(" "))
val pairs = words.map(x => (x,1))
val wordCounts = pairs.reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()
Spark Streaming Session - 4
============================
what is real time processing.
Dstream -> RDD's -> messages
word count streaming example
when calculating word count it was forgetting the state of previous rdds.
if I write hello 5 times
then (hello,5)
hello 2 times again
(hello,2)
it was giving output for each rdd individually.
when we talk about spark streaming. There are 2 kind of transformations.
1. stateless transformation
is the one which forgets the previous state. we perform operation on a single rdd
always.
2. stateful transformation
the one in which we do aggregation over more than one rdd.
when we talk about batch processing. we load the entire file as one single rdd.
batch processing is always stateless. there is no point of talking about stateful
transformation in case of batch processing.
when we talk about stateful transformations we have 2 choices:
lets consider you have a streaming application which runs for 6 hours.
batch interval size to be 5 minutes - a new rdd will be created every 5 minutes.
during the course of entire streaming application how many rdds will be created?
72 rdd's will be created in 6 hours (360 minutes / 5 minutes = 72).
1. consider all of the rdd's within a stream - consider all 72
2. you want to do operations over the last 30 minutes..
we will consider last 6 rdd's always.
Sum() is stateless and work for each rdd individually.
what if we want to get a running total?
in this case we should be using a stateful transformation.
we need to convert these normal rdd's into pair rdd's,
and add a dummy key to all these elements.
for example, the elements 1, 5, 7 become:
(k,1)
(k,5)
(k,7)
updateStateByKey() is a stateful transformation.
when we talk about stateless we just talk about 1 single rdd. - stateless
considering the entire stream we talked about including all rdds. - stateful
considering a few rdds (window) - stateful
lets say the batch interval is 10 seconds..
that means a new rdd will be created every 10 seconds.
3 things
=========
1. batch interval - 10 seconds
2. window size - 30 seconds
3. sliding interval - 20 seconds
countByWindow() - stateful transformation
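Putting those three numbers together, here is a hedged PySpark sketch (the socket source on localhost:9998 follows the sessions below; everything else is an assumption):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "windowDemo")
ssc = StreamingContext(sc, 10)          # batch interval: a new RDD every 10 seconds
ssc.checkpoint(".")                     # window/stateful operations need checkpointing

lines = ssc.socketTextStream("localhost", 9998)
# window size 30s, sliding interval 20s: every 20s, count the lines from the last 30s
lineCounts = lines.countByWindow(30, 20)
lineCounts.pprint()

ssc.start()
ssc.awaitTermination()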
Spark Streaming Session - 5
============================
//creating spark streaming context
val ssc = new StreamingContext(sc, Seconds(5))
//lines is a dstream
val lines = ssc.socketTextStream("localhost",9998)
//words is a transformed dstream
val words = lines.flatMap(x => x.split(" "))
val pairs = words.map(x => (x,1))
val wordCounts = pairs.reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()
Spark Streaming Session - 6
============================
I want to calculate the frequency of each word across the entire stream...
stateful transformation..
updateStateByKey is a stateful transformation we can think of using.
this requires 2 steps:
1. Define a state to start with.
2. a function to update the state
big data is interesting big data is fun
(big,1)
(data,1)
(is,1)
(interesting,1)
(big,1)
(data,1)
(is,1)
(fun,1)
(big,1)
(big,1)
(data,1)
(data,1)
(is,1)
(is,1)
(fun,1)
rdd1
(big,{1,1}) newValues = {1,1} 2 previousState = 0 , (big,2)
(data,{1,1}) newValues = {1,1} previousState = 0 , (data,2)
(is,{1,1}) (is,2)
(fun,{1}) (fun,1)
rdd2
big data is vast
(big,1) newValues = {1} previousState = 2 , (big,3)
(data,1) (data,3)
(is,1) (is,3)
(vast,1) (vast,1)
when we talk about stateful transformations then we have to do checkpointing.
=====================================
Spark Streaming Session - 7
=============================
reduceByKey - stateless transformation - one rdd
updateStateByKey - stateful transformation , it considers the entire dstream from the
beginning to the end. - all rdd's
sliding window -
1. batch interval - the time in which each rdd is created.
if we have the batch interval as 2 seconds, that means after every 2 seconds a new rdd will be created.
2. window size - 10 seconds.. we are interested in the last 5 rdd's always.
3. Sliding interval. - 2 seconds..
after every 2 seconds one oldest rdd will go away and one new rdd will come in.
if this sliding interval is 4 seconds.
after every 4 seconds.. 2 oldest rdd's will go away and 2 new rdd will come in.
sliding interval has to be an integral multiple of the batch interval.
window size should also be an integral multiple of batch size.
reduceByKeyAndWindow
hello,1
how,1
are,1
you,1
hello,1
reduceByKeyAndWindow transformation takes 4 parameters.
1. the summary function. (x,y) => x+y
2. the inverse function. (x,y) => x-y
3. the window size. Seconds(10)
4. the sliding interval. Seconds(2)
My batch interval is 2 seconds.
our problem statement is to find the frequency of each word in a 10 second sliding window.
when will we see the very first result? after 10 seconds (the first full window).
when do we see the second result? after the 12th second, since the window slides every 2 seconds.
Spark Streaming Session - 8
============================
reduceByKey - stateless
updateStateByKey - stateful (considers entire stream)
reduceByKeyAndWindow - stateful (sliding window) - pair rdd is required.
reduceByWindow - here pair rdd is not required
countByWindow - it will count the number of lines in the window.
Traditional spark streaming - lower level constructs
Dealing at the level of rdd's.
Structured API's are trending these days.
Spark Structured Streaming
Week 15 Scala – PySpark equivalent programs
============================================
Week 15 is based on Spark streaming where we need a real time stream. We will use a socket and a file.

Generalized changes that are required in every program
1. To start the PySpark shell, we run pyspark instead of spark-shell.
2. Remove all val, var keywords as python does not have val and var types.
3. Anonymous functions are replaced with lambda in python.
4. Comments are given using # in python instead of // in scala.

Note
1. Best practice is to use your own itversity hdfs location in the program for input and output files. You can also use the Linux root as shown in the video.
2. There are many ways to get the output for a particular problem; we are showcasing one way.
3. Changes are highlighted in yellow.
4. ncat is a Linux utility. For Windows, follow the steps below.
Steps for streaming program execution on Windows
1. Download the free Nmap Security Scanner for Linux/Mac/Windows (nmap stable setup) and install it.
2. In the code give localhost 9998.
3. Run the code. It will give an error because no port is listening. That's ok.
4. Open cmd and go to the Nmap folder.
5. ncat -lvp 9998
6. Start typing words.
7. Cross check in your program.
Problem Statement: Write a real time word count program
Solution:

Scala Spark Program
===================
spark-shell --master local[2]

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

//creating spark streaming context
val ssc = new StreamingContext(sc, Seconds(2))
//lines is a dstream
val lines = ssc.socketTextStream("localhost",9998)
//words is a transformed dstream
val words = lines.flatMap(x => x.split(" "))
val pairs = words.map(x => (x,1))
val wordCounts = pairs.reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()

PySpark Program
===============
pyspark --master local[2]

from pyspark import *
from pyspark.streaming import *

sc.setLogLevel("ERROR")
# creating spark streaming context
ssc = StreamingContext(sc, 2)
# lines is a dstream
lines = ssc.socketTextStream("localhost", 9998)
# words is a transformed dstream
words = lines.flatMap(lambda x: x.split())
pairs = words.map(lambda x: (x, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()

Specific changes that are required in the above program
1. In scala we give Seconds(2) whereas in python you can give 2 directly.
2. We use pprint() in python: DStream.pprint() prints the first elements of each batch in a readable way, in place of print() in scala.
Problem Statement: Write a real time stateless word count program in the IDE
Solution:

Scala Spark Program
===================
Create word.scala

import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object StreamingWordCount extends App {
  val sc = new SparkContext("local[*]","wordcount")
  //creating spark streaming context
  val ssc = new StreamingContext(sc, Seconds(5))
  //lines is a dstream
  val lines = ssc.socketTextStream("localhost",9998)
  //words is a transformed dstream
  val words = lines.flatMap(x => x.split(" "))
  val pairs = words.map(x => (x,1))
  val wordCounts = pairs.reduceByKey((x,y) => x+y)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

PySpark Program
===============
Create word.py

from pyspark import *
from pyspark.streaming import *

sc = SparkContext("local[2]", "APP")
sc.setLogLevel("ERROR")
# creating spark streaming context
ssc = StreamingContext(sc, 2)
# lines is a dstream
lines = ssc.socketTextStream("localhost", 9998)
# words is a transformed dstream
words = lines.flatMap(lambda x: x.split())
pairs = words.map(lambda x: (x, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

Specific changes that are required in the above program
1. In the IDE you need to create the SparkContext object yourself.
2. On the last line we need to write the code to wait for termination, which is the same in both.
3. Green colour marks the change from the PySpark shell version to the IDE version.
4. Yellow highlight marks the change from scala to PySpark in the shell.
Problem Statement: Write a real time stateful word count program in the IDE
Solution:

Scala Spark Program
===================
Create word1.scala

import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object StreamingWordCount extends App {
  val sc = new SparkContext("local[*]","wordcount")
  //creating spark streaming context
  val ssc = new StreamingContext(sc, Seconds(5))
  //lines is a dstream
  val lines = ssc.socketTextStream("localhost",9998)
  ssc.checkpoint(".")
  def updatefunc(newValues:Seq[Int],previousState:Option[Int]): Option[Int]={
    val newCount = previousState.getOrElse(0) + newValues.sum
    Some(newCount)
  }
  val words = lines.flatMap(x => x.split(" "))
  val pairs = words.map(x => (x,1))
  val wordCounts = pairs.updateStateByKey(updatefunc)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

PySpark Program
===============
Create word1.py

from pyspark import *
from pyspark.streaming import *

sc = SparkContext("local[2]", "APP")
sc.setLogLevel("ERROR")
# creating spark streaming context
ssc = StreamingContext(sc, 2)
# lines is a dstream
lines = ssc.socketTextStream("localhost", 9998)
ssc.checkpoint(".")

def updatefunc(newValues, previousState):
    if previousState is None:
        previousState = 0
    return sum(newValues, previousState)

words = lines.flatMap(lambda x: x.split())
pairs = words.map(lambda x: (x, 1))
wordCounts = pairs.updateStateByKey(updatefunc)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

Specific changes that are required in the above program
1. The updatefunc function definition is changed.
2. Yellow highlight marks the change between the previous scala program and this one.
3. Green highlight marks the change between scala and PySpark.
Problem Statement: Write a real time stateful word count program using a sliding window in the IDE
Solution:

Scala Spark Program
===================
Create word1.scala

import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object StreamingWordCount extends App {
  val sc = new SparkContext("local[*]","wordcount")
  //creating spark streaming context
  val ssc = new StreamingContext(sc, Seconds(5))
  //lines is a dstream
  val lines = ssc.socketTextStream("localhost",9998)
  ssc.checkpoint(".")
  //words is a transformed dstream
  val wordCounts = lines.flatMap(x => x.split(" "))
    .map(x => (x,1))
    .reduceByKeyAndWindow((x,y)=>x+y,(x,y)=>x-y,Seconds(10),Seconds(2))
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

// show filter (append to the chain above)
.filter(x => x._2>0)

PySpark Program
===============
Create word1.py

from pyspark import *
from pyspark.streaming import *

sc = SparkContext("local[2]", "APP")
sc.setLogLevel("ERROR")
# creating spark streaming context
ssc = StreamingContext(sc, 2)
ssc.checkpoint(".")
# lines is a dstream
lines = ssc.socketTextStream("localhost", 9998)
# words is a transformed dstream
wordCounts = lines.flatMap(lambda x: x.split()) \
    .map(lambda x: (x, 1)) \
    .reduceByKeyAndWindow(lambda x, y: int(x) + int(y), lambda x, y: int(x) - int(y), 10, 2)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

# add filter to print count > 2 (append to the chain above)
.filter(lambda x: x[1] > 2)

Specific changes that are required in the above program
1. We are converting x and y into int, and the seconds are given directly as numbers.
Problem Statement: Write a real time stateful word count program using a sliding window and named functions in the IDE
Solution:

Scala Spark Program
===================
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object StreamingWordCount extends App {
  val sc = new SparkContext("local[*]","wordcount")
  //creating spark streaming context
  val ssc = new StreamingContext(sc, Seconds(5))
  //lines is a dstream
  val lines = ssc.socketTextStream("localhost",9998)
  ssc.checkpoint(".")
  def summaryFuct(x:Int, y:Int)={x+y}
  def inverseFuct(x:Int, y:Int)={x-y}
  //words is a transformed dstream
  val wordCounts = lines.flatMap(x => x.split(" "))
    .map(x => (x,1))
    .reduceByKeyAndWindow(summaryFuct(_,_),inverseFuct(_,_),Seconds(10),Seconds(2))
    .filter(x => x._2>0)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

PySpark Program
===============
from pyspark import *
from pyspark.streaming import *

sc = SparkContext("local[2]", "APP")
sc.setLogLevel("ERROR")
# creating spark streaming context
ssc = StreamingContext(sc, 2)
ssc.checkpoint(".")
# lines is a dstream
lines = ssc.socketTextStream("localhost", 9998)

def summaryFuct(x, y):
    return x + y

def inverseFuct(x, y):
    return x - y

# words is a transformed dstream
wordCounts = lines.flatMap(lambda x: x.split()) \
    .map(lambda x: (x, 1)) \
    .reduceByKeyAndWindow(summaryFuct, inverseFuct, 10, 2) \
    .filter(lambda x: x[1] > 0)

wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

Specific changes that are required in the above program
1. The function definition is different in python: we don't specify datatypes for the parameters, and while passing the function we just give the function name.
Problem Statement: Write a real time stateful word count program using a sliding window and named functions in the IDE. Implement the reduceByWindow method, which does not need a pair RDD.
Solution:

Scala Spark Program
===================
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object StreamingWordCount extends App {
  val sc = new SparkContext("local[*]","wordcount")
  //creating spark streaming context
  val ssc = new StreamingContext(sc, Seconds(5))
  //lines is a dstream
  val lines = ssc.socketTextStream("localhost",9998)
  ssc.checkpoint(".")
  def summaryFuct(x:String, y:String)={ (x.toInt + y.toInt).toString() }
  def inverseFuct(x:String, y:String)={ (x.toInt - y.toInt).toString() }
  //lines is reduced directly, no pair dstream needed
  val wordCounts = lines.reduceByWindow(summaryFuct(_,_),inverseFuct(_,_),Seconds(10),Seconds(2))
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

PySpark Program
===============
from pyspark import *
from pyspark.streaming import *

sc = SparkContext("local[2]", "APP")
sc.setLogLevel("ERROR")
# creating spark streaming context
ssc = StreamingContext(sc, 2)
ssc.checkpoint(".")
# lines is a dstream
lines = ssc.socketTextStream("localhost", 9998)

def summaryfuct(x, y):
    return str(int(x) + int(y))

def inversefuct(x, y):
    return str(int(x) - int(y))

wordCounts = lines.reduceByWindow(summaryfuct, inversefuct, 10, 2)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

Specific changes that are required in the above program
1. reduceByWindow does not need a pair RDD, hence we don't need flatMap and map.
Problem Statement: Write a real time program to count the number of lines in a window.
Solution:

Scala Spark Program
===================
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext

object StreamingWordCount extends App {
  val sc = new SparkContext("local[*]","wordcount")
  //creating spark streaming context
  val ssc = new StreamingContext(sc, Seconds(2))
  //lines is a dstream
  val lines = ssc.socketTextStream("localhost",9998)
  ssc.checkpoint(".")
  //words is a transformed dstream
  val wordCounts = lines.countByWindow(Seconds(10),Seconds(2))
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

PySpark Program
===============
from pyspark import *
from pyspark.streaming import *

sc = SparkContext("local[2]", "APP")
sc.setLogLevel("ERROR")
# creating spark streaming context
ssc = StreamingContext(sc, 2)
ssc.checkpoint(".")
# lines is a dstream
lines = ssc.socketTextStream("localhost", 9998)

wordCounts = lines.countByWindow(10, 2)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

Specific changes that are required in the above program
1. To specify seconds for an interval in python, give the number directly.
Week 16 Scala – PySpark equivalent programs
============================================
Week 16 is based on Spark structured streaming where we need a real time stream. We will use a socket and a file as the source for streaming.

Generalized changes that are required in every program
1. Remove all val, var keywords as python does not have val and var types.
2. Anonymous functions are replaced with lambda in python.
3. Comments are given using # in python instead of // in scala.
4. To write multi line code in pyspark, end every line with \ except the last line.

Note
1. There are many ways to get the output for a particular problem; we are showcasing one way.
2. Changes are highlighted in yellow.
3. ncat is a Linux utility. For Windows, follow the steps below.
Steps for streaming program execution on Windows
1. Download the free Nmap Security Scanner for Linux/Mac/Windows (nmap stable setup) and install it.
2. In the code give localhost 9998.
3. Run the code. It will give an error because no port is listening. That's ok.
4. Open cmd and go to the Nmap folder.
5. ncat -lvp 9998
6. Start typing words.
7. Cross check in your program.
Problem Statement: Write a real time word count program using spark structured streaming
Solution:

Scala Spark Program
===================
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[2]")
  .appName("My Streaming Application")
  .getOrCreate()

// read from the stream
val linesDf = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "12345")
  .load()

linesDf.printSchema()

// process
val wordsDf = linesDf.selectExpr("explode(split(value,' ')) as word")
val countsDf = wordsDf.groupBy("word").count()

// write to the sink
val wordCountQuery = countsDf.writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation","checkpoint-location1")
  .start()

wordCountQuery.awaitTermination()

PySpark Program
===============
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("My Streaming Application") \
    .getOrCreate()

# read from the stream
linesDf = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", "12345") \
    .load()

linesDf.printSchema()

# process
wordsDf = linesDf.selectExpr("explode(split(value,' ')) as word")
countsDf = wordsDf.groupBy("word").count()

# write to the sink
wordCountQuery = countsDf.writeStream \
    .format("console") \
    .outputMode("complete") \
    .option("checkpointLocation", "checkpoint-location1") \
    .start()

wordCountQuery.awaitTermination()

Add the two config lines below and run again; you can see the performance gain. No change in pyspark.

Scala:
val spark = SparkSession.builder
  .master("local[2]")
  .appName("My Streaming Application")
  .config("spark.sql.shuffle.partitions",3)
  .config("spark.streaming.stopGracefullyOnShutdown","true")
  .getOrCreate()

PySpark:
spark = SparkSession.builder \
    .master("local[2]") \
    .appName("My Streaming Application") \
    .config("spark.sql.shuffle.partitions", 3) \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .getOrCreate()

Add the line below to set the triggering time. Small difference in pyspark.

Scala:
val wordCountQuery = countsDf.writeStream
  .format("console")
  .outputMode("complete")
  .option("checkpointLocation","checkpoint-location2")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

PySpark:
wordCountQuery = countsDf.writeStream \
    .format("console") \
    .outputMode("complete") \
    .option("checkpointLocation", "checkpoint-location2") \
    .trigger(processingTime='5 seconds') \
    .start()

Specific changes that are required in the above program
1. The import is different.
2. Need to remove all val keywords.
3. Add \ at the end of a line to continue code on the next line.
4. .trigger(Trigger.ProcessingTime("5 seconds")) in scala is changed to .trigger(processingTime='5 seconds') in pyspark.
Problem Statement: Read a json file and write a json file using spark structured streaming
Solution:

Scala Spark Program
===================
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .master("local[2]")
  .appName("My Streaming Application")
  .config("spark.sql.shuffle.partitions",3)
  .config("spark.streaming.stopGracefullyOnShutdown","true")
  .config("spark.sql.streaming.schemaInference","true")
  .getOrCreate()

// read from file source
val ordersDf = spark.readStream
  .format("json")
  .option("path", "myinputfolder")
  .load()

// process
ordersDf.createOrReplaceTempView("orders")
val completeOrders = spark.sql("select * from orders where order_status='COMPLETE'")

// write to the sink
val wordCountQuery = completeOrders.writeStream
  .format("json")
  .outputMode("append")
  .option("path", "myoutputfolder")
  .option("checkpointLocation","checkpoint-location5")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

wordCountQuery.awaitTermination()

PySpark Program
===============
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("My Streaming Application") \
    .config("spark.sql.shuffle.partitions", 3) \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.streaming.schemaInference", "true") \
    .getOrCreate()

# read from file source
ordersDf = spark.readStream \
    .format("json") \
    .option("path", "myinputfolder") \
    .load()

# process
ordersDf.createOrReplaceTempView("orders")
completeOrders = spark.sql("select * from orders where order_status='COMPLETE'")

# write to the sink
wordCountQuery = completeOrders.writeStream \
    .format("json") \
    .outputMode("append") \
    .option("path", "myoutputfolder") \
    .option("checkpointLocation", "checkpoint-location5") \
    .trigger(processingTime='30 seconds') \
    .start()

wordCountQuery.awaitTermination()

Add the option below to set a micro batch of one file. No change in pyspark.

Scala:
val ordersDf = spark.readStream
  .format("json")
  .option("path", "myinputfolder")
  .option("maxFilesPerTrigger",1)
  .load()

PySpark:
ordersDf = spark.readStream \
    .format("json") \
    .option("path", "myinputfolder") \
    .option("maxFilesPerTrigger", 1) \
    .load()

Specific changes that are required in the above program
1. The import is different.
2. Need to remove all val keywords.
3. Add \ at the end of a line to continue code on the next line.
4. .trigger(Trigger.ProcessingTime("5 seconds")) in scala is changed to .trigger(processingTime='5 seconds') in pyspark.
TRENDY TECH
Problem Statement: Write a real time tumbling window program to read json file.
Solution:
Scala Spark Program PySpark Program
Scala Spark Program
====================
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Level
import org.apache.log4j.Logger
import java.sql.Timestamp
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object TumblingDemo extends App {

  Logger.getLogger("org").setLevel(Level.ERROR)

  val spark = SparkSession.builder()
    .master("local[2]")
    .appName("Tumbling")
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .config("spark.sql.shuffle.partitions", 3)
    .getOrCreate()

  // 1. read
  val ordersDf = spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", "12345")
    .load

  // define own schema instead of inferring it
  val orderSchema = StructType(List(
    StructField("order_id", IntegerType),
    StructField("order_date", StringType),
    StructField("order_customer_id", IntegerType),
    StructField("order_status", StringType),
    StructField("amount", IntegerType)
  ))

  // 2. process
  val valueDF = ordersDf.select(from_json(col("value"), orderSchema).alias("value"))
  val refinedOrderDF = valueDF.select("value.*")

  val windowAggDF = refinedOrderDF
    .groupBy(window(col("order_date"), "15 minute"))
    .agg(sum("amount").alias("totalInvoice"))

  val opDf = windowAggDF.select("window.start", "window.end", "totalInvoice")

  // 3. write to the sink
  val ordersQuery = opDf.writeStream
    .format("console")
    .outputMode("update")
    .option("checkpointLocation", "chk-Loc1")
    .trigger(Trigger.ProcessingTime("15 second"))
    .start()

  ordersQuery.awaitTermination()
}

PySpark Program
================
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("Tumbling") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.shuffle.partitions", 3) \
    .getOrCreate()

# 1. read
ordersDf = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", "12345") \
    .load()

# define own schema instead of inferring it
orderSchema = StructType([
    StructField("order_id", IntegerType()),
    StructField("order_date", StringType()),
    StructField("order_customer_id", IntegerType()),
    StructField("order_status", StringType()),
    StructField("amount", IntegerType())
])

# 2. process
valueDF = ordersDf.select(from_json(F.col("value"), orderSchema).alias("value"))
refinedOrderDF = valueDF.select("value.*")

windowAggDF = refinedOrderDF \
    .groupBy(window(F.col("order_date"), "15 minute")) \
    .agg(F.sum("amount").alias("totalInvoice"))

opDf = windowAggDF.select("window.start", "window.end", "totalInvoice")

# 3. write to the sink
ordersQuery = opDf.writeStream \
    .format("console") \
    .outputMode("update") \
    .option("checkpointLocation", "chk-Loc1") \
    .trigger(processingTime="15 second") \
    .start()

ordersQuery.awaitTermination()
To use a watermark, add the withWatermark line shown below; the change is the same in PySpark apart from F.col / F.sum.

Scala:
val windowAggDF = refinedOrderDF
  .withWatermark("order_date", "30 minute")
  .groupBy(window(col("order_date"), "15 minute"))
  .agg(sum("amount").alias("totalInvoice"))

PySpark:
windowAggDF = refinedOrderDF \
    .withWatermark("order_date", "30 minute") \
    .groupBy(window(F.col("order_date"), "15 minute")) \
    .agg(F.sum("amount").alias("totalInvoice"))

To make the window sliding instead of tumbling, pass the slide duration as a third argument to window(); the change is the same in Scala and PySpark.

windowAggDF = refinedOrderDF \
    .withWatermark("order_date", "30 minute") \
    .groupBy(window(F.col("order_date"), "15 minute", "1 minute")) \
    .agg(F.sum("amount").alias("totalInvoice"))
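A quick worked example of the difference (the 10:07 event time is just an example):
tumbling window(order_date, "15 minute") puts an order at 10:07 into exactly one bucket, [10:00, 10:15).
sliding window(order_date, "15 minute", "1 minute") puts the same order into every overlapping bucket: [09:53, 10:08), [09:54, 10:09), ... , [10:07, 10:22), which is 15 buckets in total.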
Specific changes required in the above program:
1. The imports are different.
2. Remove all the val keywords.
3. Add \ at the end of a line to continue the statement on the next line.
4. .trigger(Trigger.ProcessingTime("15 seconds")) in Scala becomes .trigger(processingTime='15 seconds') in PySpark.
5. In PySpark, functions such as col and sum are not in scope by default, so we add from pyspark.sql import functions as F and then use F.col and F.sum.
6. List(...) becomes [...].
7. Replace load with load(); similarly IntegerType and StringType become IntegerType() and StringType().
8. Replace builder() with builder.
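To see the shape of the window() output without setting up a socket, here is a minimal batch-mode sketch (the sample rows are made up purely for illustration); it produces the same window.start / window.end / totalInvoice columns as the streaming query above:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[2]").appName("window-demo").getOrCreate()

# hypothetical sample orders: (order_id, order_date, amount)
sample = spark.createDataFrame(
    [(1, "2023-01-01 10:02:00", 200),
     (2, "2023-01-01 10:07:00", 300),
     (3, "2023-01-01 10:20:00", 500)],
    ["order_id", "order_date", "amount"])

(sample
    .groupBy(F.window(F.col("order_date").cast("timestamp"), "15 minute"))
    .agg(F.sum("amount").alias("totalInvoice"))
    .select("window.start", "window.end", "totalInvoice")
    .show(truncate=False))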
Problem Statement: Write a real-time join program using Spark Structured Streaming.
Solution:
Scala Spark Program
====================
import org.apache.spark.sql.SparkSession
import org.apache.log4j.Level
import org.apache.log4j.Logger
import java.sql.Timestamp
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("join")
  .config("spark.streaming.stopGracefullyOnShutdown", "true")
  .config("spark.sql.shuffle.partitions", 3)
  .getOrCreate()

// define own schema instead of inferring it
val impressionSchema = StructType(List(
  StructField("impressionID", StringType),
  StructField("ImpressionTime", TimestampType),
  StructField("CampaignName", StringType)
))

val clickSchema = StructType(List(
  StructField("clickID", StringType),
  StructField("ClickTime", TimestampType)
))

// read the stream
val impressionsDf = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "12342")
  .load()

val clicksDf = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "12343")
  .load()

// structure the data based on the schema defined - impressionDf
val valueDF1 = impressionsDf.select(from_json(col("value"), impressionSchema).alias("value"))
val impressionDfNew = valueDF1.select("value.*")

// structure the data based on the schema defined - clickDf
val valueDF2 = clicksDf.select(from_json(col("value"), clickSchema).alias("value"))
val clickDfNew = valueDF2.select("value.*")

// join condition
val joinExpr = impressionDfNew.col("ImpressionID") === clickDfNew.col("clickID")

// join type
val joinType = "inner"

// joining both the streaming data frames
val joinedDf = impressionDfNew.join(clickDfNew, joinExpr, joinType)
  .drop(clickDfNew.col("clickID"))

// output to the sink
val campaignQuery = joinedDf.writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", "chk-Loc2")
  .trigger(Trigger.ProcessingTime("15 second"))
  .start()

campaignQuery.awaitTermination()

PySpark Program
================
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .master("local[2]") \
    .appName("join") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.sql.shuffle.partitions", 3) \
    .getOrCreate()

# define own schema instead of inferring it
impressionSchema = StructType([
    StructField("impressionID", StringType()),
    StructField("ImpressionTime", TimestampType()),
    StructField("CampaignName", StringType())
])

clickSchema = StructType([
    StructField("clickID", StringType()),
    StructField("ClickTime", TimestampType())
])

# read the stream
impressionsDf = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", "12342") \
    .load()

clicksDf = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", "12343") \
    .load()

# structure the data based on the schema defined - impressionDf
valueDF1 = impressionsDf.select(from_json(F.col("value"), impressionSchema).alias("value"))
impressionDfNew = valueDF1.select("value.*")

# structure the data based on the schema defined - clickDf
valueDF2 = clicksDf.select(from_json(F.col("value"), clickSchema).alias("value"))
clickDfNew = valueDF2.select("value.*")

# join condition
joinExpr = impressionDfNew["ImpressionID"] == clickDfNew["clickID"]

# join type
joinType = "inner"

# joining both the streaming data frames
joinedDf = impressionDfNew.join(clickDfNew, joinExpr, joinType) \
    .drop(clickDfNew["clickID"])

# output to the sink
campaignQuery = joinedDf.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "chk-Loc2") \
    .trigger(processingTime="15 second") \
    .start()

campaignQuery.awaitTermination()
To use watermarks on both streams, chain withWatermark after selecting the columns; the change is the same in PySpark except for F.col.

Scala:
// structure the data based on the schema defined - impressionDf
val valueDF1 = impressionsDf.select(from_json(col("value"), impressionSchema).alias("value"))
val impressionDfNew = valueDF1.select("value.*").withWatermark("impressionTime", "30 minute")

// structure the data based on the schema defined - clickDf
val valueDF2 = clicksDf.select(from_json(col("value"), clickSchema).alias("value"))
val clickDfNew = valueDF2.select("value.*").withWatermark("clickTime", "30 minute")

PySpark:
# structure the data based on the schema defined - impressionDf
valueDF1 = impressionsDf.select(from_json(F.col("value"), impressionSchema).alias("value"))
impressionDfNew = valueDF1.select("value.*").withWatermark("impressionTime", "30 minute")

# structure the data based on the schema defined - clickDf
valueDF2 = clicksDf.select(from_json(F.col("value"), clickSchema).alias("value"))
clickDfNew = valueDF2.select("value.*").withWatermark("clickTime", "30 minute")
Specific changes required in the above program:
1. The imports are different.
2. .trigger(Trigger.ProcessingTime("15 seconds")) in Scala becomes .trigger(processingTime='15 seconds') in PySpark.
3. In PySpark, functions such as col are not in scope by default, so we add from pyspark.sql import functions as F and then use F.col.
4. List(...) becomes [...].
5. Replace load with load(); similarly IntegerType and StringType become IntegerType() and StringType().
6. Replace builder() with builder.
7. A PySpark DataFrame does not have a col method, so we refer to a column with [] and replace === with ==:
   impressionDfNew["ImpressionID"] == clickDfNew["clickID"]
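A minimal batch-mode sketch of the PySpark join idiom from point 7 (the rows below are made up purely for illustration), showing the []-style column access, the == comparison, and dropping the duplicate key column:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("join-demo").getOrCreate()

# hypothetical sample data
impressions = spark.createDataFrame(
    [("i1", "campaignA"), ("i2", "campaignB")],
    ["impressionID", "CampaignName"])
clicks = spark.createDataFrame([("i1",)], ["clickID"])

joinExpr = impressions["impressionID"] == clicks["clickID"]
joined = impressions.join(clicks, joinExpr, "inner").drop(clicks["clickID"])
joined.show()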