README.md (35 additions, 109 deletions)
# Kotlin for Apache® Spark™ [![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent-3.2.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:org.jetbrains.kotlinx.spark%20AND%20v:1.1.0) [![official JetBrains project](http://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) [![Join the chat at https://gitter.im/JetBrains/kotlin-spark-api](https://badges.gitter.im/JetBrains/kotlin-spark-api.svg)](https://gitter.im/JetBrains/kotlin-spark-api?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)


Your next API to work with [Apache Spark](https://spark.apache.org/).
We have opened a Spark Project Improvement Proposal: [Kotlin support for Apache Spark](https://issues.apache.org/jira/browse/SPARK-32530). We encourage you to voice your opinion and participate in the discussion.

## Supported versions of Apache Spark

| Apache Spark | Scala | Kotlin for Apache Spark         |
|:------------:|:-----:|:-------------------------------:|
| 3.2.1+       | 2.12  | kotlin-spark-api-3.2:1.1.0      |
| 3.1.3+       | 2.12  | kotlin-spark-api-3.1:1.1.0      |
| 3.0.3+       | 2.12  | kotlin-spark-api-3.0:1.1.0      |
| 2.4.1+       | 2.12  | kotlin-spark-api-2.4_2.12:1.0.2 |
| 2.4.1+       | 2.11  | kotlin-spark-api-2.4_2.11:1.0.2 |

## Releases

The list of Kotlin for Apache Spark releases is available [here](https://github.com/JetBrains/kotlin-spark-api/releases/).
The Kotlin for Spark artifacts adhere to the following convention:
`[Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]`
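For example, `kotlin-spark-api-2.4_2.12:1.0.2` is version 1.0.2 of the API, built for Apache Spark 2.4 with Scala 2.12.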

[![Maven Central](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx.spark/kotlin-spark-api-parent-3.2.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:"org.jetbrains.kotlinx.spark"%20AND%20a:"kotlin-spark-api-3.2")

## How to configure Kotlin for Apache Spark in your project

Here's an example `pom.xml`:
```xml
<dependency>
    <groupId>org.jetbrains.kotlinx.spark</groupId>
    <artifactId>kotlin-spark-api-3.2</artifactId>
    <version>${kotlin-spark-api.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
```
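If you build with Gradle instead, the dependency block should look roughly like this (Kotlin DSL; a sketch assuming the same coordinates, with the Spark version taken from the table above):

```kotlin
dependencies {
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.2:1.1.0")
    implementation("org.apache.spark:spark-sql_2.12:3.2.1")
}
```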
The Kotlin Spark API also supports Kotlin Jupyter notebooks.
To use it, simply add

```jupyterpython
%use spark
```
to the top of your notebook. This will get the latest version of the API, together with the latest version of Spark.
To use a specific version of Spark or of the API itself, specify it like this:
```jupyterpython
%use spark(spark=3.2, v=1.1.0)
```

Inside the notebook a Spark session will be initiated automatically. This can be accessed via the `spark` value.
`sc: JavaSparkContext` can also be accessed directly. Beyond that, the API works the same as it does outside notebooks.

There is also support for HTML rendering of Datasets and simple (Java)RDDs.
Check out the [example](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/JupyterExample.ipynb) as well.


To use the Spark Streaming abilities, instead use
```jupyterpython
%use spark-streaming
```
This does not start a Spark session right away, meaning you can call `withSparkStreaming(batchDuration) {}`
in whichever cell you want.
Check out the [example](examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/streaming/JupyterStreamingExample.ipynb).

## Kotlin for Apache Spark features

Note that, in a regular project, you would first create a `SparkSession`; this is not needed when running the Kotlin Spark API from a Jupyter notebook.

### Creating a Dataset in Kotlin
```kotlin
spark.dsOf("a" to 1, "b" to 2)
```
The example above produces `Dataset<Pair<String, Int>>`. While Kotlin Pairs and Triples are supported, Scala Tuples are recommended for better support.

### Null safety
There are several aliases in the API, like `leftJoin`, `rightJoin` etc. These are null-safe by design.
For example, `leftJoin` is aware of nullability and returns `Dataset<Pair<LEFT, RIGHT?>>`.
Note that we are forcing `RIGHT` to be nullable for you as a developer to be able to handle this situation.
`NullPointerException`s are hard to debug in Spark, and we're doing our best to make them as rare as possible.
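As a minimal sketch of how this plays out (the `Customer` and `Order` classes here are made up for illustration):

```kotlin
import org.jetbrains.kotlinx.spark.api.*

data class Customer(val id: Int, val name: String)
data class Order(val customerId: Int, val total: Double)

fun main() = withSpark {
    val customers = dsOf(Customer(1, "Jane"), Customer(2, "John"))
    val orders = dsOf(Order(1, 99.95))

    // every customer is kept; the right-hand side of each pair is forced to be nullable
    customers.leftJoin(orders, customers.col("id") eq orders.col("customerId"))
        .map { (customer, order) -> customer.name to (order?.total ?: 0.0) }
        .show()
}
```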

In Spark, you might also come across Scala-native `Option<*>` or Java-compatible `Optional<*>` classes.
We provide `getOrNull()` and `getOrElse()` functions for these so you can put Kotlin's null safety to good use.

Similarly, you can also create `Option<*>`s and `Optional<*>`s with functions like `T?.toOptional()` if a Spark function requires it.

### withSpark function

We provide the useful function `withSpark`, which accepts everything that may be needed to run Spark: properties, app name, master location, and so on. It also accepts a block of code to execute inside the Spark context.
Do not use this when running the Kotlin Spark API from a Jupyter notebook.
```kotlin
withSpark {
    dsOf(1, 2)
        .map { it X it } // creates Tuple2<Int, Int>
        .show()
}
```
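`withSpark` can also be configured explicitly. A sketch, assuming the overload that takes `props`, `master`, `appName`, and `logLevel` arguments:

```kotlin
withSpark(
    props = mapOf("spark.sql.shuffle.partitions" to 10),
    master = "local[2]",
    appName = "My Application",
    logLevel = SparkLogLevel.INFO, // assumed enum from the API
) {
    dsOf(1, 2, 3).show()
}
```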

Calling `cache` to compute a forked result only once, and remembering to unpersist it afterwards, is easy to get wrong. To solve these problems we've added the `withCached` function:
```kotlin
withSpark {
    dsOf(1, 2, 3, 4, 5)
        .map { tupleOf(it, it + 2) }
        .withCached {
            showDS()

            filter { it._1 % 2 == 0 }.showDS()
        }
        .map { tupleOf(it._1, it._2, (it._1 + it._2) * 2) }
        .show()
}
```

### Column infix/operator functions

Similar to the Scala API for `Column`s, many of the operator functions are available.

```kotlin
dataset.where( col("colA") `===` 6 )
// or alternatively
dataset.where( col("colA") eq 6)
```

In short, all supported operators are (a combined sketch follows the list):

- `==`,
- `!=`,
- `eq` / `` `===` ``,
- `neq` / `` `=!=` ``,
- `-col(...)`,
- `!col(...)`,
- `gt`,
- `lt`,
- `geq`,
- `leq`,
- `or`,
- `and` / `` `&&` ``,
- `+`,
- `-`,
- `*`,
- `/`,
- `%`
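
As promised above, a small sketch of how a few of these combine (`dataset`, `colA`, and `colB` are placeholders):

```kotlin
// keep rows where colA > 5 and colB <= 10, then compute a derived column
dataset.where( (col("colA") gt 5) and (col("colB") leq 10) )
dataset.select( col("colA") + col("colB") * 2 )
```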

Secondly, there are some quality-of-life additions as well:

In Kotlin, ranges are often used to express inclusive/exclusive bounds. So, you can now do:
```kotlin
dataset.where( col("colA") inRangeOf 0..2 )
```

Also, for columns containing map- or array-like types:

```kotlin
dataset.where( col("colB")[0] geq 5 )
```

Finally, thanks to Kotlin reflection, we can provide a type- and refactor-safe way
to create `TypedColumn`s and, with those, a new Dataset from pieces of another using the `selectTyped()` function, added to the API:
```kotlin
val dataset: Dataset<YourClass> = ...
val newDataset: Dataset<Pair<TypeA, TypeB>> = dataset.selectTyped(col(YourClass::colA), col(YourClass::colB))

// Alternatively, for instance when working with a Dataset<Row>
val typedDataset: Dataset<Pair<String, Int>> = otherDataset.selectTyped(col("a").`as`<String>(), col("b").`as`<Int>())
```
To read more, check the [wiki](https://github.com/JetBrains/kotlin-spark-api/wiki/Column-functions).

### Overload resolution ambiguity

We had to implement the functions `reduceGroups` and `reduce` for Kotlin separately as `reduceGroupsK` and `reduceK` respectively, because otherwise it caused resolution ambiguity between the Kotlin, Scala, and Java APIs, which was quite hard to solve.

### Tuples

The API introduces a lot of helper extension functions to make working with Scala Tuples a breeze:

```kotlin
val a: Tuple2<Int, Long> = tupleOf(1, 2L)
val b: Tuple3<String, Double, Int> = t("test", 1.0, 2)
val c: Tuple3<Float, String, Int> = 5f X "aaa" X 1
```
Tuples can be expanded and merged like this:
```kotlin
// expand
tupleOf(1, 2).appendedBy(3) == tupleOf(1, 2, 3)
tupleOf(1, 2) + 3 == tupleOf(1, 2, 3)
tupleOf(2, 3).prependedBy(1) == tupleOf(1, 2, 3)
1 + tupleOf(2, 3) == tupleOf(1, 2, 3)

// merge
tupleOf(1, 2) concat tupleOf(3, 4) == tupleOf(1, 2, 3, 4)
tupleOf(1, 2) + tupleOf(3, 4) == tupleOf(1, 2, 3, 4)

// extend tuple instead of merging with it
tupleOf(1, 2).appendedBy(tupleOf(3, 4)) == tupleOf(1, 2, tupleOf(3, 4))
tupleOf(1, 2) + tupleOf(tupleOf(3, 4)) == tupleOf(1, 2, tupleOf(3, 4))
```

The concept of `EmptyTuple` from Scala 3 is also already present:
```kotlin
tupleOf(1).dropLast() == tupleOf() == emptyTuple()
```

Finally, all these tuple helper functions are also baked in (a short sketch follows the list):

- `componentX()` for destructuring: `val (a, b) = tuple`
- `dropLast() / dropFirst()`
- `contains(x)` for `if (x in tuple) { ... }`
- `iterator()` for `for (x in tuple) { ... }`
- `asIterable()`
- `size`
- `get(n) / get(i..j)` for `tuple[1] / tuple[i..j]`
- `getOrNull(n) / getOrNull(i..j)`
- `getAs<T>(n) / getAs<T>(i..j)`
- `getAsOrNull<T>(n) / getAsOrNull<T>(i..j)`
- `copy(_1 = ..., _5 = ...)`
- `first() / last()`
- `_1`, `_6` etc. (instead of `_1()`, `_6()`)
- `zip`
- `dropN() / dropLastN()`
- `takeN() / takeLastN()`
- `splitAtN()`
- `map`
- `cast`
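
As referenced above, a quick sketch of some of these helpers in action:

```kotlin
val tuple = tupleOf(1, "a", 3.0)

val (i, s, d) = tuple                 // componentX() destructuring
val first: Int = tuple._1             // _1 property instead of _1()
val smaller = tuple.dropLast()        // Tuple2<Int, String>
val replaced = tuple.copy(_2 = "b")   // Tuple3<Int, String, Double>
if ("a" in tuple) println("found it") // contains(x)
```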
To read more about tuples and all the added functions, refer to the [wiki](https://github.com/JetBrains/kotlin-spark-api/wiki/Tuples).

### Streaming

The API also provides a Kotlin-esque way to write Spark Streaming programs: `withSparkStreaming` creates the streaming context for you and starts it once the block has run.

```kotlin
withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession
    // set up your streams here
}
```
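As a slightly fuller sketch (the socket source and word splitting are illustrative; `ssc` is the `JavaStreamingContext` made available inside the block):

```kotlin
withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) {
    // read lines from a local socket, e.g. one fed by `nc -lk 9999`
    val lines = ssc.socketTextStream("localhost", 9999)

    // split each batch into words and print the per-batch counts
    val words = lines.flatMap { it.split(" ").iterator() }
    words.countByValue().print()
}
```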

For more information, check the [wiki](https://github.com/JetBrains/kotlin-spark-api/wiki/Streaming).

## Examples

For more, check out the examples module.
core/3.2/pom_2.12.xml (1 addition, 1 deletion):

```xml
<artifactId>core-3.2_2.12</artifactId>
<parent>
    <groupId>org.jetbrains.kotlinx.spark</groupId>
    <artifactId>kotlin-spark-api-parent-3.2_2.12</artifactId>
    <version>1.0.4-SNAPSHOT</version>
    <relativePath>../../pom_2.12.xml</relativePath>
</parent>
```
dummy/pom.xml (1 addition, 1 deletion):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>kotlin-spark-api-parent-3.2</artifactId>
        <groupId>org.jetbrains.kotlinx.spark</groupId>
        <version>1.0.4-SNAPSHOT</version>
    </parent>
```
examples/pom-3.2_2.12.xml (1 addition, 1 deletion):

```xml
<artifactId>examples-3.2_2.12</artifactId>
<parent>
    <groupId>org.jetbrains.kotlinx.spark</groupId>
    <artifactId>kotlin-spark-api-parent-3.2_2.12</artifactId>
    <version>1.0.4-SNAPSHOT</version>
    <relativePath>../pom_2.12.xml</relativePath>
</parent>
```