
Commit 47898b6

Feature/scala code/ch04 deepak (#14)
* ch04-averagebykeyusereducebykey
* ch04-averagebykeyusegroupbykey
* ch04-averagebykeyusecombinebykey
* ch04-averagebykeyusecombinebykey
* ch04-DFMedianExact
1 parent 32abe23 commit 47898b6

File tree

5 files changed: +242 additions, -10 deletions


code/chap04/scala/build.gradle

Lines changed: 1 addition & 10 deletions
@@ -15,21 +15,12 @@ repositories {
 
 dependencies {
     implementation "org.scala-lang:scala-library:$scalaVersion"
-    compile "org.scala-lang:scala-compiler:$scalaVersion"
     implementation "org.apache.spark:spark-core_$scalaClassifier:$sparkVersion"
     implementation "org.apache.spark:spark-sql_$scalaClassifier:$sparkVersion"
-    implementation 'org.scalanlp:breeze_2.13:2.0.1-RC2'
+    implementation "org.scalanlp:breeze_$scalaClassifier:2.0.1-RC2"
 
 }
 
 application {
     mainClass = project.hasProperty("mainClass") ? project.getProperty("mainClass") : "NULL"
-}
-
-// To start the scala repl use the command ./gradlew repl --console plain --no-daemon
-task repl(type:JavaExec) {
-    main = "scala.tools.nsc.MainGenericRunner"
-    classpath = sourceSets.main.runtimeClasspath
-    standardInput System.in
-    args '-usejavacp'
 }
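
Since the application block reads the main class from a project property, each program added in this commit can presumably be launched via the Gradle application plugin's run task by passing the fully qualified object name, e.g. ./gradlew run -PmainClass=org.data.algorithms.spark.ch04.AverageByKeyUseReduceByKey. The exact task wiring is an assumption based on this build script alone.
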
Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
package org.data.algorithms.spark.ch04

import org.apache.spark.sql.SparkSession

object AverageByKeyUseCombineByKey {
  def createSparkSession: SparkSession = {
    SparkSession.builder()
      .master("local[*]")
      .appName("AverageByKeyUseCombineByKey")
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    // create a new Spark session
    val spark = createSparkSession
    val input = List(
      ("k1", 1), ("k1", 2), ("k1", 3), ("k1", 4), ("k1", 5),
      ("k2", 6), ("k2", 7), ("k2", 8),
      ("k3", 10), ("k3", 12)
    )
    // build RDD[(key, value)]
    val rdd = spark.sparkContext.parallelize(input)
    /*
     The combined data structure (C) is a Tuple2(sum, count).
     Three functions need to be defined:
       v    --> C    (create a combiner from the first value)
       C, v --> C    (merge a value into a combiner)
       C, C --> C    (merge two combiners)
    */
    val sumCount = rdd.combineByKey(
      (v: Int) => (v, 1),
      (c: (Int, Int), v: Int) => (c._1 + v, c._2 + 1),
      (c1: (Int, Int), c2: (Int, Int)) => (c1._1 + c2._1, c1._2 + c2._2)
    )
    // show (sum, count) per key
    println(s"sum_count = ${sumCount.collect().mkString("[", ", ", "]")}")
    /*
     [
       (k3, (22, 2)),
       (k2, (21, 3)),
       (k1, (15, 5))
     ]
    */

    // find averages (use a Float division to avoid integer truncation)
    val avg = sumCount.mapValues { case (sum, count) => sum.toFloat / count }
    println(s"avg = ${avg.collect().mkString("[", ", ", "]")}")
    /*
     avg.collect()
     [
       (k3, 11.0),
       (k2, 7.0),
       (k1, 3.0)
     ]
    */
    // done!
    spark.stop()
  }
}
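
For comparison, the same (sum, count) aggregation can be written with aggregateByKey, where the (0, 0) zero value plays the role of the createCombiner step. This is a minimal sketch (not part of the commit), reusing the rdd value from the listing above:

// hypothetical alternative to combineByKey: aggregateByKey with a (sum, count) zero value
val sumCountAgg = rdd.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // fold one value into the per-partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)    // merge accumulators from different partitions
)
val avgAgg = sumCountAgg.mapValues { case (sum, count) => sum.toFloat / count }
println(avgAgg.collect().mkString("[", ", ", "]"))
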
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
package org.data.algorithms.spark.ch04

import org.apache.spark.sql.SparkSession

object AverageByKeyUseGroupByKey {
  def createSparkSession: SparkSession = {
    SparkSession.builder()
      .master("local[*]")
      .appName("AverageByKeyUseGroupByKey")
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    // create an instance of SparkSession
    val spark = createSparkSession
    val input = List(
      ("k1", 1), ("k1", 2), ("k1", 3), ("k1", 4), ("k1", 5),
      ("k2", 6), ("k2", 7), ("k2", 8),
      ("k3", 10), ("k3", 12)
    )
    // build RDD[(key, value)]
    val rdd = spark.sparkContext.parallelize(input)
    // group (key, value) pairs by key
    val groupedByKey = rdd.groupByKey()
    // show the grouped values per key
    println(s"grouped_by_key = ${groupedByKey.mapValues(_.toList).collect().mkString("[", ", ", "]")}")
    /*
     [
       (k3, List(10, 12)),
       (k2, List(6, 7, 8)),
       (k1, List(1, 2, 3, 4, 5))
     ]
    */

    // find averages
    val avg = groupedByKey.mapValues(values => values.sum.toFloat / values.size)
    println(s"avg = ${avg.collect().mkString("[", ", ", "]")}")
    /*
     avg.collect()
     [
       (k3, 11.0),
       (k2, 7.0),
       (k1, 3.0)
     ]
    */
    // done!
    spark.stop()
  }
}
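
The same per-key average can also be expressed with the DataFrame API, letting Spark SQL's built-in aggregate do the sum/count work. A minimal sketch (not part of the commit), assuming the spark session and input list from the listing above:

// hypothetical DataFrame variant of the per-key average
import org.apache.spark.sql.functions.avg
import spark.implicits._

val avgDF = input.toDF("key", "value").groupBy("key").agg(avg("value").alias("avg"))
avgDF.show()   // expected values: k1 -> 3.0, k2 -> 7.0, k3 -> 11.0
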
Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
package org.data.algorithms.spark.ch04

import org.apache.spark.sql.SparkSession

object AverageByKeyUseReduceByKey {

  def createSparkSession: SparkSession = {
    SparkSession.builder()
      .master("local[*]")
      .appName("AverageByKeyUseReduceByKey")
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    // create a new Spark session
    val spark = createSparkSession
    val input = List(
      ("k1", 1), ("k1", 2), ("k1", 3), ("k1", 4), ("k1", 5),
      ("k2", 6), ("k2", 7), ("k2", 8),
      ("k3", 10), ("k3", 12)
    )
    // build RDD[(key, value)]
    val rdd = spark.sparkContext.parallelize(input)
    // map each (key, value) into (key, (value, 1))
    val pairs = rdd.map(kv => (kv._1, (kv._2, 1)))
    /*
     pairs =
     [
       ("k1", (1, 1)), ("k1", (2, 1)), ("k1", (3, 1)), ("k1", (4, 1)), ("k1", (5, 1)),
       ("k2", (6, 1)), ("k2", (7, 1)), ("k2", (8, 1)),
       ("k3", (10, 1)), ("k3", (12, 1))
     ]

     reduce by key:
       x = (sum1, count1)
       y = (sum2, count2)
       x + y --> (sum1 + sum2, count1 + count2)
    */
    val sumCount = pairs.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
    println(s"sum_count = ${sumCount.collect().mkString("[", ", ", "]")}")
    /*
     [
       (k3, (22, 2)),
       (k2, (21, 3)),
       (k1, (15, 5))
     ]

     find averages:
       v = (sum-of-values, count-of-values)
       v._1 = sum-of-values
       v._2 = count-of-values
    */
    val avg = sumCount.mapValues(v => v._1.toFloat / v._2)
    println(s"avg = ${avg.collect().mkString("[", ", ", "]")}")
    /*
     avg.collect()
     [
       (k3, 11.0),
       (k2, 7.0),
       (k1, 3.0)
     ]
    */

    // done!
    spark.stop()
  }
}
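
To make the (value, 1) trick concrete, here is a tiny local trace (plain Scala, no Spark needed; the names are illustrative only) of the fold that reduceByKey performs for key "k1":

// local simulation of the per-key reduction for "k1"
val k1Pairs = List((1, 1), (2, 1), (3, 1), (4, 1), (5, 1))               // the (value, 1) pairs for "k1"
val (sum, count) = k1Pairs.reduce((x, y) => (x._1 + y._1, x._2 + y._2))  // (15, 5)
println(sum.toFloat / count)                                             // 3.0
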
Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
package org.data.algorithms.spark.ch04

import org.apache.spark.sql.functions.{col, collect_list, rand, udf}
import org.apache.spark.sql.{DataFrame, SparkSession}

/*
 -----------------------------------------------------
 This program finds the exact median per key.

 ------------------------------------------------------
 Note-1: print(), collect(), and show() are used
 for debugging and educational purposes only.

 ------------------------------------------------------
 Input Parameters:
     none
 -------------------------------------------------------

 @author Deepak Kumar
 -------------------------------------------------------
*/
object DataframeMedianExact {

  def createTestDataframe(sparkSession: SparkSession, numOfKeys: Int, numOfRows: Int): DataFrame = {
    val key = (col("id") % numOfKeys).alias("key")
    val value = (rand(41) + key * numOfKeys).alias("value")
    sparkSession.range(0, numOfRows, 1, 1).select(key, value)
  }

  def calculateMedian(values: Seq[Float]): Float =
    breeze.stats.median(values)

  def main(args: Array[String]): Unit = {
    // create an instance of SparkSession
    val spark =
      SparkSession.builder()
        .appName("DataframeMedianExact")
        .master("local[*]")
        .getOrCreate()
    /*
     create a DataFrame with 1,000,000 rows and two columns: "key" and "value";
     the number of keys will be 10: {0, 1, 2, ..., 9}
    */
    val df = createTestDataframe(spark, 10, 1000000)
    println(s"df.count() = ${df.count()}")
    df.printSchema()
    df.show(20, truncate = false)
    /*
     create a UDF from a Scala function:
     the UDF's return type (FloatType) is derived from calculateMedian's return type Float
    */
    val calculateMedianUDF = udf((values: Seq[Float]) => calculateMedian(values))
    // (an approximate per-key median is also possible; see the sketch after this listing)
    val exactMedianPerKey = df.groupBy("key").agg(calculateMedianUDF(collect_list("value")).alias("median"))
    println(s"exact_median_per_key.count() = ${exactMedianPerKey.count()}")
    exactMedianPerKey.printSchema()
    exactMedianPerKey.show(truncate = false)
    // done
    spark.stop()
  }
}
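
As the comment in the listing notes, an approximate per-key median is also possible without collecting every value into a list. A minimal sketch (not part of the commit), assuming Spark 3.1+ where percentile_approx is exposed in org.apache.spark.sql.functions, with the accuracy chosen so the relative error is roughly 1/10,000,000; it reuses the df value from the listing above:

// hypothetical approximate alternative: percentile_approx on the grouped "value" column
import org.apache.spark.sql.functions.{col, lit, percentile_approx}

val approxMedianPerKey = df.groupBy("key")
  .agg(percentile_approx(col("value"), lit(0.5), lit(10000000)).alias("median"))
approxMedianPerKey.show(truncate = false)
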

0 commit comments
