Skip to content

Commit f11c355

Browse files
authored
[Delta-Standalone] Compute Snapshot protocol and metadata more efficiently (#533)
1 parent 5759de8 commit f11c355

File tree

5 files changed

+254
-41
lines changed

5 files changed

+254
-41
lines changed

standalone/src/main/scala/io/delta/standalone/internal/SnapshotImpl.scala

Lines changed: 88 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.delta.standalone.internal
1818

1919
import java.net.URI
20-
import java.util.concurrent.{Executors, ExecutorService}
2120

2221
import scala.collection.JavaConverters._
2322
import scala.collection.parallel.ExecutionContextTaskSupport
@@ -40,12 +39,30 @@ import io.delta.standalone.internal.logging.Logging
4039
import io.delta.standalone.internal.scan.{DeltaScanImpl, FilteredDeltaScanImpl}
4140
import io.delta.standalone.internal.util.{ConversionUtils, FileNames, JsonUtils}
4241

42+
/**
43+
* Contains the protocol, metadata, and corresponding table version. The protocol and metadata
44+
* will be used as the defaults in the Snapshot if no newer values are found in logs > `version`.
45+
*/
46+
case class SnapshotProtocolMetadataHint(protocol: Protocol, metadata: Metadata, version: Long)
47+
48+
/**
49+
* Visible for testing.
50+
*
51+
* Will contain various metrics collected while finding/loading the latest protocol and metadata
52+
* for this Snapshot. This can be used to verify that the minimal log replay occurred.
53+
*/
54+
case class ProtocolMetadataLoadMetrics(fileVersions: Seq[Long])
55+
4356
/**
4457
* Scala implementation of Java interface [[Snapshot]].
4558
*
4659
* @param timestamp The timestamp of the latest commit in milliseconds. Can also be set to -1 if the
4760
* timestamp of the commit is unknown or the table has not been initialized, i.e.
4861
* `version = -1`.
62+
* @param protocolMetadataHint The optional protocol, metadata, and table version that can be used
63+
* to speed up loading *this* Snapshot's protocol and metadata (P&M).
64+
* Essentially, when computing *this* Snapshot's P&M, we only need to
65+
* look at the log files *newer* than the hint version.
4966
*/
5067
private[internal] class SnapshotImpl(
5168
val hadoopConf: Configuration,
@@ -54,7 +71,14 @@ private[internal] class SnapshotImpl(
5471
val logSegment: LogSegment,
5572
val minFileRetentionTimestamp: Long,
5673
val deltaLog: DeltaLogImpl,
57-
val timestamp: Long) extends Snapshot with Logging {
74+
val timestamp: Long,
75+
protocolMetadataHint: Option[SnapshotProtocolMetadataHint] = Option.empty)
76+
extends Snapshot with Logging {
77+
78+
protocolMetadataHint.foreach { hint =>
79+
require(hint.version <= version, s"Cannot use a protocolMetadataHint with a version newer " +
80+
s"than that of this Snapshot. Hint version: ${hint.version}, Snapshot version: $version")
81+
}
5882

5983
import SnapshotImpl._
6084

@@ -124,36 +148,85 @@ private[internal] class SnapshotImpl(
124148
lazy val transactions: Map[String, Long] =
125149
setTransactionsScala.map(t => t.appId -> t.version).toMap
126150

127-
// These values need to be declared lazy. In Scala, strict values (i.e. non-lazy) in superclasses
128-
// (e.g. SnapshotImpl) are fully initialized before subclasses (e.g. InitialSnapshotImpl).
129-
// If these were 'strict', or 'eager', vals, then `loadTableProtocolAndMetadata` would be called
130-
// for all new InitialSnapshotImpl instances, causing an exception.
131-
lazy val (protocolScala, metadataScala) = loadTableProtocolAndMetadata()
151+
/**
152+
* protocolScala, metadataScala are internals APIs.
153+
* protocolMetadataLoadMetrics is visible for testing only.
154+
*
155+
* NOTE: These values need to be declared lazy. In Scala, strict values (i.e. non-lazy) in
156+
* superclasses (e.g. SnapshotImpl) are fully initialized before subclasses
157+
* (e.g. InitialSnapshotImpl). If these were 'strict', or 'eager', vals, then
158+
* `loadTableProtocolAndMetadata` would be called for all new InitialSnapshotImpl instances,
159+
* causing an exception.
160+
*/
161+
lazy val (protocolScala, metadataScala, protocolMetadataLoadMetrics) =
162+
loadTableProtocolAndMetadata()
163+
164+
private def loadTableProtocolAndMetadata(): (Protocol, Metadata, ProtocolMetadataLoadMetrics) = {
165+
val fileVersionsScanned = scala.collection.mutable.Set[Long]()
166+
def createMetrics = ProtocolMetadataLoadMetrics(fileVersionsScanned.toSeq.sorted.reverse)
132167

133-
private def loadTableProtocolAndMetadata(): (Protocol, Metadata) = {
134168
var protocol: Protocol = null
135169
var metadata: Metadata = null
170+
136171
val iter = memoryOptimizedLogReplay.getReverseIterator
137172

138173
try {
139174
// We replay logs from newest to oldest and will stop when we find the latest Protocol and
140-
// Metadata.
141-
iter.asScala.foreach { case (action, _) =>
175+
// Metadata (P&M).
176+
//
177+
// If the protocolMetadataHint is defined, then we will only look at log files strictly newer
178+
// (>) than the protocolMetadataHint's version. If we don't find any new P&M, then we will
179+
// default to those from the protocolMetadataHint.
180+
//
181+
// If the protocolMetadataHint is not defined, then we must look at all log files. If no
182+
// P&M is found, then we fail.
183+
iter.asScala.foreach { case (action, _, actionTableVersion) =>
184+
185+
// We have not yet found the latest P&M. If we had found BOTH, we would have returned
186+
// already. Note that we may have already found ONE of them.
187+
protocolMetadataHint.foreach { hint =>
188+
if (actionTableVersion == hint.version) {
189+
// Furthermore, we have already looked at all the actions in all the log files strictly
190+
// newer (>) than the hint version. Thus, we can short circuit early and use the P&M
191+
// from the hint.
192+
193+
val newestProtocol = if (protocol == null) {
194+
logInfo(s"Using the protocol from the protocolMetadataHint: ${hint.protocol}")
195+
hint.protocol
196+
} else {
197+
logInfo(s"Found a newer protocol: $protocol")
198+
protocol
199+
}
200+
201+
val newestMetadata = if (metadata == null) {
202+
logInfo(s"Using the metadata from the protocolMetadataHint: ${hint.metadata}")
203+
hint.metadata
204+
} else {
205+
logInfo(s"Found a newer metadata: $metadata")
206+
metadata
207+
}
208+
209+
return (newestProtocol, newestMetadata, createMetrics)
210+
}
211+
}
212+
213+
fileVersionsScanned += actionTableVersion
214+
142215
action match {
143216
case p: Protocol if null == protocol =>
144217
// We only need the latest protocol
145218
protocol = p
146219

147220
if (protocol != null && metadata != null) {
148221
// Stop since we have found the latest Protocol and metadata.
149-
return (protocol, metadata)
222+
return (protocol, metadata, createMetrics)
150223
}
151224
case m: Metadata if null == metadata =>
152225
metadata = m
153226

154227
if (protocol != null && metadata != null) {
155228
// Stop since we have found the latest Protocol and metadata.
156-
return (protocol, metadata)
229+
return (protocol, metadata, createMetrics)
157230
}
158231
case _ => // do nothing
159232
}
@@ -330,6 +403,9 @@ private class InitialSnapshotImpl(
330403

331404
override lazy val metadataScala: Metadata = Metadata()
332405

406+
override lazy val protocolMetadataLoadMetrics: ProtocolMetadataLoadMetrics =
407+
ProtocolMetadataLoadMetrics(Seq.empty)
408+
333409
override def scan(): DeltaScan = new DeltaScanImpl(memoryOptimizedLogReplay)
334410

335411
override def scan(predicate: Expression): DeltaScan =

standalone/src/main/scala/io/delta/standalone/internal/SnapshotManagement.scala

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,18 @@ private[internal] trait SnapshotManagement { self: DeltaLogImpl =>
6464
*/
6565
private def updateInternal(): SnapshotImpl = {
6666
try {
67-
val segment = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersion)
68-
if (segment != currentSnapshot.logSegment) {
69-
val startingFrom = segment.checkpointVersion
67+
val newSegment = getLogSegmentForVersion(
68+
startCheckpoint = currentSnapshot.logSegment.checkpointVersion)
69+
if (newSegment != currentSnapshot.logSegment) {
70+
val startingFrom = newSegment.checkpointVersion
7071
.map(v => s" starting from checkpoint version $v.").getOrElse(".")
71-
logInfo(s"Loading version ${segment.version}$startingFrom")
72+
logInfo(s"Loading version ${newSegment.version}$startingFrom")
7273

73-
val newSnapshot = createSnapshot(segment, segment.lastCommitTimestamp)
74+
val newSnapshot = createSnapshot(
75+
newSegment,
76+
newSegment.lastCommitTimestamp,
77+
previousSnapshotOpt = Some(currentSnapshot) // We are updating to the newSegment!
78+
)
7479

7580
if (currentSnapshot.version > -1 &&
7681
currentSnapshot.metadataScala.id != newSnapshot.metadataScala.id) {
@@ -221,7 +226,11 @@ private[internal] trait SnapshotManagement { self: DeltaLogImpl =>
221226
.map(v => s" starting from checkpoint $v.").getOrElse(".")
222227
logInfo(s"Loading version ${logSegment.version}$startCheckpoint")
223228

224-
val snapshot = createSnapshot(logSegment, logSegment.lastCommitTimestamp)
229+
val snapshot = createSnapshot(
230+
logSegment,
231+
logSegment.lastCommitTimestamp,
232+
previousSnapshotOpt = None // This is the `init`. There's no previous snapshot.
233+
)
225234

226235
logInfo(s"Returning initial snapshot $snapshot")
227236

@@ -240,21 +249,46 @@ private[internal] trait SnapshotManagement { self: DeltaLogImpl =>
240249
val startingCheckpoint = findLastCompleteCheckpoint(CheckpointInstance(version, None))
241250
val segment = getLogSegmentForVersion(startingCheckpoint.map(_.version), Some(version))
242251

252+
// In practice, this will always be None because all callers of this method have already called
253+
// deltaLog.update() (to determine the full list of versions, to understand the full history).
254+
// Thus, `snapshot.version` will always be > version. (If they were equal, we would have already
255+
// returned early above).
256+
val previousSnapshotOpt =
257+
if (currentSnapshot.version <= version) Some(currentSnapshot) else None
258+
243259
createSnapshot(
244260
segment,
245-
segment.lastCommitTimestamp
261+
segment.lastCommitTimestamp,
262+
previousSnapshotOpt
246263
)
247264
}
248265

249-
private def createSnapshot(segment: LogSegment, lastCommitTimestamp: Long): SnapshotImpl = {
266+
private def createSnapshot(
267+
segment: LogSegment,
268+
lastCommitTimestamp: Long,
269+
previousSnapshotOpt: Option[SnapshotImpl]): SnapshotImpl = {
270+
271+
previousSnapshotOpt.foreach { previousSnapshot =>
272+
assert(
273+
previousSnapshot.version <= segment.version,
274+
s"Trying to create a Snapshot at version ${segment.version} yet you are passing a " +
275+
s"newer `previousSnapshotOpt` with version ${previousSnapshot.version}."
276+
)
277+
}
278+
250279
new SnapshotImpl(
251280
hadoopConf,
252281
logPath,
253282
segment.version,
254283
segment,
255284
minFileRetentionTimestamp,
256285
this,
257-
lastCommitTimestamp)
286+
lastCommitTimestamp,
287+
previousSnapshotOpt.map { previousSnapshot =>
288+
SnapshotProtocolMetadataHint(
289+
previousSnapshot.protocolScala, previousSnapshot.metadataScala, previousSnapshot.version)
290+
}
291+
)
258292
}
259293

260294
private def verifyDeltaVersions(versions: Array[Long]): Unit = {

standalone/src/main/scala/io/delta/standalone/internal/actions/MemoryOptimizedLogReplay.scala

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import io.delta.storage.{CloseableIterator, LogStore}
2323
import org.apache.hadoop.conf.Configuration
2424
import org.apache.hadoop.fs.Path
2525

26-
import io.delta.standalone.internal.util.JsonUtils
26+
import io.delta.standalone.internal.util.{FileNames, JsonUtils}
2727

2828
/**
2929
* Used to replay the transaction logs from the newest log file to the oldest log file, in a
@@ -36,28 +36,30 @@ private[internal] class MemoryOptimizedLogReplay(
3636
timeZone: TimeZone) {
3737

3838
/**
39-
* @return a [[CloseableIterator]] of tuple (Action, isLoadedFromCheckpoint) in reverse
40-
* transaction log order
39+
* @return a [[CloseableIterator]] of tuple (Action, isLoadedFromCheckpoint, tableVersion) in
40+
* reverse transaction log order
4141
*/
42-
def getReverseIterator: CloseableIterator[(Action, Boolean)] =
43-
new CloseableIterator[(Action, Boolean)] {
42+
def getReverseIterator: CloseableIterator[(Action, Boolean, Long)] =
43+
new CloseableIterator[(Action, Boolean, Long)] {
4444
private val reverseFilesIter: Iterator[Path] = files.sortWith(_.getName > _.getName).iterator
45-
private var actionIter: Option[CloseableIterator[(Action, Boolean)]] = None
45+
private var actionIter: Option[CloseableIterator[(Action, Boolean, Long)]] = None
4646

4747
/**
4848
* Requires that `reverseFilesIter.hasNext` is true
4949
*/
50-
private def getNextIter: Option[CloseableIterator[(Action, Boolean)]] = {
50+
private def getNextIter: Option[CloseableIterator[(Action, Boolean, Long)]] = {
5151
val nextFile = reverseFilesIter.next()
5252

5353
if (nextFile.getName.endsWith(".json")) {
54-
Some(new CustomJsonIterator(logStore.read(nextFile, hadoopConf)))
54+
val fileVersion = FileNames.deltaVersion(nextFile)
55+
Some(new CustomJsonIterator(logStore.read(nextFile, hadoopConf), fileVersion))
5556
} else if (nextFile.getName.endsWith(".parquet")) {
57+
val fileVersion = FileNames.checkpointVersion(nextFile)
5658
val parquetIterable = ParquetReader.read[Parquet4sSingleActionWrapper](
5759
nextFile.toString,
5860
ParquetReader.Options(timeZone, hadoopConf)
5961
)
60-
Some(new CustomParquetIterator(parquetIterable))
62+
Some(new CustomParquetIterator(parquetIterable, fileVersion))
6163
} else {
6264
throw new IllegalStateException(s"unexpected log file path: $nextFile")
6365
}
@@ -94,7 +96,7 @@ private[internal] class MemoryOptimizedLogReplay(
9496
actionIter.isDefined
9597
}
9698

97-
override def next(): (Action, Boolean) = {
99+
override def next(): (Action, Boolean, Long) = {
98100
if (!hasNext()) throw new NoSuchElementException
99101

100102
if (actionIter.isEmpty) throw new IllegalStateException("Impossible")
@@ -112,27 +114,29 @@ private[internal] class MemoryOptimizedLogReplay(
112114
// Helper Classes
113115
///////////////////////////////////////////////////////////////////////////
114116

115-
private class CustomJsonIterator(iter: CloseableIterator[String])
116-
extends CloseableIterator[(Action, Boolean)] {
117+
private class CustomJsonIterator(iter: CloseableIterator[String], version: Long)
118+
extends CloseableIterator[(Action, Boolean, Long)] {
117119

118120
override def hasNext: Boolean = iter.hasNext
119121

120-
override def next(): (Action, Boolean) = {
121-
(JsonUtils.mapper.readValue[SingleAction](iter.next()).unwrap, false)
122+
override def next(): (Action, Boolean, Long) = {
123+
(JsonUtils.mapper.readValue[SingleAction](iter.next()).unwrap, false, version)
122124
}
123125

124126
override def close(): Unit = iter.close()
125127
}
126128

127-
private class CustomParquetIterator(iterable: ParquetIterable[Parquet4sSingleActionWrapper])
128-
extends CloseableIterator[(Action, Boolean)] {
129+
private class CustomParquetIterator(
130+
iterable: ParquetIterable[Parquet4sSingleActionWrapper],
131+
version: Long)
132+
extends CloseableIterator[(Action, Boolean, Long)] {
129133

130134
private val iter = iterable.iterator
131135

132136
override def hasNext: Boolean = iter.hasNext
133137

134-
override def next(): (Action, Boolean) = {
135-
(iter.next().unwrap.unwrap, true)
138+
override def next(): (Action, Boolean, Long) = {
139+
(iter.next().unwrap.unwrap, true, version)
136140
}
137141

138142
override def close(): Unit = iterable.close()

standalone/src/main/scala/io/delta/standalone/internal/scan/DeltaScanImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ private[internal] class DeltaScanImpl(replay: MemoryOptimizedLogReplay) extends
8282
*/
8383
private def findNextValid(): Option[AddFile] = {
8484
while (iter.hasNext) {
85-
val (action, isCheckpoint) = iter.next()
85+
val (action, isCheckpoint, _) = iter.next()
8686

8787
action match {
8888
case add: AddFile =>

0 commit comments

Comments
 (0)