MinIO Spark-Select enables retrieving only the required data from an object using the S3 Select API.
This library requires:
- Spark 2.3+
- Scala 2.11+
- S3 Select is supported with CSV, JSON and Parquet files, using the `minioSelectCSV`, `minioSelectJSON` and `minioSelectParquet` values to specify the data format.
- S3 Select supports select on multiple objects.
- S3 Select supports querying SSE-C encrypted objects.
- Spark CSV and JSON options such as `nanValue`, `positiveInf`, `negativeInf`, and options related to corrupt records (for example, the `failfast` and `dropmalformed` modes) are not supported.
- Using commas (,) within decimals is not supported. For example, 10,000 is not supported and 10000 is.
- The following filters are not pushed down to MinIO (see the sketch after this list):
  - Aggregate functions such as `COUNT()` and `SUM()`.
  - Filters that `CAST()` an attribute. For example, `CAST(stringColumn as INT) = 1`.
  - Filters with an attribute that is an object or is complex. For example, `intArray[1] = 1`, `objectColumn.objectNumber = 1`.
  - Filters for which the value is not a literal value. For example, `intColumn1 = intColumn2`.
- Only S3 Select supported data types are supported, with the documented limitations.
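To make the pushdown rules concrete, here is a minimal sketch, assuming a running `spark-shell`, the two-column schema used in the examples below, and a hypothetical `s3://testbucket/people.csv` object:

```scala
import org.apache.spark.sql.types._

val schema = StructType(List(
  StructField("name", StringType, true),
  StructField("age", IntegerType, false)
))

val df = spark
  .read
  .format("minioSelectCSV")
  .schema(schema)
  .load("s3://testbucket/people.csv") // hypothetical object

// Pushed down: a plain comparison of a column against a literal value
// can be evaluated by MinIO via the Select API.
df.filter("age > 19").show()

// NOT pushed down: the CAST() keeps this filter in Spark, which applies
// it only after MinIO returns the rows.
df.filter("CAST(name AS INT) = 1").show()
```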
Include this package in your Spark Applications using:
> $SPARK_HOME/bin/spark-shell --packages io.minio:spark-select_2.11:2.1
If you use the sbt-spark-package plugin, in your sbt build file, add:
spDependencies += "minio/spark-select:2.1"
Otherwise,
libraryDependencies += "io.minio" % "spark-select_2.11" % "2.1"
In your pom.xml, add:
```xml
<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>io.minio</groupId>
    <artifactId>spark-select_2.11</artifactId>
    <version>2.1</version>
  </dependency>
</dependencies>
```
Set up all required environment variables.

NOTE: It is assumed that you have already installed Hadoop 2.8.5 and Spark 2.3.1 locally.
```sh
export HADOOP_HOME=${HOME}/spark/hadoop-2.8.5/
export PATH=${PATH}:${HADOOP_HOME}/bin
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
export SPARK_HOME=${HOME}/spark/spark-2.3.1-bin-without-hadoop/
export PATH=${PATH}:${SPARK_HOME}/bin
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/

git clone https://github.com/minio/spark-select
cd spark-select
sbt assembly
spark-shell --jars target/scala-2.11/spark-select-assembly-2.1.jar
```
Once the `spark-shell` has been successfully invoked, load the bundled CSV example:
```
scala> :load examples/csv.scala
Loading examples/csv.scala...
import org.apache.spark.sql._
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,false))
df: org.apache.spark.sql.DataFrame = [name: string, age: int]
+-------+---+
|   name|age|
+-------+---+
|Michael| 31|
|   Andy| 30|
| Justin| 19|
+-------+---+

scala>
```
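The bundled example assumes a MinIO server is reachable from Spark. If yours is not, the endpoint and credentials can usually be supplied through Hadoop's standard s3a properties before reading; a hedged sketch (endpoint and credentials are placeholders — verify the exact keys against your deployment):

```scala
// Placeholder values: point the s3a connector at a local MinIO server.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://127.0.0.1:9000")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "minioadmin")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "minioadmin")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
```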
In Scala:

```scala
spark
  .read
  .format("minioSelectCSV") // "minioSelectJSON" for JSON or "minioSelectParquet" for Parquet
  .schema(...)              // mandatory
  .options(...)             // optional
  .load("s3://path/to/my/datafiles")
```
In R:

```r
read.df("s3://path/to/my/datafiles", "minioSelectCSV", schema)
```
Options can be passed together as a map or set individually (Scala):

```scala
spark
  .read
  .format("minioSelectCSV") // "minioSelectJSON" for JSON or "minioSelectParquet" for Parquet
  .schema(...)              // mandatory
  .options(...)             // optional. Examples:
                            // .options(Map("quote" -> "\'", "header" -> "true")) or
                            // .option("quote", "\'").option("header", "true")
  .load("s3://path/to/my/datafiles")
```
In SQL:

```sql
CREATE TEMPORARY VIEW MyView (number INT, name STRING) USING minioSelectCSV OPTIONS (path "s3://path/to/my/datafiles")
```
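The temporary view can then be queried with ordinary Spark SQL. A minimal sketch from Scala (the view definition repeats the statement above; the query itself is illustrative):

```scala
// Register the view, then query it. The literal comparison in the
// WHERE clause is a filter that can be pushed down to MinIO.
spark.sql("""
  CREATE TEMPORARY VIEW MyView (number INT, name STRING)
  USING minioSelectCSV
  OPTIONS (path "s3://path/to/my/datafiles")
""")
spark.sql("SELECT name FROM MyView WHERE number > 1").show()
```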
The following options are available when using `minioSelectCSV` and `minioSelectJSON`. If not specified, default values are used. A short sketch applying some of these options follows each table.

Options for `minioSelectCSV`:
Option | Default | Usage |
---|---|---|
compression | "none" | Indicates whether compression is used. "gzip", "bzip2" are values supported besides "none". |
delimiter | "," | Specifies the field delimiter. |
quote | '"' | Specifies the quote character. Specifying an empty string is not supported and results in a malformed XML error. |
escape | '"' | Specifies the quote escape character. |
header | "true" | "false" specifies that there is no header. "true" specifies that a header is in the first line. Only headers in the first line are supported, and empty lines before a header are not supported. |
comment | "#" | Specifies the comment character. |
Options for `minioSelectJSON`:

Option | Default | Usage |
---|---|---|
compression | "none" | Indicates whether compression is used. "gzip", "bzip2" are values supported besides "none". |
multiline | "false" | "false" specifies that the JSON is in Select LINES format, meaning that each line in the input data contains a single JSON object. "true" specifies that the JSON is in Select DOCUMENT format, meaning that a JSON object can span multiple lines in the input data. |
There are no options needed with Parquet files.
Schema with two columns, for CSV:

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object app {
  def main(args: Array[String]) {
    val schema = StructType(
      List(
        StructField("name", StringType, true),
        StructField("age", IntegerType, false)
      )
    )

    val df = spark
      .read
      .format("minioSelectCSV")
      .schema(schema)
      .load("s3://sjm-airlines/people.csv")

    // show all rows
    df.show()

    // show only filtered rows
    df.select("*").filter("age > 19").show()
  }
}
```
With a custom schema, for JSON:

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object app {
  def main(args: Array[String]) {
    val schema = StructType(
      List(
        StructField("name", StringType, true),
        StructField("age", IntegerType, false)
      )
    )

    val df = spark
      .read
      .format("minioSelectJSON")
      .schema(schema)
      .load("s3://sjm-airlines/people.json")

    // show all rows
    df.show()

    // show only filtered rows
    df.select("*").filter("age > 19").show()
  }
}
```
With a custom schema, for Parquet:

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object app {
  def main(args: Array[String]) {
    val schema = StructType(
      List(
        StructField("name", StringType, true),
        StructField("age", IntegerType, false)
      )
    )

    val df = spark
      .read
      .format("minioSelectParquet")
      .schema(schema)
      .load("s3://sjm-airlines/people.parquet")

    // show all rows
    df.show()

    // show only filtered rows
    df.select("*").filter("age > 19").show()
  }
}
```
Schema with two columns for CSV, in Python:

```python
from pyspark.sql import *
from pyspark.sql.types import *

if __name__ == "__main__":
    # create SparkSession
    spark = SparkSession.builder \
        .master("local") \
        .appName("spark-select in python") \
        .getOrCreate()

    # filtered schema
    st = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), False),
    ])

    df = spark \
        .read \
        .format('minioSelectCSV') \
        .schema(st) \
        .load("s3://testbucket/people.csv")

    # show all rows.
    df.show()

    # show only filtered rows.
    df.select("*").filter("age > 19").show()
```
Submit the application with:

> $SPARK_HOME/bin/spark-submit --packages io.minio:spark-select_2.11:2.1 <python-file>