DEV Community

Shaine Ismail
Shaine Ismail

Posted on

Calling a SQL Server stored procedure from Spark

This is not something people regularly need to do. One alternative is to have the stored procedure insert its record set into a temp table, which can then be read directly into a DataFrame — but that approach requires your DBAs to enable it.

The following uses a JDBC connection to read the result set into an RDD, and then converts it into a DataFrame.

import java.sql.{Connection, DriverManager, ResultSet}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// JDBC driver class (registered automatically on modern JDBC):
// "com.microsoft.sqlserver.jdbc.SQLServerDriver"

val username = ""
val pass = ""
val url = "jdbc:sqlserver://<host>:<port>;databaseName=<db>"

// Columns expected in the stored procedure's result set; all read as strings.
val columns = Seq("DateCreated", "Action", "ServerName", "IPAddress", "Domain")

// Derive the schema from `columns` so the two can never drift apart.
val schema = StructType(columns.map(c => StructField(c, StringType, nullable = true)))

/** Reads the ResultSet's current row into a Spark Row, in `columns` order. */
def parseResultSet(rs: ResultSet): Row = {
  val values = columns.map(rs.getString)
  Row(values: _*)
}

/**
 * Adapts a forward-only ResultSet to an Iterator[Row].
 * Note: `hasNext` advances the cursor, so each element must be consumed
 * exactly once — fine for a single `.toSeq` pass as used below.
 */
def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
  new Iterator[Row] {
    def hasNext: Boolean = rs.next()
    def next(): Row = f(rs)
  }

/**
 * Materialises the result set on the driver, then parallelises it into a
 * DataFrame with the fixed `schema`.
 *
 * Fix: the original had mismatched parentheses —
 * `parallelize(resultSetToIter(rs(parseResultSet).toSeq` — which does not
 * compile; the curried call is `resultSetToIter(rs)(parseResultSet)`.
 */
def paralleliseResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
  val rows = resultSetToIter(rs)(parseResultSet).toSeq
  val rdd = spark.sparkContext.parallelize(rows)
  spark.createDataFrame(rdd, schema)
}

val conn: Connection = DriverManager.getConnection(url, username, pass)
try {
  val rs = conn.createStatement.executeQuery("exec <sp name>")
  val df: DataFrame = paralleliseResultSet(rs, spark)
  df.createOrReplaceTempView("df")
  // Fix: Scala's boolean literal is `false`, not Python's `False`.
  spark.sql("select * from df").show(10, false)
} finally {
  // Fix: the original leaked the JDBC connection.
  conn.close()
}

Top comments (0)