47 changes: 20 additions & 27 deletions README.md
@@ -172,15 +172,15 @@ val df: DataFrame = sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table")
.option("tempdir", "s3n://path/for/temp/data")
.option("tempdir", "s3a://path/for/temp/data")
.load()

// Can also load data from a Redshift query
val df: DataFrame = sqlContext.read
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("query", "select x, count(*) my_table group by x")
.option("tempdir", "s3n://path/for/temp/data")
.option("tempdir", "s3a://path/for/temp/data")
.load()

// Apply some transformations to the data as per normal, then you can use the
@@ -190,7 +190,7 @@ df.write
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("tempdir", "s3n://path/for/temp/data")
.option("tempdir", "s3a://path/for/temp/data")
.mode("error")
.save()

@@ -200,7 +200,7 @@ df.write
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
.option("dbtable", "my_table_copy")
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role")
.option("tempdir", "s3n://path/for/temp/data")
.option("tempdir", "s3a://path/for/temp/data")
.mode("error")
.save()
```
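The snippets above assume that Spark can already reach the `s3a://` `tempdir`. One minimal way to provide static credentials is to set the standard Hadoop S3A properties when building the session. The sketch below is illustrative rather than part of the connector, and the environment-variable names are an assumption of this example; see the authentication discussion later in this README for the full set of options.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: seed the Hadoop S3A credentials so that reads and writes of the
// temporary files under tempdir succeed.
val spark = SparkSession.builder()
  .appName("spark-redshift-example")
  .config("spark.hadoop.fs.s3a.access.key", sys.env.getOrElse("AWS_ACCESS_KEY_ID", ""))
  .config("spark.hadoop.fs.s3a.secret.key", sys.env.getOrElse("AWS_SECRET_ACCESS_KEY", ""))
  .getOrCreate()

val sqlContext = spark.sqlContext  // sqlContext as used in the examples above
```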
@@ -218,23 +218,23 @@ df = sql_context.read \
.format("io.github.spark_redshift_community.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table") \
.option("tempdir", "s3n://path/for/temp/data") \
.option("tempdir", "s3a://path/for/temp/data") \
.load()

# Read data from a query
df = sql_context.read \
.format("io.github.spark_redshift_community.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("query", "select x, count(*) my_table group by x") \
.option("tempdir", "s3n://path/for/temp/data") \
.option("tempdir", "s3a://path/for/temp/data") \
.load()

# Write back to a table
df.write \
.format("io.github.spark_redshift_community.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.option("tempdir", "s3a://path/for/temp/data") \
.mode("error") \
.save()

@@ -243,7 +243,7 @@ df.write \
.format("io.github.spark_redshift_community.spark.redshift") \
.option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass") \
.option("dbtable", "my_table_copy") \
.option("tempdir", "s3n://path/for/temp/data") \
.option("tempdir", "s3a://path/for/temp/data") \
.option("aws_iam_role", "arn:aws:iam::123456789000:role/redshift_iam_role") \
.mode("error") \
.save()
@@ -258,7 +258,7 @@ CREATE TABLE my_table
USING io.github.spark_redshift_community.spark.redshift
OPTIONS (
dbtable 'my_table',
tempdir 's3n://path/for/temp/data',
tempdir 's3a://path/for/temp/data',
url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
);
```
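Once the relation has been created with `CREATE TABLE ... USING`, it can be queried like any other Spark SQL table. A small sketch from the Scala side, reusing the `sqlContext` from the earlier examples:

```scala
// The table registered above is an ordinary Spark SQL relation; queries against
// it are answered by unloading the required data from Redshift via tempdir.
val counts = sqlContext.sql("SELECT COUNT(*) FROM my_table")
counts.show()
```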
@@ -271,7 +271,7 @@ CREATE TABLE my_table
USING io.github.spark_redshift_community.spark.redshift
OPTIONS (
dbtable 'my_table',
tempdir 's3n://path/for/temp/data'
tempdir 's3a://path/for/temp/data',
url 'jdbc:redshift://redshifthost:5439/database?user=username&password=pass'
)
AS SELECT * FROM tabletosave;
@@ -287,7 +287,7 @@ Reading data using R:
df <- read.df(
NULL,
"io.github.spark_redshift_community.spark.redshift",
tempdir = "s3n://path/for/temp/data",
tempdir = "s3a://path/for/temp/data",
dbtable = "my_table",
url = "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
```
@@ -373,23 +373,16 @@ The following describes how each connection can be authenticated:

2. **Set keys in Hadoop conf:** You can specify AWS keys via
[Hadoop configuration properties](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md).
For example, if your `tempdir` configuration points to a `s3n://` filesystem then you can
set the `fs.s3n.awsAccessKeyId` and `fs.s3n.awsSecretAccessKey` properties in a Hadoop XML
For example, if your `tempdir` configuration points to an `s3a://` filesystem then you can
set the `fs.s3a.access.key` and `fs.s3a.secret.key` properties in a Hadoop XML
configuration file or call `sc.hadoopConfiguration.set()` to mutate Spark's global Hadoop
configuration.

For example, if you are using the `s3n` filesystem then add
For example, if you are using the `s3a` filesystem then add

```scala
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
```

and for the `s3a` filesystem add

```scala
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", "YOUR_KEY_ID")
sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
```
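The same two properties can also be supplied up front by prefixing them with `spark.hadoop.` in the Spark configuration, which avoids mutating the Hadoop configuration after the context exists. A sketch; the equivalent `--conf` flags on `spark-submit` work the same way:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: pass the S3A credentials through SparkConf; Spark copies any
// spark.hadoop.* entries into the Hadoop configuration it creates.
val conf = new SparkConf()
  .setAppName("redshift-auth-example")
  .set("spark.hadoop.fs.s3a.access.key", "YOUR_KEY_ID")
  .set("spark.hadoop.fs.s3a.secret.key", "YOUR_SECRET_ACCESS_KEY")
val sc = new SparkContext(conf)
```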

Python users will have to use a slightly different method to modify the `hadoopConfiguration`,
@@ -398,8 +391,8 @@ The following describes how each connection can be authenticated:
break or change in the future:

```python
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "YOUR_KEY_ID")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
sc._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", "YOUR_KEY_ID")
sc._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")
```

- **Redshift to S3**: Redshift also connects to S3 during `COPY` and `UNLOAD` queries. There are
@@ -462,7 +455,7 @@ val df1: DataFrame = sqlContext.read
.option("data_api_cluster", "<cluster name>")
.option("data_api_database", "<database name>")
.option("dbtable", "<table name>")
.option("tempdir", "s3n://path/for/temp/data")
.option("tempdir", "s3a://path/for/temp/data")
.option("aws_iam_role", "<iam role arn>")
.load()

@@ -472,7 +465,7 @@ val df2: DataFrame = sqlContext.read
.option("data_api_workgroup", "<workgroup name>")
.option("data_api_database", "<database name>")
.option("dbtable", "<table name>")
.option("tempdir", "s3n://path/for/temp/data")
.option("tempdir", "s3a://path/for/temp/data")
.option("aws_iam_role", "<iam role arn>")
.load()
```
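DataFrames obtained through the Data API are ordinary Spark DataFrames, so they can be transformed and written back through the JDBC-based write path shown earlier. A sketch, with the filter column as a hypothetical placeholder:

```scala
// Sketch: transform a Data API-sourced DataFrame and write it back with the
// connector's JDBC write path documented earlier in this README.
val filtered = df1.filter(df1("<some column>") > 0)  // hypothetical column name
filtered.write
  .format("io.github.spark_redshift_community.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=username&password=pass")
  .option("dbtable", "<target table name>")
  .option("tempdir", "s3a://path/for/temp/data")
  .option("aws_iam_role", "<iam role arn>")
  .mode("error")
  .save()
```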
18 changes: 9 additions & 9 deletions tutorial/README.md
@@ -57,9 +57,9 @@ object SparkRedshiftTutorial {
val sc = new SparkContext(new SparkConf().setAppName("SparkSQL").setMaster("local"))

// Configure SparkContext to communicate with AWS
val tempS3Dir = "s3n://redshift-spark/temp/"
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
val tempS3Dir = "s3a://redshift-spark/temp/"
sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", awsAccessKeyId)
sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", awsSecretAccessKey)

// Create the SQL Context
val sqlContext = new SQLContext(sc)
@@ -89,14 +89,14 @@ jdbc:redshift://swredshift.czac2vcs84ci.us-east-1.redshift.amazonaws.com:5439/sp
`spark-redshift` stages data in S3 when transferring it to or from Redshift, so you'll need to specify an S3 path where the library should write these temporary files. `spark-redshift` cannot automatically clean up the temporary files it creates in S3, so we recommend using a dedicated temporary S3 bucket with an [object lifecycle configuration](http://docs.aws.amazon.com/AmazonS3/latest/dev/object-lifecycle-mgmt.html) that deletes temporary files automatically after a specified expiration period. For this example we create an S3 bucket named `redshift-spark` and tell `spark-redshift` to use the following temporary location within it:

```scala
val tempS3Dir = "s3n://redshift-spark/temp/"
val tempS3Dir = "s3a://redshift-spark/temp/"
```
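The lifecycle rule mentioned above can be created from the S3 console, the CLI, or programmatically. Below is a minimal sketch with the AWS SDK for Java v1 (assumed to be on the classpath); the one-day expiration and the `temp/` prefix are choices made for this tutorial, not requirements:

```scala
import com.amazonaws.services.s3.AmazonS3ClientBuilder
import com.amazonaws.services.s3.model.BucketLifecycleConfiguration

// Sketch: expire objects under temp/ in the redshift-spark bucket after one day,
// so temporary files left behind by spark-redshift are cleaned up automatically.
val rule = new BucketLifecycleConfiguration.Rule()
  .withId("expire-spark-redshift-temp")
  .withPrefix("temp/")
  .withExpirationInDays(1)
  .withStatus(BucketLifecycleConfiguration.ENABLED)

val s3 = AmazonS3ClientBuilder.defaultClient()
s3.setBucketLifecycleConfiguration(
  "redshift-spark",
  new BucketLifecycleConfiguration().withRules(rule))
```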

Next, configure AWS security credentials by setting the following properties in the `SparkContext`'s `hadoopConfiguration`:

```scala
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", awsAccessKeyId)
sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", awsSecretAccessKey)
```

Finally, we create the `SQLContext` so that we can use the Data Sources API to communicate with Redshift:
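As in `SparkRedshiftTutorial.scala` further down, this is simply:

```scala
// Create the SQLContext that the Data Sources API calls below will use.
val sqlContext = new SQLContext(sc)
```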
@@ -181,7 +181,7 @@ CREATE TEMPORARY TABLE myevent
USING io.github.spark_redshift_community.spark.redshift
OPTIONS (
dbtable 'event',
tempdir 's3n://redshift-spark/temp/',
tempdir 's3a://redshift-spark/temp/',
url 'jdbc:redshift://swredshift.czac2vcs84ci.us-east-1.redshift.amazonaws.com:5439/sparkredshift?user=spark&password=mysecretpass'
);
SELECT * FROM myevent;
@@ -253,7 +253,7 @@ The following diagram shows the steps that are performed when Spark Redshift UNL

First, the Spark Driver communicates with the Redshift leader node to obtain the schema of the table (or query) requested. This is done by issuing a `DESCRIBE TABLE` query over a JDBC connection, then parsing its output and mapping its types back to Spark SQL types.
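Conceptually, this schema-discovery step behaves like the sketch below: a JDBC query that returns no rows but exposes the column names and types through its metadata. The actual query text and the JDBC-to-Spark type mapping inside the connector are more involved; `jdbcURL` is the connection string built earlier in this tutorial, and the Redshift JDBC driver is assumed to be on the classpath.

```scala
import java.sql.DriverManager

// Sketch: inspect the schema of the Redshift table over JDBC without moving data,
// roughly what the Spark driver does before the UNLOAD is issued.
val conn = DriverManager.getConnection(jdbcURL)
try {
  val rs = conn.createStatement().executeQuery("SELECT * FROM event WHERE 1 = 0")
  val md = rs.getMetaData
  (1 to md.getColumnCount).foreach { i =>
    // Column name and JDBC type name; the connector maps these to Spark SQL types.
    println(s"${md.getColumnName(i)}: ${md.getColumnTypeName(i)}")
  }
} finally {
  conn.close()
}
```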

Next, a Redshift [UNLOAD](http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) query is created using the schema information obtained. The UNLOAD command unloads each slice into a S3 folder (`22c365b4-13cb-40fd-b4d6-d0ac5d426551`) created in the temporary S3 location provided by the user (`s3n://spark-redshift/temp/`). Each file contains one row per line and each column of the row is pipe (`|`) delimited. This process occurs in parallel for each slice. The `spark-redshift` library achieves its high performance through this mechanism.
Next, a Redshift [UNLOAD](http://docs.aws.amazon.com/redshift/latest/dg/r_UNLOAD.html) query is created using the schema information obtained. The UNLOAD command unloads each slice into an S3 folder (`22c365b4-13cb-40fd-b4d6-d0ac5d426551`) created in the temporary S3 location provided by the user (`s3a://redshift-spark/temp/`). Each file contains one row per line, with the columns of each row pipe (`|`) delimited. This process occurs in parallel for each slice, which is how the `spark-redshift` library achieves its high performance.
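For intuition, the statement the connector issues has roughly the following shape. This is a hand-written sketch, not the connector's exact output; the real statement adds escaping and format options, and the credentials clause depends on how authentication was configured. Note that Redshift itself always addresses the bucket with the `s3://` scheme, while `s3a://` is the Hadoop-side view of the same location.

```scala
// Rough shape of the generated UNLOAD (placeholders in angle brackets); the
// connector builds the real statement internally with additional options.
val unloadSql =
  """UNLOAD ('SELECT * FROM event')
    |TO 's3://redshift-spark/temp/22c365b4-13cb-40fd-b4d6-d0ac5d426551/'
    |CREDENTIALS 'aws_access_key_id=<key id>;aws_secret_access_key=<secret key>'
    |DELIMITER '|' ESCAPE""".stripMargin
```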

#### Read UNLOAD'ed S3 files into a DataFrame instance ####

@@ -312,7 +312,7 @@ CREATE TABLE redshiftevent
USING io.github.spark_redshift_community.spark.redshift
OPTIONS (
dbtable 'redshiftevent',
tempdir 's3n://redshift-spark/temp/',
tempdir 's3a://redshift-spark/temp/',
url 'jdbc:redshift://swredshift.czac2vcs84ci.us-east-1.redshift.amazonaws.com:5439/sparkredshift?user=spark&password=mysecretpass'
)
AS SELECT * FROM myevent;
6 changes: 3 additions & 3 deletions tutorial/SparkRedshiftTutorial.scala
@@ -56,9 +56,9 @@ object SparkRedshiftTutorial {
println(jdbcURL)
val sc = new SparkContext(new SparkConf().setAppName("SparkSQL").setMaster("local"))

val tempS3Dir = "s3n://redshift-spark/temp/"
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKey)
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretKey)
val tempS3Dir = "s3a://redshift-spark/temp/"
sc.hadoopConfiguration.set("fs.s3a.awsAccessKeyId", awsAccessKey)
sc.hadoopConfiguration.set("fs.s3a.awsSecretAccessKey", awsSecretKey)

val sqlContext = new SQLContext(sc)
