温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

spark 3.0.1集成delta 0.7.0之delta如何进行DDL操作

发布时间:2021-12-16 16:15:24 来源:亿速云 阅读:191 作者:小新 栏目:大数据

小编给大家分享一下spark 3.0.1集成delta 0.7.0之delta如何进行DDL操作,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

分析

delta在0.7.0以前是不能够进行save表操作的,只能存储到文件中,也就是说他的元数据是和spark的其他元数据是分开的,delta是独立存在的,也是不能和其他表进行关联操作的,只有到了delta 0.7.0版本以后,才真正意义上和spark进行了集成,这也得益于spark 3.x的Catalog plugin API 特性。
还是先从delta的configurate sparksession入手,如下:

import org.apache.spark.sql.SparkSession val spark = SparkSession   .builder()   .appName("...")   .master("...")   .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")   .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")   .getOrCreate()

对于第二个配置 config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") 从spark configuration,我们可以看到对该spark.sql.catalog.spark_catalog的解释是

A catalog implementation that will be used as the v2 interface to Spark's built-in v1 catalog: spark_catalog. This catalog shares its identifier namespace with the spark_catalog and must be consistent with it; for example, if a table can be loaded by the spark_catalog, this catalog must also return the table metadata. To delegate operations to the spark_catalog, implementations can extend 'CatalogExtension'.

也就是说,通过该配置可以实现元数据的统一性,其实这也是spark社区和delta社区进行交互的一种结果

spark 3.x的Catalog plugin API

为了能搞懂delta为什么能够进行DDL和DML操作,就得先知道spark 3.x的Catalog plugin机制SPARK-31121.

首先是interface CatalogPlugin,该接口是catalog plugin的顶级接口,正如注释所说:

 * A marker interface to provide a catalog implementation for Spark.  * <p>  * Implementations can provide catalog functions by implementing additional interfaces for tables,  * views, and functions.  * <p>  * Catalog implementations must implement this marker interface to be loaded by  * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the  * required public no-arg constructor. After creating an instance, it will be configured by calling  * {@link #initialize(String, CaseInsensitiveStringMap)}.  * <p>  * Catalog implementations are registered to a name by adding a configuration option to Spark:  * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties  * in the Spark configuration that share the catalog name prefix,  * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive  * string map of options in initialization with the prefix removed.  * {@code name}, is also passed and is the catalog's name; in this case, "catalog-name".

可以通过spark.sql.catalog.catalog-name=com.example.YourCatalogClass集成到spark中
该类的实现还可以集成其他额外的tables views functions的接口,这里就得提到接口TableCatalog,该类提供了与tables相关的方法:

/**    * List the tables in a namespace from the catalog.    * <p>    * If the catalog supports views, this must return identifiers for only tables and not views.    *    * @param namespace a multi-part namespace    * @return an array of Identifiers for tables    * @throws NoSuchNamespaceException If the namespace does not exist (optional).    */   Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException;   /**    * Load table metadata by {@link Identifier identifier} from the catalog.    * <p>    * If the catalog supports views and contains a view for the identifier and not a table, this    * must throw {@link NoSuchTableException}.    *    * @param ident a table identifier    * @return the table's metadata    * @throws NoSuchTableException If the table doesn't exist or is a view    */   Table loadTable(Identifier ident) throws NoSuchTableException;

这样就可以基于TableCatalog开发自己的catalog,从而实现multi-catalog support

还得有个接口DelegatingCatalogExtension,这是个实现了CatalogExtension接口的抽象类,而CatalogExtension继承了TableCatalog, SupportsNamespaces。DeltaCatalog实现了DelegatingCatalogExtension ,这部分后续进行分析。
最后还有一个class CatalogManager,这个类是用来管理CatalogPlugins的,且是线程安全的:

/**  * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow  * the caller to look up a catalog by name.  *  * There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They  * ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current  * namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get  * current database of `SessionCatalog` when the current catalog is the session catalog.  */ // TODO: all commands should look up table from the current catalog. The `SessionCatalog` doesn't //       need to track current database at all. private[sql] class CatalogManager(     conf: SQLConf,     defaultSessionCatalog: CatalogPlugin,     val v1SessionCatalog: SessionCatalog) extends Logging {

我们看到CatalogManager管理了v2版本的 CatalogPlugin和v1版本的sessionCatalog,这个是因为历史的原因导致必须得兼容v1版本

那CatalogManager在哪里被调用呢。 我们看一下BaseSessionStateBuilder ,可以看到该类中才是正宗使用CatalogManager的地方:

/**    * Catalog for managing table and database states. If there is a pre-existing catalog, the state    * of that catalog (temp tables & current database) will be copied into the new catalog.    *    * Note: this depends on the `conf`, `functionRegistry` and `sqlParser` fields.    */   protected lazy val catalog: SessionCatalog = {     val catalog = new SessionCatalog(       () => session.sharedState.externalCatalog,       () => session.sharedState.globalTempViewManager,       functionRegistry,       conf,       SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),       sqlParser,       resourceLoader)     parentState.foreach(_.catalog.copyStateTo(catalog))     catalog   }   protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog, conf)   protected lazy val catalogManager = new CatalogManager(conf, v2SessionCatalog, catalog)

SessionCatalog 是v1版本的,主要是跟底层的元数据存储通信,以及管理临时视图,udf的,这一部分暂时不分析,重点放到v2版本的sessionCatalog, 我们看一下V2SessionCatalog:

/**  * A [[TableCatalog]] that translates calls to the v1 SessionCatalog.  */ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)   extends TableCatalog with SupportsNamespaces {   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper   import V2SessionCatalog._   override val defaultNamespace: Array[String] = Array("default")   override def name: String = CatalogManager.SESSION_CATALOG_NAME   // This class is instantiated by Spark, so `initialize` method will not be called.   override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}   override def listTables(namespace: Array[String]): Array[Identifier] = {     namespace match {       case Array(db) =>         catalog           .listTables(db)           .map(ident => Identifier.of(Array(ident.database.getOrElse("")), ident.table))           .toArray       case _ =>         throw new NoSuchNamespaceException(namespace)     }   }

我们分析一下listTables方法可知,v2的sessionCatalog操作 都是委托给了v1版本的sessionCatalog去操作的,其他的方法也是一样, 而且name默认为CatalogManager.SESSION_CATALOG_NAME,也就是spark_catalog,这里后面也会提到,注意一下。 而且,catalogmanager在逻辑计划中的分析器和优化器中也会用到,因为会用到其中的元数据:

protected def analyzer: Analyzer = new Analyzer(catalogManager, conf) { ... protected def optimizer: Optimizer = {     new SparkOptimizer(catalogManager, catalog, experimentalMethods) {       override def earlyScanPushDownRules: Seq[Rule[LogicalPlan]] =         super.earlyScanPushDownRules ++ customEarlyScanPushDownRules       override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] =         super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules     }   }

而analyzer和optimizer正是spark sql进行解析的核心中的核心,当然还有物理计划的生成。 那这些analyzer和optimizer是在哪里被调用呢?
我们举一个例子,DataSet中的filter方法就调用了:

 */   def filter(conditionExpr: String): Dataset[T] = {     filter(Column(sparkSession.sessionState.sqlParser.parseExpression(conditionExpr)))   }

sessionState.sqlParser就是刚才所说的sqlParser:

protected lazy val sqlParser: ParserInterface = {     extensions.buildParser(session, new SparkSqlParser(conf))   }

只有整个逻辑 从sql解析到使用元数据的数据链路,我们就能大致知道怎么一回事了。

delta的DeltaCatalog

我们回过头来看看,delta的DeltaCatalog是怎么和spark 3.x进行结合的 ,上源码DeltaCatalog:

class DeltaCatalog(val spark: SparkSession) extends DelegatingCatalogExtension   with StagingTableCatalog   with SupportsPathIdentifier {   def this() = {     this(SparkSession.active)   }   ...

就如之前所说的DeltaCatalog继承了DelegatingCatalogExtension,从名字可以看出这是一个委托类,那到底是怎么委托的呢以及委托给谁呢?

public abstract class DelegatingCatalogExtension implements CatalogExtension {   private CatalogPlugin delegate;   public final void setDelegateCatalog(CatalogPlugin delegate) {     this.delegate = delegate;   }

该类中有个setDelegateCatalog方法,该方法在CatalogManager中的loadV2SessionCatalog方法中被调用:

private def loadV2SessionCatalog(): CatalogPlugin = {     Catalogs.load(SESSION_CATALOG_NAME, conf) match {       case extension: CatalogExtension =>         extension.setDelegateCatalog(defaultSessionCatalog)         extension       case other => other     }   }

而该方法被v2SessionCatalog调用:

private[sql] def v2SessionCatalog: CatalogPlugin = {     conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { customV2SessionCatalog =>       try {         catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())       } catch {         case NonFatal(_) =>           logError(             "Fail to instantiate the custom v2 session catalog: " + customV2SessionCatalog)           defaultSessionCatalog       }     }.getOrElse(defaultSessionCatalog)   }

这个就是返回默认的v2版本的SessionCatalog实例,分析一下这个方法:

   首先得到配置项SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION,也就是spark.sql.catalog.spark_catalog配置,    如果spark配置了的话,就调用loadV2SessionCatalog加载该类,,否则就加载默认的v2SessionCatalog,也就是V2SessionCatalog实例

这里我们就发现了:
delta配置的spark.sql.catalog.spark_catalog为"org.apache.spark.sql.delta.catalog.DeltaCatalog",也就是说,spark中的V2SessionCatalog是DeltaCatalog的实例,而DeltaCatalog的委托给了BaseSessionStateBuilder中的V2SessionCatalog实例。

具体看看DeltaCatalog 的createTable方法,其他的方法类似:

override def createTable(       ident: Identifier,       schema: StructType,       partitions: Array[Transform],       properties: util.Map[String, String]): Table = {     if (DeltaSourceUtils.isDeltaDataSourceName(getProvider(properties))) {       createDeltaTable(         ident, schema, partitions, properties, sourceQuery = None, TableCreationModes.Create)     } else {       super.createTable(ident, schema, partitions, properties)     }   } ... private def createDeltaTable(       ident: Identifier,       schema: StructType,       partitions: Array[Transform],       properties: util.Map[String, String],       sourceQuery: Option[LogicalPlan],       operation: TableCreationModes.CreationMode): Table = {      ...     val tableDesc = new CatalogTable(       identifier = TableIdentifier(ident.name(), ident.namespace().lastOption),       tableType = tableType,       storage = storage,       schema = schema,       provider = Some("delta"),       partitionColumnNames = partitionColumns,       bucketSpec = maybeBucketSpec,       properties = tableProperties.toMap,       comment = Option(properties.get("comment")))     // END: copy-paste from the super method finished.     val withDb = verifyTableAndSolidify(tableDesc, None)     ParquetSchemaConverter.checkFieldNames(tableDesc.schema.fieldNames)     CreateDeltaTableCommand(       withDb,       getExistingTableIfExists(tableDesc),       operation.mode,       sourceQuery,       operation,       tableByPath = isByPath).run(spark)     loadTable(ident)       }  override def loadTable(ident: Identifier): Table = {     try {       super.loadTable(ident) match {         case v1: V1Table if DeltaTableUtils.isDeltaTable(v1.catalogTable) =>           DeltaTableV2(             spark,             new Path(v1.catalogTable.location),             catalogTable = Some(v1.catalogTable),             tableIdentifier = Some(ident.toString))         case o => o       }        }
  • 判断是否是delta数据源,如果是的话,跳到createDeltaTable方法,否则直接调用super.createTable方法,

  • createDeltaTable先会进行delta特有的CreateDeltaTableCommand.run()命令写入delta数据,之后载loadTable

  • loadTable则会调用super的loadTable,而方法会调用V2SessionCatalog的loadTable,而V2SessionCatalog最终会调用v1版本sessionCatalog的getTableMetadata方法,从而组成V1Table(catalogTable)返回,这样就把delta的元数据信息持久化到了v1 SessionCatalog管理的元数据库中

  • 如果不是delta数据源,则调用super.createTable方法,该方法调用V2SessionCatalog的createTable,而最终还是调用v1版本sessionCatalog的createTable方法

我们这里重点分析了delta数据源到元数据的存储,非delta数据源的代码就没有粘贴过来,有兴趣的自己可以编译源码跟踪一下

我们还得提一下spark.sql.defaultCatalog的默认配置为spark_catalog,也就是sql的默认catalog为spark_catalog,对应到delta的话就是DeltaCatalog。

以上是“spark 3.0.1集成delta 0.7.0之delta如何进行DDL操作”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注亿速云行业资讯频道!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI