Skip to content

Commit 8b4b6cc

Browse files
authored
Upgrade delta-sharing-client to 1.0.5 (#2955)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [X] Other (Delta Sharing) ## Description Upgrade delta-sharing-client to 1.0.5 ## How was this patch tested? Unit Tests ## Does this PR introduce _any_ user-facing changes? No
1 parent 3c09d95 commit 8b4b6cc

File tree

3 files changed

+30
-2
lines changed

3 files changed

+30
-2
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ lazy val sharing = (project in file("sharing"))
310310
libraryDependencies ++= Seq(
311311
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "provided",
312312

313-
"io.delta" %% "delta-sharing-client" % "1.0.4",
313+
"io.delta" %% "delta-sharing-client" % "1.0.5",
314314

315315
// Test deps
316316
"org.scalatest" %% "scalatest" % scalaTestVersion % "test",

sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingSource.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.apache.spark.sql.delta.sources.{
3434
DeltaSourceOffset
3535
}
3636
import io.delta.sharing.client.DeltaSharingClient
37+
import io.delta.sharing.client.util.ConfUtils
3738
import io.delta.sharing.client.model.{Table => DeltaSharingTable}
3839

3940
import org.apache.spark.delta.sharing.CachedTableManager
@@ -155,7 +156,17 @@ case class DeltaFormatSharingSource(
155156
private var lastTimestampForGetVersionFromServer: Long = -1
156157

157158
// The minimum gap between two getTableVersion rpcs, to avoid a high traffic load to the server.
158-
private val QUERY_TABLE_VERSION_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(30)
159+
private val QUERY_TABLE_VERSION_INTERVAL_MILLIS = {
160+
val intervalSeconds = ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS.max(
161+
ConfUtils.streamingQueryTableVersionIntervalSeconds(spark.sessionState.conf)
162+
)
163+
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}.")
164+
if (intervalSeconds < ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS) {
165+
throw new IllegalArgumentException(s"QUERY_TABLE_VERSION_INTERVAL_MILLIS($intervalSeconds) " +
166+
s"must not be less than ${ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS} seconds.")
167+
}
168+
TimeUnit.SECONDS.toMillis(intervalSeconds)
169+
}
159170

160171
// Maximum number of versions of getFiles() rpc when fetching files from the server. Used to
161172
// reduce the number of files returned to avoid timeout of the rpc on the server.

sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,23 @@ class DeltaFormatSharingSourceSuite
215215
.load(tablePath)
216216
.filter($"value" contains "keep")
217217

218+
spark.sessionState.conf.setConfString(
219+
"spark.delta.sharing.streaming.queryTableVersionIntervalSeconds",
220+
"9s"
221+
)
222+
val e = intercept[Exception] {
223+
testStream(df)(
224+
AssertOnQuery { q =>
225+
q.processAllAvailable(); true
226+
}
227+
)
228+
}
229+
assert(e.getMessage.contains("must not be less than 10 seconds"))
230+
231+
spark.sessionState.conf.setConfString(
232+
"spark.delta.sharing.streaming.queryTableVersionIntervalSeconds",
233+
"10s"
234+
)
218235
testStream(df)(
219236
AssertOnQuery { q =>
220237
q.processAllAvailable(); true

0 commit comments

Comments
 (0)