温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark delta如何读数据

发布时间:2021-12-16 16:14:02 来源:亿速云 阅读:166 作者:小新 栏目:大数据

小编给大家分享一下spark delta如何读数据,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

分析

spark 的delta datasource的构建要从DataSource.lookupDataSourceV2开始,之后会流向到loadV1Source,这里会进行dataSource.createRelation进行构建datasource的Relation的构建,直接转到deltaDataSource 的createRelation:

override def createRelation(       sqlContext: SQLContext,       parameters: Map[String, String]): BaseRelation = {     val maybePath = parameters.getOrElse("path", {       throw DeltaErrors.pathNotSpecifiedException     })     // Log any invalid options that are being passed in     DeltaOptions.verifyOptions(CaseInsensitiveMap(parameters))     val timeTravelByParams = DeltaDataSource.getTimeTravelVersion(parameters)     DeltaTableV2(       sqlContext.sparkSession,       new Path(maybePath),       timeTravelOpt = timeTravelByParams).toBaseRelation   }
  1. DeltaOptions.verifyOptions进行参数校验,有效的参数如下:

val validOptionKeys : Set[String] = Set(     REPLACE_WHERE_OPTION,     MERGE_SCHEMA_OPTION,     EXCLUDE_REGEX_OPTION,     OVERWRITE_SCHEMA_OPTION,     USER_METADATA_OPTION,     MAX_FILES_PER_TRIGGER_OPTION,     IGNORE_FILE_DELETION_OPTION,     IGNORE_CHANGES_OPTION,     IGNORE_DELETES_OPTION,     OPTIMIZE_WRITE_OPTION,     DATA_CHANGE_OPTION,     "queryName",     "checkpointLocation",     "path",     "timestampAsOf",     "versionAsOf"   )
  1. DeltaDataSource.getTimeTravelVersion根据指定的timestampAsOf或者versionAsOf获取指定的版本

  2. 直接调用DeltaTableV2的toBaseRelation方法:

def toBaseRelation: BaseRelation = {     if (deltaLog.snapshot.version == -1) {       val id = catalogTable.map(ct => DeltaTableIdentifier(table = Some(ct.identifier)))         .getOrElse(DeltaTableIdentifier(path = Some(path.toString)))       throw DeltaErrors.notADeltaTableException(id)     }     val partitionPredicates = DeltaDataSource.verifyAndCreatePartitionFilters(       path.toString, deltaLog.snapshot, partitionFilters)     // TODO(burak): We should pass in the snapshot here     deltaLog.createRelation(partitionPredicates, timeTravelSpec)   }
  • 如果存在分区,则DeltaDataSource.verifyAndCreatePartitionFilter创建partitionPredicates

  • timeTravelSpec,这里优先选择用户指定的timeTravelByParams,否则通过DeltaDataSource.parsePathIdentifier选择path指定的version,格式如:/some/path/partition=1@v1234 或者/some/path/partition=1@yyyyMMddHHmmssSSS

  • 直接调用deltaLog.createRelation:

    def createRelation(    partitionFilters: Seq[Expression] = Nil,    timeTravel: Option[DeltaTimeTravelSpec] = None): BaseRelation = {  val versionToUse = timeTravel.map { tt =>    val (version, accessType) = DeltaTableUtils.resolveTimeTravelVersion(      spark.sessionState.conf, this, tt)    val source = tt.creationSource.getOrElse("unknown")    recordDeltaEvent(this, s"delta.timeTravel.$source", data = Map(      "tableVersion" -> snapshot.version,      "queriedVersion" -> version,      "accessType" -> accessType    ))    version  }  /** Used to link the files present in the table into the query planner. */  val snapshotToUse = versionToUse.map(getSnapshotAt(_)).getOrElse(snapshot)  val fileIndex = TahoeLogFileIndex(    spark, this, dataPath, snapshotToUse.metadata.schema, partitionFilters, versionToUse)  new HadoopFsRelation(    fileIndex,    partitionSchema = snapshotToUse.metadata.partitionSchema,    dataSchema = snapshotToUse.metadata.schema,    bucketSpec = None,    snapshotToUse.fileFormat,    snapshotToUse.metadata.format.options)(spark) with InsertableRelation {    def insert(data: DataFrame, overwrite: Boolean): Unit = {      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append      WriteIntoDelta(        deltaLog = DeltaLog.this,        mode = mode,        new DeltaOptions(Map.empty[String, String], spark.sessionState.conf),        partitionColumns = Seq.empty,        configuration = Map.empty,        data = data).run(spark)    }  }

     

    override def inputFiles: Array[String] = { getSnapshot(stalenessAcceptable = false).filesForScan(   projection = Nil, partitionFilters).files.map(f => absolutePath(f.path).toString).toArray }


    该方法调用了snapshot的filesForScan方法:

    def filesForScan(projection: Seq[Attribute], filters: Seq[Expression]): DeltaScan = { implicit val enc = SingleAction.addFileEncoder val partitionFilters = filters.flatMap { filter =>   DeltaTableUtils.splitMetadataAndDataPredicates(filter, metadata.partitionColumns, spark)._1 } val files = DeltaLog.filterFileList(   metadata.partitionSchema,   allFiles.toDF(),   partitionFilters).as[AddFile].collect() DeltaScan(version = version, files, null, null, null)(null, null, null, null) }


    • . 通过指定版本获取对应的snapshot

    • . 构建TahoeLogFileIndex,因为这里构建的是HadoopFsRelation,所以我们关注TahoeLogFileIndex的inputfiles方法:

通过之前文章的分析,我们直到deltalog记录了AddFile和Remove记录,那现在读数据怎么读取呢?全部在allFiles方法。
重点看一下:allFiles方法:

def allFiles: Dataset[AddFile] = {  val implicits = spark.implicits  import implicits._  state.where("add IS NOT NULL").select($"add".as[AddFile])  }

这里调用了state方法,而它又调用了stateReconstruction方法,

private lazy val cachedState =  cacheDS(stateReconstruction, s"Delta Table State #$version - $redactedPath")  /** The current set of actions in this [[Snapshot]]. */  def state: Dataset[SingleAction] = cachedState.getDS

stateReconstruction方法在checkpoint的时用到了,在这里也用到了,主要是重新构造文件状态,合并AddFile和RemoveFile:

private def stateReconstruction: Dataset[SingleAction] = {  ...  loadActions.mapPartitions { actions =>      val hdpConf = hadoopConf.value.value      actions.flatMap {        _.unwrap match {          case add: AddFile => Some(add.copy(path = canonicalizePath(add.path, hdpConf)).wrap)          case rm: RemoveFile => Some(rm.copy(path = canonicalizePath(rm.path, hdpConf)).wrap)          case other if other == null => None          case other => Some(other.wrap)        }      }     }    ...    .mapPartitions { iter =>      val state = new InMemoryLogReplay(time)      state.append(0, iter.map(_.unwrap))      state.checkpoint.map(_.wrap)    }   }

而关键在于InMemoryLogReplay的append方法和checkpoint方法,这里做到了文件状态的合并:

  assert(currentVersion == -1 || version == currentVersion + 1,    s"Attempted to replay version $version, but state is at $currentVersion")  currentVersion = version  actions.foreach {    case a: SetTransaction =>      transactions(a.appId) = a    case a: Metadata =>      currentMetaData = a    case a: Protocol =>      currentProtocolVersion = a    case add: AddFile =>      activeFiles(add.pathAsUri) = add.copy(dataChange = false)      // Remove the tombstone to make sure we only output one `FileAction`.      tombstones.remove(add.pathAsUri)    case remove: RemoveFile =>      activeFiles.remove(remove.pathAsUri)      tombstones(remove.pathAsUri) = remove.copy(dataChange = false)    case ci: CommitInfo => // do nothing    case null => // Some crazy future feature. Ignore   }  }

重点就在case add: AddFile和 case remove: RemoveFile处理以及checkpoint方法,能够很好的合并文件状态。

再调用collect方法,返回DeltaScan,之后获取文件路径作为要处理的文件路径。

  • 把TahoeLogFileIndex传入HadoopFsRelation得到最后的BaseRelation 返回

注意:spark读取delta格式整个流程和spark读取其他数据格式流程一致,主要区别在于读取数据之前,会把文件状态在内存中进行一次合并,这样只需要读取文件状态为Addfile的就行了

以上是“spark delta如何读数据”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI