1717package io .delta .standalone .internal
1818
1919import java .net .URI
20- import java .util .concurrent .{Executors , ExecutorService }
2120
2221import scala .collection .JavaConverters ._
2322import scala .collection .parallel .ExecutionContextTaskSupport
@@ -40,12 +39,30 @@ import io.delta.standalone.internal.logging.Logging
4039import io .delta .standalone .internal .scan .{DeltaScanImpl , FilteredDeltaScanImpl }
4140import io .delta .standalone .internal .util .{ConversionUtils , FileNames , JsonUtils }
4241
42+ /**
43+ * Contains the protocol, metadata, and corresponding table version. The protocol and metadata
44+ * will be used as the defaults in the Snapshot if no newer values are found in logs > `version`.
45+ */
46+ case class SnapshotProtocolMetadataHint (protocol : Protocol , metadata : Metadata , version : Long )
47+
48+ /**
49+ * Visible for testing.
50+ *
51+ * Will contain various metrics collected while finding/loading the latest protocol and metadata
52+ * for this Snapshot. This can be used to verify that the minimal log replay occurred.
53+ */
54+ case class ProtocolMetadataLoadMetrics (fileVersions : Seq [Long ])
55+
4356/**
4457 * Scala implementation of Java interface [[Snapshot ]].
4558 *
4659 * @param timestamp The timestamp of the latest commit in milliseconds. Can also be set to -1 if the
4760 * timestamp of the commit is unknown or the table has not been initialized, i.e.
4861 * `version = -1`.
62+ * @param protocolMetadataHint The optional protocol, metadata, and table version that can be used
63+ * to speed up loading *this* Snapshot's protocol and metadata (P&M).
64+ * Essentially, when computing *this* Snapshot's P&M, we only need to
65+ * look at the log files *newer* than the hint version.
4966 */
5067private [internal] class SnapshotImpl (
5168 val hadoopConf : Configuration ,
@@ -54,7 +71,14 @@ private[internal] class SnapshotImpl(
5471 val logSegment : LogSegment ,
5572 val minFileRetentionTimestamp : Long ,
5673 val deltaLog : DeltaLogImpl ,
57- val timestamp : Long ) extends Snapshot with Logging {
74+ val timestamp : Long ,
75+ protocolMetadataHint : Option [SnapshotProtocolMetadataHint ] = Option .empty)
76+ extends Snapshot with Logging {
77+
78+ protocolMetadataHint.foreach { hint =>
79+ require(hint.version <= version, s " Cannot use a protocolMetadataHint with a version newer " +
80+ s " than that of this Snapshot. Hint version: ${hint.version}, Snapshot version: $version" )
81+ }
5882
5983 import SnapshotImpl ._
6084
@@ -124,36 +148,85 @@ private[internal] class SnapshotImpl(
124148 lazy val transactions : Map [String , Long ] =
125149 setTransactionsScala.map(t => t.appId -> t.version).toMap
126150
127- // These values need to be declared lazy. In Scala, strict values (i.e. non-lazy) in superclasses
128- // (e.g. SnapshotImpl) are fully initialized before subclasses (e.g. InitialSnapshotImpl).
129- // If these were 'strict', or 'eager', vals, then `loadTableProtocolAndMetadata` would be called
130- // for all new InitialSnapshotImpl instances, causing an exception.
131- lazy val (protocolScala, metadataScala) = loadTableProtocolAndMetadata()
151+ /**
152+ * protocolScala, metadataScala are internals APIs.
153+ * protocolMetadataLoadMetrics is visible for testing only.
154+ *
155+ * NOTE: These values need to be declared lazy. In Scala, strict values (i.e. non-lazy) in
156+ * superclasses (e.g. SnapshotImpl) are fully initialized before subclasses
157+ * (e.g. InitialSnapshotImpl). If these were 'strict', or 'eager', vals, then
158+ * `loadTableProtocolAndMetadata` would be called for all new InitialSnapshotImpl instances,
159+ * causing an exception.
160+ */
161+ lazy val (protocolScala, metadataScala, protocolMetadataLoadMetrics) =
162+ loadTableProtocolAndMetadata()
163+
164+ private def loadTableProtocolAndMetadata (): (Protocol , Metadata , ProtocolMetadataLoadMetrics ) = {
165+ val fileVersionsScanned = scala.collection.mutable.Set [Long ]()
166+ def createMetrics = ProtocolMetadataLoadMetrics (fileVersionsScanned.toSeq.sorted.reverse)
132167
133- private def loadTableProtocolAndMetadata (): (Protocol , Metadata ) = {
134168 var protocol : Protocol = null
135169 var metadata : Metadata = null
170+
136171 val iter = memoryOptimizedLogReplay.getReverseIterator
137172
138173 try {
139174 // We replay logs from newest to oldest and will stop when we find the latest Protocol and
140- // Metadata.
141- iter.asScala.foreach { case (action, _) =>
175+ // Metadata (P&M).
176+ //
177+ // If the protocolMetadataHint is defined, then we will only look at log files strictly newer
178+ // (>) than the protocolMetadataHint's version. If we don't find any new P&M, then we will
179+ // default to those from the protocolMetadataHint.
180+ //
181+ // If the protocolMetadataHint is not defined, then we must look at all log files. If no
182+ // P&M is found, then we fail.
183+ iter.asScala.foreach { case (action, _, actionTableVersion) =>
184+
185+ // We have not yet found the latest P&M. If we had found BOTH, we would have returned
186+ // already. Note that we may have already found ONE of them.
187+ protocolMetadataHint.foreach { hint =>
188+ if (actionTableVersion == hint.version) {
189+ // Furthermore, we have already looked at all the actions in all the log files strictly
190+ // newer (>) than the hint version. Thus, we can short circuit early and use the P&M
191+ // from the hint.
192+
193+ val newestProtocol = if (protocol == null ) {
194+ logInfo(s " Using the protocol from the protocolMetadataHint: ${hint.protocol}" )
195+ hint.protocol
196+ } else {
197+ logInfo(s " Found a newer protocol: $protocol" )
198+ protocol
199+ }
200+
201+ val newestMetadata = if (metadata == null ) {
202+ logInfo(s " Using the metadata from the protocolMetadataHint: ${hint.metadata}" )
203+ hint.metadata
204+ } else {
205+ logInfo(s " Found a newer metadata: $metadata" )
206+ metadata
207+ }
208+
209+ return (newestProtocol, newestMetadata, createMetrics)
210+ }
211+ }
212+
213+ fileVersionsScanned += actionTableVersion
214+
142215 action match {
143216 case p : Protocol if null == protocol =>
144217 // We only need the latest protocol
145218 protocol = p
146219
147220 if (protocol != null && metadata != null ) {
148221 // Stop since we have found the latest Protocol and metadata.
149- return (protocol, metadata)
222+ return (protocol, metadata, createMetrics )
150223 }
151224 case m : Metadata if null == metadata =>
152225 metadata = m
153226
154227 if (protocol != null && metadata != null ) {
155228 // Stop since we have found the latest Protocol and metadata.
156- return (protocol, metadata)
229+ return (protocol, metadata, createMetrics )
157230 }
158231 case _ => // do nothing
159232 }
@@ -330,6 +403,9 @@ private class InitialSnapshotImpl(
330403
331404 override lazy val metadataScala : Metadata = Metadata ()
332405
406+ override lazy val protocolMetadataLoadMetrics : ProtocolMetadataLoadMetrics =
407+ ProtocolMetadataLoadMetrics (Seq .empty)
408+
333409 override def scan (): DeltaScan = new DeltaScanImpl (memoryOptimizedLogReplay)
334410
335411 override def scan (predicate : Expression ): DeltaScan =
0 commit comments