Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
c57eaaf
added streaming and exploring how it works with kotlin spark api
Jolanrensen Feb 18, 2022
bb39fc7
Adds helpful rdd to dataset conversion, as well as a new withSpark fu…
Jolanrensen Feb 21, 2022
4bd3fe1
makes javaRDD toDS function more generic.
Jolanrensen Feb 21, 2022
c0ead09
added encoders: Duration, Period, ByteArray (Binary, now actually wor…
Jolanrensen Feb 21, 2022
09e9bb5
Arity is now Serializable, removed sc.stop(), sc is now lazy, updates…
Jolanrensen Feb 22, 2022
597b6f1
copying over some other missing parts from ScalaReflection.scala. Did…
Jolanrensen Feb 22, 2022
0f585bd
serializing binary works!
Jolanrensen Feb 23, 2022
92ed60e
serializing binary works!
Jolanrensen Feb 23, 2022
1fb680b
fixed serializing CalendarInterval, added tests and fixes for Decimal…
Jolanrensen Feb 23, 2022
4f8ae68
updating all tests to shouldBe instead of just show
Jolanrensen Feb 23, 2022
38486bb
removed .show() from rdd test
Jolanrensen Feb 23, 2022
0acd3e2
Update README.md
Jolanrensen Feb 23, 2022
bcf99b8
split up rdd tests, added list test.
Jolanrensen Feb 24, 2022
165c1b0
Merge pull request #132 from JetBrains/rdd-related-helpers
Jolanrensen Feb 24, 2022
2fdba6a
Update docs generation
Jolanrensen Feb 24, 2022
5d785e0
added jira issue
Jolanrensen Feb 24, 2022
518d1a1
Update ApiTest.kt
Jolanrensen Feb 25, 2022
e620896
added encoders: Duration, Period, ByteArray (Binary, now actually wor…
Jolanrensen Feb 21, 2022
e466df6
copying over some other missing parts from ScalaReflection.scala. Did…
Jolanrensen Feb 22, 2022
1557fa4
serializing binary works!
Jolanrensen Feb 23, 2022
680e5b1
serializing binary works!
Jolanrensen Feb 23, 2022
ba0c452
fixed serializing CalendarInterval, added tests and fixes for Decimal…
Jolanrensen Feb 23, 2022
054d626
updating all tests to shouldBe instead of just show
Jolanrensen Feb 23, 2022
66dc40e
added jira issue
Jolanrensen Feb 24, 2022
31f56d8
rebasing on spark 3.2 branch
Jolanrensen Feb 25, 2022
9b1c2c9
Merge remote-tracking branch 'origin/encoders-and-data-types' into en…
Jolanrensen Feb 28, 2022
d56b5a4
spark 3.2.1
Jolanrensen Feb 28, 2022
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
updating all tests to shouldBe instead of just show
  • Loading branch information
Jolanrensen committed Feb 23, 2022
commit 4f8ae68fed1e2a8e7086ef68a91e52d70aabb59d
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions
import scala.Product
import scala.Tuple2
import scala.concurrent.duration.`Duration$`
import scala.reflect.ClassTag
import scala.reflect.api.StandardDefinitions
import java.beans.PropertyDescriptor
Expand Down Expand Up @@ -1306,7 +1307,6 @@ private val knownDataTypes: Map<KClass<out Any>, DataType> = mapOf(
Decimal::class to DecimalType.SYSTEM_DEFAULT(),
BigDecimal::class to DecimalType.SYSTEM_DEFAULT(),
CalendarInterval::class to DataTypes.CalendarIntervalType,
Nothing::class to DataTypes.NullType,
)

private fun transitiveMerge(a: Map<String, KType>, b: Map<String, KType>): Map<String, KType> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.jetbrains.kotlinx.spark.api/*-
import ch.tutteli.atrium.api.fluent.en_GB.*
import ch.tutteli.atrium.api.verbs.expect
import io.kotest.core.spec.style.ShouldSpec
import io.kotest.matchers.should
import io.kotest.matchers.shouldBe
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.*
Expand Down Expand Up @@ -323,68 +324,97 @@ class ApiTest : ShouldSpec({
cogrouped.count() shouldBe 4
}
should("handle LocalDate Datasets") { // uses encoder
val dataset: Dataset<LocalDate> = dsOf(LocalDate.now(), LocalDate.now())
dataset.show()
val dates = listOf(LocalDate.now(), LocalDate.now())
val dataset: Dataset<LocalDate> = dates.toDS()
dataset.collectAsList() shouldBe dates
}
should("handle Instant Datasets") { // uses encoder
val dataset: Dataset<Instant> = dsOf(Instant.now(), Instant.now())
dataset.show()
val instants = listOf(Instant.now(), Instant.now())
val dataset: Dataset<Instant> = instants.toDS()
dataset.collectAsList() shouldBe instants
}
should("Be able to serialize Instant") { // uses knownDataTypes
val dataset = dsOf(Instant.now() to Instant.now())
dataset.show()
val instantPair = Instant.now() to Instant.now()
val dataset = dsOf(instantPair)
dataset.collectAsList() shouldBe listOf(instantPair)
}
should("be able to serialize Date") { // uses knownDataTypes
val dataset: Dataset<Pair<Date, Int>> = dsOf(Date.valueOf("2020-02-10") to 5)
dataset.show()
val datePair = Date.valueOf("2020-02-10") to 5
val dataset: Dataset<Pair<Date, Int>> = dsOf(datePair)
dataset.collectAsList() shouldBe listOf(datePair)
}
should("handle Timestamp Datasets") { // uses encoder
val dataset = dsOf(Timestamp(0L))
dataset.show()
val timeStamps = listOf(Timestamp(0L), Timestamp(1L))
val dataset = timeStamps.toDS()
dataset.collectAsList() shouldBe timeStamps
}
should("be able to serialize Timestamp") { // uses knownDataTypes
val dataset = dsOf(Timestamp(0L) to 2)
dataset.show()
val timestampPair = Timestamp(0L) to 2
val dataset = dsOf(timestampPair)
dataset.collectAsList() shouldBe listOf(timestampPair)
}
should("handle Duration Datasets") { // uses encoder
val dataset = dsOf(Duration.ZERO)
dataset.show()
dataset.collectAsList() shouldBe listOf(Duration.ZERO)
}
should("handle Period Datasets") { // uses encoder
val dataset = dsOf(Period.ZERO)
dataset.show()
val periods = listOf(Period.ZERO, Period.ofDays(2))
val dataset = periods.toDS()

dataset.show(false)

dataset.collectAsList().let {
it[0] shouldBe Period.ZERO

// TODO this is also broken in Scala. It reports a Period of 0 instead of 2 days
Review comment (Contributor): Please link the relevant JIRA ticket.

Reply (Collaborator, Author): https://issues.apache.org/jira/browse/SPARK-38317 — apparently it's expected behavior, strange.
// it[1] shouldBe Period.ofDays(2)
it[1] shouldBe Period.ofDays(0)
}

}
should("handle binary datasets") { // uses encoder
val dataset = dsOf("Hello there".encodeToByteArray())
dataset.show()
val byteArray = "Hello there".encodeToByteArray()
val dataset = dsOf(byteArray)
dataset.collectAsList() shouldBe listOf(byteArray)
}
should("be able to serialize binary") { // uses knownDataTypes
val dataset = dsOf(c("Hello there".encodeToByteArray(), 1, intArrayOf(1, 2, 3)))
dataset.show()
}
should("handle Decimal datasets") { // uses encoder
val dataset = dsOf(Decimal().set(50))
dataset.show()
val byteArrayTriple = c("Hello there".encodeToByteArray(), 1, intArrayOf(1, 2, 3))
val dataset = dsOf(byteArrayTriple)

val (a, b, c) = dataset.collectAsList().single()
a contentEquals "Hello there".encodeToByteArray() shouldBe true
b shouldBe 1
c contentEquals intArrayOf(1, 2, 3) shouldBe true
}
should("be able to serialize Decimal") { // uses knownDataTypes
val dataset = dsOf(c(Decimal().set(50), 12))
dataset.show()
val decimalPair = c(Decimal().set(50), 12)
val dataset = dsOf(decimalPair)
dataset.collectAsList() shouldBe listOf(decimalPair)
}
should("handle BigDecimal datasets") { // uses encoder
val dataset = dsOf(BigDecimal.TEN)
dataset.show()
val decimals = listOf(BigDecimal.ONE, BigDecimal.TEN)
val dataset = decimals.toDS()
dataset.collectAsList().let { (one, ten) ->
one.compareTo(BigDecimal.ONE) shouldBe 0
ten.compareTo(BigDecimal.TEN) shouldBe 0
}
}
should("be able to serialize BigDecimal") { // uses knownDataTypes
val dataset = dsOf(c(BigDecimal.TEN, 12))
dataset.show()
val decimalPair = c(BigDecimal.TEN, 12)
val dataset = dsOf(decimalPair)
val (a, b) = dataset.collectAsList().single()
a.compareTo(BigDecimal.TEN) shouldBe 0
b shouldBe 12
}
should("be able to serialize CalendarInterval") { // uses knownDataTypes
val dataset = dsOf(CalendarInterval(1, 0, 0L) to 2)
dataset.show()
val calendarIntervalPair = CalendarInterval(1, 0, 0L) to 2
val dataset = dsOf(calendarIntervalPair)
dataset.collectAsList() shouldBe listOf(calendarIntervalPair)
}
should("be able to serialize null") { // uses knownDataTypes
val dataset: Dataset<Pair<Nothing?, Int>> = dsOf(null to 2)
dataset.show()
should("handle nullable datasets") {
val ints = listOf(1, 2, 3, null)
val dataset = ints.toDS()
dataset.collectAsList() shouldBe ints
}
should("Be able to serialize Scala Tuples including data classes") {
val dataset = dsOf(
Expand Down Expand Up @@ -415,28 +445,28 @@ class ApiTest : ShouldSpec({
val newDS1WithAs: Dataset<IntArray> = dataset.selectTyped(
col("a").`as`<IntArray>(),
)
newDS1WithAs.show()
newDS1WithAs.collectAsList()

val newDS2: Dataset<Pair<IntArray, Int>> = dataset.selectTyped(
col(SomeClass::a), // NOTE: this only works on 3.0, returning a data class with an array in it
col(SomeClass::b),
)
newDS2.show()
newDS2.collectAsList()

val newDS3: Dataset<Triple<IntArray, Int, Int>> = dataset.selectTyped(
col(SomeClass::a),
col(SomeClass::b),
col(SomeClass::b),
)
newDS3.show()
newDS3.collectAsList()

val newDS4: Dataset<Arity4<IntArray, Int, Int, Int>> = dataset.selectTyped(
col(SomeClass::a),
col(SomeClass::b),
col(SomeClass::b),
col(SomeClass::b),
)
newDS4.show()
newDS4.collectAsList()

val newDS5: Dataset<Arity5<IntArray, Int, Int, Int, Int>> = dataset.selectTyped(
col(SomeClass::a),
Expand All @@ -445,7 +475,7 @@ class ApiTest : ShouldSpec({
col(SomeClass::b),
col(SomeClass::b),
)
newDS5.show()
newDS5.collectAsList()
}
should("Access columns using invoke on datasets") {
val dataset = dsOf(
Expand Down Expand Up @@ -498,19 +528,18 @@ class ApiTest : ShouldSpec({
dataset(SomeOtherClass::a),
col(SomeOtherClass::c),
)
b.show()
b.collectAsList()
}
should("Handle some where queries using column operator functions") {
val dataset = dsOf(
SomeOtherClass(intArrayOf(1, 2, 3), 4, true),
SomeOtherClass(intArrayOf(4, 3, 2), 1, true),
)
dataset.show()
dataset.collectAsList()

val column = col("b").`as`<IntArray>()

val b = dataset.where(column gt 3 and col(SomeOtherClass::c))
b.show()

b.count() shouldBe 1
}
Expand All @@ -519,21 +548,51 @@ class ApiTest : ShouldSpec({
listOf(SomeClass(intArrayOf(1, 2, 3), 4)),
listOf(SomeClass(intArrayOf(3, 2, 1), 0)),
)
dataset.show()

val (first, second) = dataset.collectAsList()

first.single().let { (a, b) ->
a.contentEquals(intArrayOf(1, 2, 3)) shouldBe true
b shouldBe 4
}
second.single().let { (a, b) ->
a.contentEquals(intArrayOf(3, 2, 1)) shouldBe true
b shouldBe 0
}
}
should("Be able to serialize arrays of data classes") {
val dataset = dsOf(
arrayOf(SomeClass(intArrayOf(1, 2, 3), 4)),
arrayOf(SomeClass(intArrayOf(3, 2, 1), 0)),
)
dataset.show()

val (first, second) = dataset.collectAsList()

first.single().let { (a, b) ->
a.contentEquals(intArrayOf(1, 2, 3)) shouldBe true
b shouldBe 4
}
second.single().let { (a, b) ->
a.contentEquals(intArrayOf(3, 2, 1)) shouldBe true
b shouldBe 0
}
}
should("Be able to serialize lists of tuples") {
val dataset = dsOf(
listOf(Tuple2(intArrayOf(1, 2, 3), 4)),
listOf(Tuple2(intArrayOf(3, 2, 1), 0)),
)
dataset.show()

val (first, second) = dataset.collectAsList()

first.single().let {
it._1().contentEquals(intArrayOf(1, 2, 3)) shouldBe true
it._2() shouldBe 4
}
second.single().let {
it._1().contentEquals(intArrayOf(3, 2, 1)) shouldBe true
it._2() shouldBe 0
}
}
should("Allow simple forEachPartition in datasets") {
val dataset = dsOf(
Expand Down