
Commit 467220f

WIP Fix Delta issue (#234)
* chore(deps): bump delta-core_2.12 from 0.7.0 to 1.1.0

  Bumps [delta-core_2.12](https://github.com/delta-io/delta) from 0.7.0 to 1.1.0.
  - [Release notes](https://github.com/delta-io/delta/releases)
  - [Commits](delta-io/delta@v0.7.0...v1.1.0)

  ---
  updated-dependencies:
  - dependency-name: io.delta:delta-core_2.12
    dependency-type: direct:production
    update-type: version-update:semver-major
  ...

  Signed-off-by: dependabot[bot] <support@github.com>

* fix: delta issue

* fix: issue caused by the '`' character in SQL expression generation

  In newer Spark versions, a column name that contains no space character is no longer wrapped in '`' characters, so the unit tests were updated to run correctly.

* fix: test error caused by the '`' character in Spark 3.2

* doc: edit changelog

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
1 parent 75ea316 commit 467220f
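The behaviour behind most of the test churn is Spark's SQL generation for column expressions: Spark 3.2 only back-quotes identifiers that need it (for example names containing spaces), whereas Spark 3.0 quoted every column name. A minimal sketch of the difference, using Spark's public Column.expr / Expression.sql rendering for illustration (assumed behaviour inferred from the commit message, not this project's code):

    import org.apache.spark.sql.functions.col

    // Assumed behaviour, for illustration only:
    // the SQL text Spark generates for the same expression differs between versions.
    val simple = col("test").isin(1, 2, 3).expr.sql
    // Spark 3.0.x: "(`test` IN (1, 2, 3))"   -- every identifier is back-quoted
    // Spark 3.2.x: "(test IN (1, 2, 3))"     -- simple identifiers stay unquoted

    val withSpace = col("test col").isin(1, 2, 3).expr.sql
    // Both versions: "(`test col` IN (1, 2, 3))" -- quoting is kept when the name requires it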

File tree

10 files changed: +42 -21 lines changed


.github/workflows/release.yml

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ jobs:
       matrix:
         include:
           - SCALA_VER: "2.12"
-            SPARK_VER: "3.0"
+            SPARK_VER: "3.2"
           - SCALA_VER: "2.11"
             SPARK_VER: "2.4"

.github/workflows/snapshot.yml

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@ jobs:
       matrix:
         include:
           - SCALA_VER: "2.12"
-            SPARK_VER: "3.0"
+            SPARK_VER: "3.2"
           - SCALA_VER: "2.11"
             SPARK_VER: "2.4"

.github/workflows/test.yml

Lines changed: 3 additions & 1 deletion
@@ -15,12 +15,14 @@ jobs:
       fail-fast: false
       matrix:
         SCALA_VER: ["2.12", "2.11"]
-        SPARK_VER: ["3.0", "2.4", "2.3"]
+        SPARK_VER: ["3.2", "3.0", "2.4", "2.3"]
         exclude:
           - SCALA_VER: 2.12
             SPARK_VER: 2.3
           - SCALA_VER: 2.11
             SPARK_VER: 3.0
+          - SCALA_VER: 2.11
+            SPARK_VER: 3.2
     steps:
       - name: Checkout
         uses: actions/checkout@v2

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,4 @@
-## Unreleased
+## 1.0.0 (2022-03-12)

 ### Added:

@@ -24,6 +24,7 @@
 - Deprecated `FileConnector.delete()` to avoid ambiguity (use `FileConnector.drop()` instead)
 - Upgraded spark-cassandra-connector to 3.0.0 for the mvn profile `spark_3.0`
 - New logo
+- Update Delta version to v1.0 (PR #234)

 ### Fixed:

pom.xml

Lines changed: 13 additions & 4 deletions
@@ -72,16 +72,16 @@
         <java.version>1.8</java.version>
         <app.environment>local</app.environment>
         <apache.hadoop.version>3.2.0</apache.hadoop.version>
-        <delta.version>0.7.0</delta.version>
+        <delta.version>1.1.0</delta.version>
         <scala.version>2.12.10</scala.version>
         <scala.compat.version>2.12</scala.compat.version>
         <scala.maven.plugin.version>4.4.1</scala.maven.plugin.version>
         <scalatest.version>3.2.1</scalatest.version>
         <scalatest.maven.plugin.version>2.0.2</scalatest.maven.plugin.version>
         <scoverage.plugin.version>1.4.1</scoverage.plugin.version>
-        <spark.version>3.0.1</spark.version>
-        <spark.compat.version>3.0</spark.compat.version>
-        <spark.cassandra.connector.version>3.0.0</spark.cassandra.connector.version>
+        <spark.version>3.2.0</spark.version>
+        <spark.compat.version>3.2</spark.compat.version>
+        <spark.cassandra.connector.version>3.1.0</spark.cassandra.connector.version>
         <spark.dynamodb.version>1.1.2</spark.dynamodb.version>
         <spark.excel.version>0.13.7</spark.excel.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

@@ -408,6 +408,15 @@
         </profile>
         <profile>
             <id>spark_3.0</id>
+            <properties>
+                <spark.version>3.0.1</spark.version>
+                <spark.compat.version>3.0</spark.compat.version>
+                <spark.cassandra.connector.version>3.0.0</spark.cassandra.connector.version>
+                <delta.version>0.7.0</delta.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark_3.2</id>
             <properties>
             </properties>
         </profile>
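With the new defaults, a plain build targets Spark 3.2.0 and Delta 1.1.0, while the spark_3.0 profile now pins the previous stack (Spark 3.0.1, Delta 0.7.0, spark-cassandra-connector 3.0.0). Presumably the older combination can still be selected in the usual Maven way, e.g. mvn test -Pspark_3.0, while the new, empty spark_3.2 profile simply keeps the default property values.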

src/main/scala/io/github/setl/storage/repository/SparkRepository.scala

Lines changed: 2 additions & 2 deletions
@@ -477,7 +477,7 @@ object SparkRepository {
     // Check if use is trying to filter an binary column
     (binaryColumnNames ++ aliasBinaryColumns).toSet.foreach {
       colName: String =>
-        if (sqlString.contains(s"`$colName`")) {
+        if (sqlString.contains(s"$colName")) {
           throw new IllegalArgumentException(s"Binary column ${cond.key} couldn't be filtered")
         }
     }

@@ -490,7 +490,7 @@ object SparkRepository {
       col =>
         val alias = col.metadata.getStringArray(SchemaConverter.COLUMN_NAME).headOption
        if (alias.nonEmpty) {
-          sqlString = sqlString.replace(s"`${col.name}`", s"`${alias.get}`")
+          sqlString = sqlString.replace(s"${col.name}", s"${alias.get}")
        }
     }
     cond.copy(value = Option(sqlString))
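With the back-quotes removed, these lookups appear to work on either Spark line: sqlString.contains(colName) matches both the quoted form `colName` produced by Spark 3.0 and the bare colName produced by Spark 3.2, and the alias substitution likewise no longer depends on the quoting style.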

src/test/scala/io/github/setl/storage/ConditionSuite.scala

Lines changed: 5 additions & 4 deletions
@@ -57,13 +57,14 @@ class ConditionSuite extends AnyFunSuite {
   test("Condition should handle Column") {
     import org.apache.spark.sql.functions._
     val condition = Condition(
-      col("test").isin(1, 2, 3)
+      col("test col").isin(1, 2, 3)
     )
-    assert(condition.toSqlRequest === Condition("test", "IN", Set(1, 2, 3)).toSqlRequest)
+
+    assert(condition.toSqlRequest === Condition("test col", "IN", Set(1, 2, 3)).toSqlRequest)

     val condition2 = Condition(
-      col("test").isin(1, 2, 3) && col("test2") === "A"
+      col("test col").isin(1, 2, 3) && col("test col 2") === "A"
     )
-    assert(condition2.toSqlRequest === "((`test` IN (1, 2, 3)) AND (`test2` = 'A'))")
+    assert(condition2.toSqlRequest === "((`test col` IN (1, 2, 3)) AND (`test col 2` = 'A'))")
   }
 }
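Because the test now filters on column names that contain spaces ("test col", "test col 2"), the SQL that Spark generates keeps its back-quotes on every supported version, so the expected strings in these assertions stay stable whether the suite runs against Spark 3.0 or 3.2.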

src/test/scala/io/github/setl/storage/ConnectorBuilderSuite.scala

Lines changed: 1 addition & 0 deletions
@@ -219,6 +219,7 @@ class ConnectorBuilderSuite extends AnyFunSuite with BeforeAndAfterAll {
   }

   test("build DeltaConnector") {
+    org.apache.spark.sql.delta.DeltaLog.clearCache()
     val spark: SparkSession = new SparkSessionBuilder("cassandra")
       .withSparkConf(MockCassandra.cassandraConf)
       .setEnv("local")

src/test/scala/io/github/setl/storage/connector/DeltaConnectorSuite.scala

Lines changed: 7 additions & 0 deletions
@@ -48,6 +48,7 @@ class DeltaConnectorSuite extends AnyFunSuite {
   )

   test("Instantiation of constructors") {
+    org.apache.spark.sql.delta.DeltaLog.clearCache()
     val spark: SparkSession = new SparkSessionBuilder().setEnv("local").build().get()
     assume(SparkTestUtils.checkSparkVersion("2.4.2"))

@@ -80,6 +81,7 @@
   }

   test("test Delta connector update") {
+    org.apache.spark.sql.delta.DeltaLog.clearCache()
     val spark: SparkSession = new SparkSessionBuilder().setEnv("local").build().get()
     assume(SparkTestUtils.checkSparkVersion("2.4.2"))

@@ -112,6 +114,7 @@
   }

   test("test Delta connector vacuum") {
+    org.apache.spark.sql.delta.DeltaLog.clearCache()
     val spark: SparkSession = new SparkSessionBuilder().setEnv("local")
       .set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
       .build()

@@ -139,6 +142,7 @@
   }

   test("test Delta connector delete") {
+    org.apache.spark.sql.delta.DeltaLog.clearCache()
     val spark: SparkSession = new SparkSessionBuilder()
       .setEnv("local")
       .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")

@@ -160,6 +164,7 @@
   }

   test("Delta connector should push down filter and select") {
+    org.apache.spark.sql.delta.DeltaLog.clearCache()
     val spark: SparkSession = new SparkSessionBuilder().setEnv("local")
       .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
       .set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

@@ -188,6 +193,7 @@
   }

   test("test Delta connector with different file path") {
+    org.apache.spark.sql.delta.DeltaLog.clearCache()
     val spark: SparkSession = new SparkSessionBuilder().setEnv("local").build().get()
     assume(SparkTestUtils.checkSparkVersion("2.4.2"))

@@ -207,6 +213,7 @@
   }

   test("test Delta connector partition by") {
+    org.apache.spark.sql.delta.DeltaLog.clearCache()
     val spark: SparkSession = new SparkSessionBuilder().setEnv("local").build().get()
     assume(SparkTestUtils.checkSparkVersion("2.4.2"))
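Each Delta test now starts by clearing Delta's per-path DeltaLog cache, so a log instance cached by an earlier test (and likely tied to a since-stopped SparkSession) does not leak into the next one. A minimal sketch of an alternative that factors the call into a ScalaTest fixture instead of repeating it in every test body; the trait name is hypothetical and this refactoring is not part of the commit:

    import org.apache.spark.sql.delta.DeltaLog
    import org.scalatest.{BeforeAndAfterEach, Suite}

    // Hypothetical helper: mix into a suite to reset Delta's DeltaLog cache
    // before every test, instead of calling clearCache() at the top of each test.
    trait ClearDeltaLogCache extends BeforeAndAfterEach { this: Suite =>
      override protected def beforeEach(): Unit = {
        DeltaLog.clearCache() // drop DeltaLog instances cached by previous tests/sessions
        super.beforeEach()
      }
    }

    // Usage sketch:
    // class DeltaConnectorSuite extends AnyFunSuite with ClearDeltaLogCache { ... }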

src/test/scala/io/github/setl/storage/repository/SparkRepositorySuite.scala

Lines changed: 7 additions & 7 deletions
@@ -261,7 +261,7 @@ class SparkRepositorySuite extends AnyFunSuite with Matchers {
     assert(df.count() === 8)

     df.show()
-    connector.delete()
+    connector.drop()
   }

   test("SparkRepository should handle column name changed by annotation while filtering") {

@@ -313,7 +313,7 @@
     findedBis4.collect() should contain theSameElementsAs findedBis3.collect()
     findedBis4.collect() should contain theSameElementsAs findedBis5.collect()

-    connector.delete()
+    connector.drop()
   }

   test("SparkRepository should compress columns with Compress annotation") {

@@ -356,7 +356,7 @@
     assertThrows[IllegalArgumentException](repo.findBy(Set(Condition("col1", "=", "haha"), Condition("col4", "=", "test"))))
     assertThrows[IllegalArgumentException](repo.findBy($"col4" === "test"))
     assertThrows[IllegalArgumentException](repo.findBy($"col4" === "test" && $"col1" === "haha"))
-    connector.delete()
+    connector.drop()
   }

   test("SparkRepository should compress columns with Compress annotation with another Compressor") {

@@ -380,7 +380,7 @@
     // Write non compressed data
     connector.write(test.toDF())
     val sizeUncompressed = connector.getSize
-    connector.delete()
+    connector.drop()

     // Write compressed data
     repo.save(test)

@@ -400,7 +400,7 @@
     assertThrows[IllegalArgumentException](repo.findBy(Set(Condition("col1", "=", "haha"), Condition("col4", "=", "test"))))
     assertThrows[IllegalArgumentException](repo.findBy($"col4" === "test"))
     assertThrows[IllegalArgumentException](repo.findBy($"col4" === "test" && $"col1" === "haha"))
-    connector.delete()
+    connector.drop()
   }

   test("SparkRepository should cache read data unless there are new data be written") {

@@ -428,7 +428,7 @@
     val field = repoCached.getClass.getDeclaredField("readCache")
     field.setAccessible(true)
     assert(field.get(repoCached).asInstanceOf[DataFrame].storageLevel.useMemory)
-    connector.delete()
+    connector.drop()

     // Test cache-disabled repository
     val repoNotCached = new SparkRepository[TestCompressionRepositoryGZIP].setConnector(connector)

@@ -440,7 +440,7 @@
     val field2 = repoCached.getClass.getDeclaredField("readCache")
     field2.setAccessible(true)
     assert(!field2.get(repoNotCached).asInstanceOf[DataFrame].storageLevel.useMemory)
-    connector.delete()
+    connector.drop()
   }

   test("SparkRepository IO methods") {
