Flink and Hive Integration - Unifying Enterprise Data Processing Systems Bowen Li Committer@Flink, Senior Engineer@Alibaba Flink Forward Europe, Oct 2019
Agenda
● Background
● Motivations and Impacts
● Flink 1.9 - State of the Union
● Flink 1.10 - What's coming next?
● Q&A
Background
Flink aims at unifying data processing for streaming and batch use cases.
● Batch is a special case of streaming
  ○ bounded vs. unbounded data streams
● Unified and simpler tech stack, deployment, and operations
● Lower learning curve for end users
  ○ developers, data scientists, analysts, etc.
Why integrate with Hive?
● Hive is the de facto standard for batch processing (ETL, analytics, etc.) in enterprises
● Hive is widely adopted, with a huge user base
● Hive Metastore is the center of the big data ecosystem
● Hive users want lower latency and a near-real-time data warehouse
● Streaming users usually have a Hive deployment and need to access Hive data/metadata
Motivations and Impacts
● Strengthen Flink's lead in stream processing by enhancing its metadata stack
● Advance Flink's batch capabilities
● Provide a unified solution for stream and batch processing using SQL
● Enrich and extend Flink's ecosystem
● Promote Flink's adoption
Platform-Level Integration
● Not another "Hive on Xxx"
● The integration lives fully in the Flink repo
● Released as part of Flink
Flink 1.9 - State of the Union
See the official documentation: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/hive/
Integrate with Hive Metadata
We developed brand-new Catalog APIs to
● integrate Flink with Hive metadata and the Hive Metastore
● completely reshape Flink's metadata stack for both streaming and batch
FLIP-30 - Unified Catalog APIs (https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs)
Catalog APIs
Catalog (see the interface sketch below)
● Meta-objects
  ○ Database, Table, View, Partition, Function, TableStats, and PartitionStats
● Operations
  ○ Get/List/Exists/Create/Alter/Rename/Drop
Namespace of catalog tables/views/functions
● fully qualified as <catalog_name>.<db_name>.<object_name>
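For illustration, a trimmed sketch of the Catalog interface these slides refer to (org.apache.flink.table.catalog.Catalog); the method list is abbreviated and exception clauses are omitted, so treat it as indicative rather than the full API:

// types come from org.apache.flink.table.catalog; checked exceptions omitted for brevity
public interface Catalog {
    // databases
    List<String> listDatabases();
    boolean databaseExists(String databaseName);
    void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists);

    // tables and views
    List<String> listTables(String databaseName);
    CatalogBaseTable getTable(ObjectPath tablePath);
    void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);

    // partitions, functions, and statistics follow the same Get/List/Create/Alter/Drop pattern
    List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);
    CatalogFunction getFunction(ObjectPath functionPath);
}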
Roadmap of Catalog APIs
Catalogs are pluggable, which opens opportunities for:
● Catalog for Hive
● Catalogs for streams and message queues
  ○ Pulsar catalog is in review
  ○ Kafka (Confluent Schema Registry), RabbitMQ, RocketMQ, etc.
● Catalogs for structured data
  ○ RDBMS like MySQL, etc.
● Catalogs for semi-structured data
  ○ Elasticsearch, HBase, Cassandra, etc.
CatalogManager
● manages all registered catalogs and resolves objects
● defaults to the current catalog and current database when parsing queries

select * from currentCatalog.currentDb.myTable
>>>=== can be simplified as ===>>>
select * from myTable
Architecture
(stack diagram) Flink Runtime · Query processing & optimization · Table API and SQL · Catalog APIs · SQL Client / Zeppelin
Catalogs
Flink 1.9 provides two catalog implementations out of the box:
● GenericInMemoryCatalog
  ○ in-memory, non-persistent, per session, used by default
● HiveCatalog
  ○ compatible with multiple Hive versions
  ○ supports most Hive data types
  ○ can read/write Hive meta-objects
  ○ can persist Flink's non-Hive streaming and batch meta-objects to Hive Metastore
    ■ e.g. Kafka/Pulsar tables
HiveCatalog
Flink-Hive interoperability: Flink can read/write Hive metadata through HiveCatalog.
Flink can also persist non-Hive metadata, using Hive Metastore as storage, via HiveCatalog.
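To make the second point concrete, here is a hedged sketch of a Kafka-backed table whose schema ends up in Hive Metastore once HiveCatalog is the active catalog. The table name, topic, and connector properties are hypothetical, and the property keys follow the Flink 1.10-era descriptor style:

-- hypothetical Kafka table; with HiveCatalog active, this schema is
-- persisted in Hive Metastore instead of the per-session memory catalog
CREATE TABLE user_clicks (
  user_id BIGINT,
  url     STRING,
  ts      TIMESTAMP(3)
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'clicks',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);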
Design
Supported Hive Versions
● Officially support Hive 2.3.4 and 1.2.1
● Rely on Hive's own compatibility for other 2.x and 1.x versions
Support Hive UDFs
● Supports all Hive UDF interfaces via HiveCatalog
  ○ UDF
  ○ GenericUDF
  ○ UDTF
  ○ UDAF
  ○ GenericUDAFResolver
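As a usage sketch: with HiveCatalog as the current catalog, a UDF already registered in the Hive Metastore can be called directly from Flink SQL. The function and table names below are hypothetical:

-- my_lower is a hypothetical UDF registered in Hive Metastore
SELECT my_lower(name) FROM myHiveTable;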
Hive Source and Sink
● Can read/write non-partitioned Hive tables
● Can read partitioned Hive tables
● Supports partition pruning
● Supports Text, SequenceFile, ORC, and Parquet formats
Example - Table API

TableEnvironment tEnv = ...

// Register a HiveCatalog (catalog name, default database, hive-conf dir) and make it current
tEnv.registerCatalog("myHive", new HiveCatalog("myHive", "myDb", "/opt/hive-conf/"));
tEnv.useCatalog("myHive");
tEnv.useDatabase("myDb");

// Read Hive meta-objects
Catalog myHive = tEnv.getCatalog("myHive").get();
myHive.listDatabases();
myHive.listTables("myDb");

ObjectPath myTablePath = new ObjectPath("myDb", "myHiveTable");
myHive.getTable(myTablePath);
myHive.listPartitions(myTablePath);

// Query Hive data
Table result = tEnv.sqlQuery("select * from myHiveTable");
SQL Client Example
// Register catalogs in sql-client-defaults.yaml
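The slide showed this as a screenshot; below is a minimal sketch of the catalogs section of sql-client-defaults.yaml, assuming the Flink 1.9 SQL client configuration keys (catalog name, path, and database are illustrative):

catalogs:
  - name: myhive1
    type: hive
    hive-conf-dir: /opt/hive-conf
    default-database: myDb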
SQL CLI Example (cont'd)

Flink SQL> SHOW CATALOGS;
myhive1
default_catalog

Flink SQL> SHOW DATABASES;
myDb

Flink SQL> USE myhive1.myDb;

Flink SQL> SHOW TABLES;
myHiveTable

Flink SQL> DESCRIBE myHiveTable;
...

Flink SQL> SELECT * FROM myHiveTable;
...
Summary
● Integration with Hive was released in Flink 1.9 as Beta
● It lays the foundation for Flink's full integration with Hive
● This initiative led to extensive development and enhancement of Flink's SQL stack and metadata management capabilities
Flink 1.10 - What's coming next?
Support More Hive Versions
Supports all Hive 1.2, 2.0, 2.1, 2.2, 2.3, and 3.1 versions:
● 1.2.0, 1.2.1
● 2.0.0, 2.0.1
● 2.1.0, 2.1.1
● 2.2.0
● 2.3.0, 2.3.1, 2.3.2, 2.3.3, 2.3.4, 2.3.5, 2.3.6
● 3.1.0, 3.1.1, 3.1.2
Hive Source and Sink Improvements
● HiveTableSource supports
  ○ projection pushdown
  ○ reading Hive views (in progress)
● HiveTableSink supports
  ○ INSERT OVERWRITE
  ○ inserting into partitions, both dynamic and static (see the sketch below)
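A brief sketch of what the partition-insert syntax looks like; the table, column, and partition names are hypothetical:

-- static partition: the partition value is fixed in the statement
INSERT OVERWRITE myHiveTable PARTITION (dt='2019-10-08') SELECT user_id, cnt FROM src;

-- dynamic partition: trailing columns of the SELECT determine the partition
INSERT OVERWRITE myHiveTable SELECT user_id, cnt, dt FROM src;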
FLIP-57 Rework FunctionCatalog
● Problems to solve
  ○ clarify and complete function categories
    ■ currently the definition of temp functions is ambiguous
  ○ enable referencing functions with qualified names across catalogs and databases
  ○ redefine the function resolution order
https://cwiki.apache.org/confluence/display/FLINK/FLIP-57%3A+Rework+FunctionCatalog
Introducing Temp System Functions vs. Temp Catalog Functions
Similarity:
● volatile, with a lifespan limited to the session
Differences (DDL examples below):
● Temp system functions
  ○ have no namespace; can be referenced anywhere by function name
  ○ can override system/built-in functions
  ○ "CREATE TEMPORARY SYSTEM FUNCTION ..."
● Temp catalog functions
  ○ have catalog/db namespaces
  ○ can override catalog functions
  ○ "CREATE TEMPORARY FUNCTION ..."
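A hedged sketch of the two DDL forms proposed in FLIP-57; the function and class names are hypothetical:

-- temp system function: no namespace, may shadow a built-in function
CREATE TEMPORARY SYSTEM FUNCTION my_lower AS 'com.example.MyLower';

-- temp catalog function: namespaced, may shadow a catalog function
CREATE TEMPORARY FUNCTION myCatalog.myDb.my_func AS 'com.example.MyFunc';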
Introducing Ambiguous vs. Precise Function References

Ambiguous function reference
● with only the function name
  SELECT <func>(col) FROM T
● new resolution order:
  1. temp system function
  2. system (built-in) function
  3. temp catalog function in current cat/db
  4. catalog function in current cat/db

Precise function reference (NEW!)
● with a fully or partially qualified name
  SELECT <cat>.<db>.<func>(col) FROM T
  SELECT <db>.<func>(col) FROM T
● enables cross-catalog/db function references
● resolution order:
  1. temp catalog function
  2. catalog function
FLIP-68 Extend Core Table System with Pluggable Modules
● Motivations:
  ○ enable users to integrate Flink with the core and built-in objects of other systems
    ■ supports Hive built-in functions via pluggable modules
    ■ more upcoming - Geo and machine learning modules
  ○ empower users to write code and do customized development for the Flink table core
https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+System+with+Modular+Plugins
Design of Pluggable Modules
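For a flavor of the API, a minimal sketch of loading Hive built-in functions as a module, assuming the TableEnvironment#loadModule and HiveModule names introduced with FLIP-68 in Flink 1.10 (the version string is illustrative):

TableEnvironment tEnv = ...

// The core module with Flink's own built-in functions is loaded by default;
// loading the Hive module additionally exposes Hive's built-in functions.
tEnv.loadModule("hive", new HiveModule("2.3.4"));

// Hive built-ins such as get_json_object are now resolvable in Flink SQL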
FLIP-69 DDL Enhancement in SQL and Table APIs
Full set of DDLs and other commands supported via a unified SQL parser
● CREATE/DROP/ALTER/RENAME
  ○ CATALOG/DATABASE/FUNCTION
  ○ TABLE
    ■ CREATE TABLE AS SELECT ...
User-facing entry points
● SQL Client - already supports a few commands, but not through the SQL parser
● Table API - TableEnvironment#sqlUpdate("...") and sqlQuery("...")
https://cwiki.apache.org/confluence/display/FLINK/FLIP+69+-+Flink+SQL+DDL+Enhancement
Beyond Flink 1.10
● More Hive SQL compatibility to minimize migration efforts
● More user custom objects, like SerDes and storage handlers
● Performance optimization
● Feature parity with Hive (bucketing, etc.)
● Enterprise readiness - security, governance
● Regular maintenance and releases
Thanks to Other Contributors
Xuefu Zhang, Rui Li, Terry Wang, Timo Walther, Dawid Wysakowicz, Kurt Young, Jingsong Lee, Jark Wu, and others
Conclusions
Flink's integration with Hive
● helps Flink realize its potential in batch processing
● is a critical step for Flink towards unified data processing
● reshapes Flink's metadata management capabilities
● enhances Flink's SQL stack
● brings a massive user base and lays the foundation for enterprise adoption
Thanks! Twitter: @Bowen__Li