ch07-DataframeRedisWriter
deepakmca05 committed Jan 15, 2022
commit b65861a5b43c1cef2d95005f979b2db9f1139bc5
2 changes: 1 addition & 1 deletion code/chap04/scala/build.gradle
@@ -5,7 +5,7 @@ ext.scalaClassifier = '2.13'
ext.scalaVersion = '2.13.7'
ext.sparkVersion = '3.2.0'

-group 'org.data.algorithms.spark.ch03'
+group 'org.data.algorithms.spark.ch04'
version '1.0-SNAPSHOT'

repositories {
8 changes: 4 additions & 4 deletions code/chap07/scala/build.gradle
@@ -1,11 +1,11 @@
apply plugin: 'scala'
apply plugin: 'application'

-ext.scalaClassifier = '2.13'
-ext.scalaVersion = '2.13.7'
+ext.scalaClassifier = '2.12'
+ext.scalaVersion = '2.12.15'
ext.sparkVersion = '3.2.0'

-group 'org.data.algorithms.spark.ch03'
+group 'org.data.algorithms.spark.ch07'
version '1.0-SNAPSHOT'

repositories {
@@ -17,7 +17,7 @@ dependencies {
implementation "org.scala-lang:scala-library:$scalaVersion"
implementation "org.apache.spark:spark-core_$scalaClassifier:$sparkVersion"
implementation "org.apache.spark:spark-sql_$scalaClassifier:$sparkVersion"
implementation "org.scalanlp:breeze_$scalaClassifier:2.0.1-RC2"
implementation "com.redislabs:spark-redis_2.12:3.0.0"

}

@@ -0,0 +1,83 @@
package org.data.algorithms.spark.ch07

import org.apache.spark.sql.{SaveMode, SparkSession}
object DatasourceRedisWriter {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: DatasourceRedisWriter <redis-host> <redis-port>")
      System.exit(-1)
    }

    // redis host
    val REDIS_HOST = args(0)
    println(s"REDIS_HOST = ${REDIS_HOST}")
    // redis port number
    val REDIS_PORT = args(1)
    println(s"REDIS_PORT = ${REDIS_PORT}")
    // create an instance of SparkSession
    val spark =
      SparkSession.
        builder().
        master("local[*]").
        config("spark.redis.host", REDIS_HOST).
        config("spark.redis.port", REDIS_PORT).
        getOrCreate()
    //
    // you may add the password, if desired:
    // .config("spark.redis.auth", "passwd")
    //
    /*
     ========================================
     Write to Redis from a DataFrame
     ========================================

     Use the SparkSession.createDataFrame() function
     to create a DataFrame. In the following example,
     createDataFrame() takes a list of tuples containing
     names, cities, and ages, and a list of column names:
    */
    val columnNames = List("name", "city", "age")
    val listOfValues = List(
      ("Alex", "Ames", 50),
      ("Gandalf", "Cupertino", 60),
      ("Thorin", "Sunnyvale", 95),
      ("Betty", "Ames", 78),
      ("Brian", "Stanford", 77)
    )
    val df = spark.createDataFrame(listOfValues).toDF(columnNames: _*)
    println(s"df.count(): ${df.count()}")
    println(s"df.collect(): ${df.collect().mkString("", ",", "")}")
    df.show()
    df.printSchema()

    //-----------------------------------
    // Write an existing DataFrame (df)
    // to a Redis database:
    //-----------------------------------
    df.write
      .mode(SaveMode.Overwrite)
      .format("org.apache.spark.sql.redis")
      .option("table", "people")
      .option("key.column", "name")
      .save()
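    // Note (not part of the original commit): with the spark-redis connector,
    // each row is typically stored as a Redis hash; because "key.column" is set
    // to "name", the keys are expected to follow a "<table>:<name>" pattern
    // (e.g. "people:Alex"). Verify against the spark-redis version in use.
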
    //-----------------------------------
    // READ back data from Redis:
    //-----------------------------------
    val loadedDF = spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", "people")
      .option("key.column", "name")
      .load()

    println(s"loadedDF.count(): ${loadedDF.count()}")
    println(s"loadedDF.collect(): ${loadedDF.collect().mkString("", ",", "")}")
    loadedDF.show()
    loadedDF.printSchema()

    // Done.
    spark.stop()
  }
}
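
As a follow-up sketch (not in the commit, and assuming the spark session and loadedDF from DatasourceRedisWriter above are still in scope), the round-tripped data can also be queried with Spark SQL by registering a temporary view; the view name people_view and the Ames filter are illustrative only:

    // register the DataFrame loaded from Redis as a temporary SQL view
    loadedDF.createOrReplaceTempView("people_view")
    // query it with standard Spark SQL
    val amesPeople = spark.sql(
      "SELECT name, age FROM people_view WHERE city = 'Ames' ORDER BY age DESC")
    amesPeople.show()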