opensource.google.com

Showing posts with label BigQuery.

Apache Iceberg 1.10: Maturing the V3 spec, the REST API and Google contributions

Wednesday, September 24, 2025

by Talat Uyarer, BigQuery Core

The Apache Iceberg 1.10.0 release just dropped. I've been scrolling through the release notes and community analysis, and it's a dense, significant release. You can (and should) read the full release notes, but I want to pull out the "gradients" I see—the directions the community is pushing that signal what's next for the data lakehouse.

Next-Gen Engines Have Arrived

Let's jump straight to the headline news: next-generation engine support. Version 1.10.0 delivers deep, native optimizations for both Apache Spark and Apache Flink, ensuring Iceberg is ready for the future.

For Apache Spark users, the biggest news is full compatibility with Spark 4.0. The release also gets much smarter about table maintenance: the compute_partition_stats procedure now supports incremental refresh, reusing existing stats instead of recomputing them from scratch on every run. For streaming, a critical fix for Spark Structured Streaming converts the maxRecordPerMicrobatch limit to a "soft cap," resolving a common production issue where a single file larger than the limit could stall an entire data stream.
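The difference between a hard limit and a "soft cap" can be illustrated with a small planning function. This is a minimal sketch and not Iceberg's implementation: `DataFile`, `plan_microbatch`, and the record counts are hypothetical names and values chosen for illustration. It shows why a hard limit can stall on one oversized file, while a soft limit always admits at least one file per micro-batch.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class DataFile:
    """Hypothetical stand-in for a data file waiting to be streamed."""
    path: str
    record_count: int


def plan_microbatch(pending: List[DataFile], max_records: int, soft: bool) -> List[DataFile]:
    """Pick a prefix of `pending` files for the next micro-batch.

    Hard cap: a file is admitted only if the running total stays within
    `max_records`. Soft cap: the first file is always admitted, even when
    it alone exceeds the limit, so the stream keeps moving.
    """
    batch: List[DataFile] = []
    total = 0
    for f in pending:
        fits = total + f.record_count <= max_records
        if fits or (soft and not batch):
            batch.append(f)
            total += f.record_count
        else:
            break
    return batch


files = [DataFile("big.parquet", 5_000), DataFile("small.parquet", 100)]
hard = plan_microbatch(files, max_records=1_000, soft=False)
soft = plan_microbatch(files, max_records=1_000, soft=True)
print(len(hard), [f.path for f in soft])
```

With the hard cap the batch stays empty on every attempt because the first file never fits; with the soft cap the oversized file is processed alone and the next batch continues from the file after it.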

Apache Flink users get an equally massive upgrade with full Flink 2.0 support. This is accompanied by a new dynamic sink, which is a huge quality-of-life improvement. This feature dramatically streamlines streaming ingestion by automatically handling schema evolution from the input stream and propagating those changes directly to the Iceberg table. It even supports "fan-out" capabilities, letting it create new tables on the fly as new data types appear in the stream, removing a huge layer of operational friction.

Hardening the Core for Speed and Stability

Beyond the big engine updates, 1.10.0 is all about hardening the core for stability and speed. A key part of this is the growing adoption of Deletion Vectors. This V3 feature is now ready for prime time and radically improves the performance of row-level updates by avoiding costly read-modify-write operations.

Speaking of core logic, the compaction code for Spark and Flink has been refactored to share the same underlying logic. This is a fantastic sign of health—it means less duplicated effort, fewer divergent bugs, and a more stable core for everyone, regardless of your engine.

With deletion vectors leading the charge, the rest of the V3 spec is also moving from "on the horizon" to "ready to use." The spec itself is now officially "closed," and we're seeing its most powerful features land, like row lineage for fine-grained traceability and the new variant type for flexibly handling semi-structured data.

The REST Catalog is Ready for Prime Time

For me, the most significant strategic shift in this release is the battle-hardening of the REST Catalog. For years, the de facto standard was the clunky, monolithic Hive Metastore. The REST Catalog spec is the future — a simple, open HTTP protocol that finally decouples compute from metadata.

The 1.10.0 notes are full of REST improvements, but one is critical: a fix for the client that prevents retrying commits after 5xx server errors. This sounds boring, but it's not. When a commit call fails, it's impossible for the client to know if the mutation actually committed or not before the error. Retrying in that ambiguous state could lead to conflicting operations and potential table corruption. This fix is about making the REST standard stable enough for mission-critical production.
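To make the reasoning concrete, here is a minimal sketch of such a retry policy. It is not the Iceberg REST client's code: `should_retry`, `call_with_retry`, the `READ`/`COMMIT` labels, and the fake sender are all hypothetical names invented for this illustration. The point is only that a 5xx on a read is safe to retry, while a 5xx on a commit is ambiguous and is handed back to the caller.

```python
from typing import Callable, Tuple

# Hypothetical operation labels for this sketch; not Iceberg API names.
READ = "read"
COMMIT = "commit"


def should_retry(operation: str, status: int) -> bool:
    """Decide whether a failed REST call is safe to send again.

    A 5xx on a read can be retried because reads do not change table state.
    A 5xx on a commit is ambiguous: the server may have applied the change
    before failing, so a blind retry could conflict with it.
    """
    is_server_error = 500 <= status <= 599
    return is_server_error and operation == READ


def call_with_retry(operation: str,
                    send: Callable[[], Tuple[int, str]],
                    max_attempts: int = 3) -> Tuple[int, str, int]:
    """Call `send` until it succeeds, the failure is not retryable, or attempts run out."""
    status, body = send()
    attempts = 1
    while status >= 400 and attempts < max_attempts and should_retry(operation, status):
        status, body = send()
        attempts += 1
    return status, body, attempts


def flaky_sender() -> Callable[[], Tuple[int, str]]:
    """Fake server for the demo: fails once with 503, then succeeds."""
    responses = iter([(503, "unavailable"), (200, "ok")])
    return lambda: next(responses)


read_result = call_with_retry(READ, flaky_sender())
commit_result = call_with_retry(COMMIT, flaky_sender())
print(read_result)
print(commit_result)
```

In this sketch the read succeeds on its second attempt, while the commit stops after the first 503 so that the caller can check the table state before deciding what to do next.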

Google Cloud and the Open Lakehouse

This industry-wide standardization on a stable REST API is foundational to Google Cloud's BigLake strategy and where our new contributions come in. We're thrilled to have contributed two key features to the 1.10.0 release.

The first is native BigQuery Metastore Catalog support. This isn't just another Hive-compatible API; it's a native implementation that allows you to use the battle-tested, serverless, and globally-replicated BigQuery metadata service as your Iceberg catalog.

The second contribution is the new Google AuthManager. This plugs directly into the REST Catalog ecosystem, allowing Iceberg to authenticate using standard Google credentials. You can now point your open source Spark job (running on GKE, Dataproc, or anywhere) directly at your BigLake-managed tables via the open REST protocol, using standard Google auth.

This is the whole philosophy behind our BigLake REST Catalog. It's our fully-managed, open-standard implementation of the Iceberg REST protocol. This means you get a single source of truth, managing all your Iceberg tables with BigQuery's governance, fine-grained security, and metadata. It also means true interoperability, letting you use BigQuery to analyze your data, or open source Spark, Flink, and Trino to access the exact same tables via the open REST API. And critically, it means no lock-in—you're just talking to an open standard.

You can read more about our managed BigLake REST Catalog service here.

Producer java library for Data Lineage is now open source

Tuesday, January 28, 2025

Integrating OpenLineage producers with GCP Lineage just got a lot easier


What is Data Lineage

Data Lineage is a GCP feature that allows tracking data movement. This tool helps data owners and analysts detect anomalies in data flows, find connections between data sources and verify the potential consequences of planned changes in data pipelines.

Lineage is injected automatically for some Google Cloud products (BigQuery, Cloud Data Fusion, Cloud Composer, Dataproc, Vertex AI). If lineage integration is enabled for any of these products in a project, the data movements produced by their jobs are reported to GCP Lineage.

For custom integrations, the API can be used to report and fetch lineage.

After injecting, lineage can be viewed in the Google Cloud console (available from DataCatalog UI, BigQuery UI, Vertex UI). There are two representations: graph view, with data sources as nodes and data movements as edges, and list view, a tabular representation. Lineage information can also be fetched from the API.

More information is available in the documentation.


GCP Lineage information model

We describe data flows using the following concepts:

  • Process is a definition of some data transformation. For example, a SQL or Spark script.
  • Run is an execution of a Process.
  • Lineage Event is a data transformation event. It is reported in context of a Run.
  • A Link represents a connection between two data sources, when data in the link’s Target depends on its Source. A Lineage Event contains a list of Links.
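The four concepts above nest naturally, which a few dataclasses can make explicit. This is only an illustrative sketch: the class and field names are assumptions made for this example and are not the resource schema of the Data Lineage API. The `edges` helper shows how the graph view's edges fall out of the Links reported across a Process's Runs.

```python
from dataclasses import dataclass, field
from typing import List, Set, Tuple


@dataclass(frozen=True)
class Link:
    """Data in `target` depends on data in `source`."""
    source: str
    target: str


@dataclass
class LineageEvent:
    """A data transformation event, reported in the context of a Run."""
    links: List[Link] = field(default_factory=list)


@dataclass
class Run:
    """One execution of a Process."""
    events: List[LineageEvent] = field(default_factory=list)


@dataclass
class Process:
    """Definition of a data transformation, e.g. a SQL or Spark script."""
    name: str
    runs: List[Run] = field(default_factory=list)


def edges(process: Process) -> Set[Tuple[str, str]]:
    """Collect the distinct (source, target) pairs across all runs of a process."""
    return {
        (link.source, link.target)
        for run in process.runs
        for event in run.events
        for link in event.links
    }


link = Link("project.dataset.input_table", "project.dataset.output_table")
process = Process("daily_sql", runs=[Run(events=[LineageEvent(links=[link])])])
print(edges(process))
```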

OpenLineage support

OpenLineage is an open standard for reporting lineage information. It unifies lineage reporting between systems, which means the events generated in this format can be consumed by any product supporting it. This leads to more flexibility: adding or replacing a lineage producer does not imply changing the consumer, and vice versa.

The OpenLineage format is adopted by a number of lineage producers and consumers, meaning there is already tooling available to report lineage from/to those systems. GCP Lineage is one of those consumers: users can report events in OpenLineage format, see the resulting lineage in the UI, and query it via the API.

OpenLineage is the preferred method for reporting lineage in GCP Lineage. It is used by the Dataproc lineage integration. To find out more about sending OpenLineage events to GCP Lineage, refer to the documentation.

After injecting lineage in OpenLineage format, it can be accessed in the same way as if it was injected via other API methods or automatically: from the Google Cloud console or the API.


Why producer library

The GCP Lineage producer library is an extension of the client library. Client libraries are recommended for calling Cloud APIs programmatically. They handle low level API call details, leaving the necessary user code simpler and shorter.

The producer library further simplifies integration by providing the ready-to-use code needed to call the API from Java. It adds functionality such as synchronous and asynchronous clients, translation of OpenLineage JSON messages to the API-friendly format, and error handling.

Using the producer library, all the code needed to send a request to GCP Lineage API is:

SyncLineageProducerClient client = SyncLineageProducerClient.create();
ProcessOpenLineageRunEventRequest request =
    ProcessOpenLineageRunEventRequest.newBuilder()
        .setParent(parent)
        .setOpenLineage(openLineageMessage)
        .build();
client.processOpenLineageRunEvent(request);

The field openLineageMessage here is a protobuf Struct that includes information about job execution, inputs and outputs and other metadata. The object model is described in the documentation. An example message is:

{
  "eventType": "START",
  "eventTime": "2023-04-04T13:21:16.098Z",
  "run": {
    "runId": "502483d6-3e3d-474f-9380-da565eaa7516",
    "facets": {
      "spark_properties": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
        "properties": {
          "spark.master": "yarn",
          "spark.app.name": "sparkJobTest.py"
        }
      }
    }
  },
  "job": {
    "namespace": "project-name",
    "name": "cluster-name",
    "facets": {
      "jobType": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
        "processingType": "BATCH",
        "integration": "SPARK",
        "jobType": "SQL_JOB"
      }
    }
  },
  "inputs": [
    { "namespace": "bigquery", "name": "project.dataset.input_table" }
  ],
  "outputs": [
    { "namespace": "bigquery", "name": "project.dataset.output_table" }
  ],
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.18.0/integration/spark",
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
}

Learn more about building an OpenLineage message.


Best Practices for Constructing OpenLineage Messages

The openLineageMessage should follow the OpenLineage format. The fields that are required for correct parsing by the GCP Lineage API are:

  • job: mapped to Process
  • job.namespace: used to construct Process name
  • job.name: used to construct Process name
  • run: mapped to Run
  • run.runId: used to construct Run name
  • producer: URI identifying the producer of this metadata
  • eventTime: time of the data movement
  • schemaURL: URL pointing to the schema definition for this message

In addition to those, the fields used to create lineage are:

  • eventType: corresponds to the status of the Run
  • inputs: mapped to sources of links. Must be specified according to the naming conventions
  • outputs: mapped to targets of links. Must be specified according to the naming conventions

The GCP Lineage API supports OpenLineage major versions 1 and 2. For more information please refer to the documentation.
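Before sending a message, it can be handy to check locally that the required fields listed above are present. The following is a minimal sketch of such a check. `REQUIRED_FIELDS` and `missing_fields` are hypothetical helpers written for this post, not part of the producer library, and the check only tests for presence, not for valid values or naming conventions.

```python
from typing import Any, Dict, List

# Dotted paths of the fields listed above as required for parsing.
REQUIRED_FIELDS = [
    "job.namespace",
    "job.name",
    "run.runId",
    "producer",
    "eventTime",
    "schemaURL",
]


def missing_fields(message: Dict[str, Any]) -> List[str]:
    """Return the required dotted paths that are absent from `message`."""
    missing = []
    for path in REQUIRED_FIELDS:
        node: Any = message
        for key in path.split("."):
            if isinstance(node, dict) and key in node:
                node = node[key]
            else:
                missing.append(path)
                break
    return missing


complete = {
    "eventType": "START",
    "eventTime": "2023-04-04T13:21:16.098Z",
    "run": {"runId": "502483d6-3e3d-474f-9380-da565eaa7516"},
    "job": {"namespace": "project-name", "name": "cluster-name"},
    "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.18.0/integration/spark",
    "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent",
}
incomplete = {
    "job": {"name": "cluster-name"},
    "run": {},
    "eventTime": "2023-04-04T13:21:16.098Z",
}

print(missing_fields(complete))
print(missing_fields(incomplete))
```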


How to access GCP Lineage?

The code is now publicly available on GitHub. The library is also published to Maven.


GcpLineageTransport

To simplify integration with GCP Lineage, we offer GcpLineageTransport. It is available in the OpenLineage GitHub repository and is published as a separate Maven artifact. It is built on top of the producer library mentioned above.

Using the transport minimises the code for sending events to GCP Lineage. The GcpLineageTransport can be configured as the event sink for any existing OpenLineage producer such as Airflow, Spark, and Flink. Find more information and examples on GCP Lineage.

By Mary Idamkina – Data Lineage

Fluent Bit WriteAPI Connector: Lowering the barrier to streaming data

Wednesday, August 21, 2024

Automating ingestion processes is crucial for modern businesses that handle vast amounts of data daily. In today's fast-paced digital landscape, the ability to seamlessly collect, process, and analyze data can make the difference between staying ahead of the competition and falling behind. To simplify ingestion, tools such as Fluent Bit enable customers to route data between pluggable sources and sinks without needing to write a single line of code. Instead, data routing is managed via a config file. The Fluent Bit WriteAPI Connector is a pluggable sink built on top of the BigQuery Storage Write API that enables organizations to rapidly develop a data ingestion pipeline.


What are the BigQuery Storage Write API and Fluent Bit?

The BigQuery Storage Write API is a high-performance data-ingestion API for BigQuery. It leverages both batching and streaming methods to ingest records into BigQuery in real time. The Write API scales with load and offers exactly-once delivery, which guarantees that data is not duplicated. Using the Write API directly typically requires technical expertise, as users must navigate one of the client SDKs. This can create a high barrier to entry for some customers to stream data into BigQuery.

Fluent Bit is a widely-used open-source observability agent known for its lightweight design, speed, and flexibility. It operates by collecting logs, traces and metrics through various inputs such as local or network files, filtering and buffering them, and then routing them to designated outputs. Fluent Bit's high-performance parsing capabilities allow for data to be processed according to user specifications. The output component is a configurable plugin that directs data to different destinations, such as various tables in BigQuery. There can be multiple WriteAPI outputs and each output can be independently configured to use a specific write mode, enabling seamless data streaming into BigQuery based on tag/match pairs.


Why Use the Fluent Bit WriteAPI Connector?

Our solution to the technical challenges posed by using the WriteAPI is the Fluent Bit WriteAPI Connector. This connector automates the data ingestion process, eliminating the need for customers to write any code. The entire pipeline is managed through a single configuration file, making it easy to use. The flow of data is depicted in the diagram below.

Fluent Bit Flow Diagram

Example Use Case

Say we wish to monitor a log file containing JSON data, and we would like to ingest this data into a BigQuery table that has a single column titled “Text” of type String. A line from the log file looks like this:

{"Text": "Hello, World"}

Setup Process

    1. Setting Up Fluent Bit: The first step is to install and configure Fluent Bit. Once installed, Fluent Bit must be configured to collect data from your desired sources. This involves defining inputs, such as log files or system metrics, that Fluent Bit will monitor. This is explained below.
    2. Cloning the Google Git Repository: Next, clone the Google Git Repository that contains the Fluent Bit WriteAPI Connector. This repository includes all the necessary files to set up the connector, along with an example configuration file to help you get started. Let’s say the git repo is cloned at /usr/local/fluentbit-bigquery-writeapi-sink. Edit the file in the git repo named plugins.conf to provide the full path to the writeapi plugin. For example, the contents of the file can now look like this: 
    [PLUGINS]
      Path    /usr/local/fluentbit-bigquery-writeapi-sink/out_writeapi.so 
    3. Setting Up BigQuery Tables: Ensure that your BigQuery tables are set up and ready to receive data. This might involve creating new tables or configuring existing ones to match the data schema you intend to use. For example, create the BigQuery table with a schema containing the column Text of type STRING. Let’s say the table is created at myProject.myDataset.myTable.
Destination table schema

    4. Prepare the input file: We will be reading data from a log file at /usr/local/logfile.log. Let’s start with an empty log file. Create the log file as follows: 
    touch /usr/local/logfile.log
    5. Configuring the Plugin: The most critical step is setting up the configuration file for the Fluent Bit WriteAPI Connector. This single file controls the entire data pipeline, from input collection to data filtering and routing, and lets you define parameters such as input sources, data filters, and output destinations. Create a configuration file in, say, /usr/local and call it demo.conf. See details on how to format a configuration file. It looks like this:
      Sample Config File

This routes the data from /usr/local/logfile.log to the BigQuery table at myProject.myDataset.myTable. There are additional configurable fields that control the stream, such as chunking, asynchronous response queue, and also the type of stream. These fields let you control how your data is streamed.

To run the pipeline, use the command:

fluent-bit -c /usr/local/demo.conf

As the log file is updated, new lines will automatically appear in the BigQuery table. For example, to populate the log file you can run the following command:

echo "{\"Text\": \"Hello, world\"}" >> /usr/local/logfile.log

Note that Fluent Bit flushes buffered records on an interval (the Flush setting in the [SERVICE] section of the configuration), so it might take a short while before the new line reaches BigQuery. The BigQuery table will now be updated as follows:

Populated BigQuery table
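If the log lines come from an application rather than from the shell, the same append can be done in code. The sketch below is one possible way to do it, assuming the single-column {"Text": ...} row shape used in this example. `append_record` is a hypothetical helper, and the demo writes to a temporary file that stands in for /usr/local/logfile.log. Using a JSON serializer rather than hand-built strings keeps quotes inside the text properly escaped.

```python
import json
import tempfile
from pathlib import Path


def append_record(log_path: Path, text: str) -> str:
    """Append one JSON object per line, matching the {"Text": ...} row shape."""
    line = json.dumps({"Text": text})
    with log_path.open("a", encoding="utf-8") as f:
        f.write(line + "\n")
    return line


with tempfile.TemporaryDirectory() as tmp:
    demo_log = Path(tmp) / "logfile.log"  # stand-in for /usr/local/logfile.log
    append_record(demo_log, "Hello, world")
    append_record(demo_log, 'She said "hi"')
    written = demo_log.read_text(encoding="utf-8").splitlines()

print(written)
```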

Key Features

The connector supports a wide variety of features including multi-instancing, dynamic scaling, exactly-once delivery, and automatic retry.

    1. Multi-Instancing

    • The multi-instancing feature of the Fluent Bit WriteAPI Connector is designed to offer flexibility in routing data. Users can configure the connector to handle multiple data inputs and outputs in various combinations, including multiple inputs feeding into multiple outputs, so that data can be aggregated or distributed as needed. An input connector is labeled with a tag field. In our example, this has value log1. Data is routed to an output connector based on the value of its match field. In our example, this also has value log1, meaning there is a 1-to-1 correspondence between the input and output connector. The match field accepts the * wildcard, so it can match multiple input tags. For example, if it were set to *, data from all inputs would flow to this output.

    2. Dynamic Scaling

    • Handling large volumes of data efficiently is crucial for modern pipelines. The dynamic scaling feature addresses the issue of potential overloads in the Write API. As data is streamed into BigQuery, there may be times when the API queue becomes full—by default, it can hold up to 1000 pending responses. When this limit is reached, no new data can be appended until some of the pending responses are processed, which can create back pressure in the system. To manage this, the connector automatically scales up its capacity by creating an additional network connection when it detects that the number of pending responses has reached the threshold.

    3. Exactly-Once

    • The "exactly-once" feature ensures that each piece of data is recorded in BigQuery exactly once, with no duplicates. If the connector encounters an intermittent issue while sending a specific piece of data, it synchronously retries until the send succeeds.

    4. Retry Functionality

    • The retry functionality allows the connector to handle temporary failures gracefully. The retry mechanism is configurable, meaning users can set how many times the system should attempt to resend the data before giving up. By default, the connector will retry sending failed data up to four times. In the default stream mode, if a row of data fails to send, it is retried while other rows continue to be processed. However, in the "exactly once" mode, the retry process is synchronous, meaning the system will wait for the failed row to be successfully sent before moving on to subsequent rows.

    5. Error Handling

    • Error handling in the connector is designed to catch and manage issues that may arise during data transmission. The connector will continue processing incoming data even if earlier data had a failure. Any permanent issues that are encountered are logged to the console.
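The retry and error-handling behavior described above can be sketched in a few lines. This is an illustration only, not the connector's code: `send_with_retry`, `deliver_in_order`, and the fake `flaky_send` are hypothetical names. The sketch models the synchronous case, where each row is finished before the next one starts, and it assumes a bounded number of retries, after which the row is recorded as failed and processing continues.

```python
from typing import Callable, Dict, List


def send_with_retry(send: Callable[[dict], bool], row: dict, max_retries: int = 4) -> bool:
    """Try once, then retry up to `max_retries` times. Returns True on success."""
    for _attempt in range(1 + max_retries):
        if send(row):
            return True
    return False


def deliver_in_order(send: Callable[[dict], bool], rows: List[dict], max_retries: int = 4) -> List[dict]:
    """Deliver rows one at a time and return the rows that failed permanently."""
    failed = []
    for row in rows:
        if not send_with_retry(send, row, max_retries):
            failed.append(row)  # a real connector would log the error here
    return failed


attempts: Dict[str, int] = {}


def flaky_send(row: dict) -> bool:
    """Fake sender for the demo: counts attempts and fails on purpose."""
    text = row["Text"]
    attempts[text] = attempts.get(text, 0) + 1
    if text == "always fails":
        return False
    if text == "flaky":
        return attempts[text] >= 3  # succeeds on the third attempt
    return True


rows = [{"Text": "ok"}, {"Text": "flaky"}, {"Text": "always fails"}]
failed = deliver_in_order(flaky_send, rows)
print(failed, attempts)
```

The row that never succeeds is attempted five times in total (one initial send plus four retries) and then reported, while the rows around it are still delivered.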

Conclusion

The ability to efficiently collect, process, and analyze data is a critical factor for business success. The Fluent Bit WriteAPI Connector stands out as a powerful solution that simplifies and automates the data ingestion process, bridging the gap between Fluent Bit's versatile data collection capabilities and Google BigQuery's robust analytics platform.

By eliminating the need for complex coding and manual data management, the Fluent Bit WriteAPI Connector lowers the barrier to entry for businesses of all sizes. Whether you're a small startup or a large enterprise, this tool allows you to effortlessly set up and manage your data pipelines with a single configuration file. Its features like multi-instancing, dynamic scaling, exactly-once delivery, and error handling ensure that your data is ingested accurately, reliably, and in real-time.

The straightforward setup process, combined with the flexibility and scalability of the connector, makes it a valuable asset for any organization looking to harness the power of their data. By automating the ingestion process, businesses can focus on what truly matters: deriving actionable insights from their data to drive growth and innovation.

By Tanishqa Puhan, BigQuery WriteAPI

Logica: organizing your data queries, making them universally reusable and fun

Monday, April 12, 2021

We present Logica, a novel open source Logic Programming language. A successor to Yedalog (a language developed at Google earlier), it is a Datalog-like logic programming language. Logica code compiles to SQL and runs on Google BigQuery (with experimental support for PostgreSQL and SQLite), but it is much more concise and supports the clean and reusable abstraction mechanisms that SQL lacks. It supports modules and imports, it can be used from an interactive Python notebook, and it even makes testing your queries natural and easy.

“Data is the new oil”, they say, and SQL is so far the lingua franca for working with data. When SQL (or “Structured English Query Language”, as it was first named) was invented in the 1970s, its authors might not have imagined the popularity that it would reach half a century later. Today, systems ranging from tiny smart watch applications to enterprise IT solutions read and write their data using SQL. Even the browser that you are using to read this post now might have a working built-in SQL database in it.

Despite the widespread adoption, SQL is not flawless. Constructing statements from long chains of English words (which are often capitalized to keep the old-fashioned COBOL spirit of the 70s alive!) can be very verbose—a single query spanning hundreds of lines is a routine occurrence. The main flaw of SQL, however, lies in its very limited support for abstraction.

Good programming is about creating small, understandable, reusable pieces of logic that can be tested, given names, and organized into packages which can later be used to construct more useful pieces of logic. SQL resists this workflow. Although you can encapsulate certain repeated computations into views and functions, the syntax and support for these can vary among implementations, the notions of packages and imports are generally nonexistent, and higher-level constructions (e.g. passing a function to a function) are impossible.

This inherent resistance to decomposition of logic into bite-sized pieces is what leads to contrived, lengthy queries, copy-pasted chunks of code and, eventually, unmaintainable, unstructured (note the irony) SQL codebases. To make things worse, SQL code is rarely tested, because “testing SQL queries” sounds rather esoteric to most engineers, at best. Because of that, a number of alternative query languages and libraries have been developed. Of those, systems based on logic programming perhaps come the closest to addressing SQL’s limitations.

Logic programming languages solve the problems of SQL by using the syntax of mathematical propositional logic rather than natural English language. The language of formal logic was designed by mathematicians specifically to make the expression of complex statements easier, and it suits this purpose much better than natural language. Logica extends classical Logic programming syntax further, most notably with aggregation, hence the name, which stands for

Logica = Logic + Aggregation.

Let us see how it all works. SQL operates with relations, which are sets of rows. In logic programming the analog of a relation is a predicate. While a predicate is a set of rows, we think of it as a logical condition, which describes the rows of a relation. Here is, for example, the definition of a simple predicate:

MagicNumber(x: 2);
MagicNumber(x: 3);
MagicNumber(x: 5);

The definition claims that the condition MagicNumber(x) must hold when x is precisely either 2, 3, or 5. That means, if we were to query this predicate (i.e. request all values of x that satisfy it), the output should be a “relation” with a single column x and rows 2, 3, and 5. The SQL equivalent would be:

SELECT 2 AS x
UNION ALL
SELECT 3 AS x
UNION ALL
SELECT 5 AS x;

Rather than listing the individual values, we could have defined the predicate by encoding a logical condition on x as follows:

MagicNumber(x:) :-
  x in [2, 3, 5];

Now, here is where the magic starts. Firstly, any table in your database is itself already a predicate, so the following definition:

MagicComment(comment_text:) :-
  `comments`(user_id:, comment_text:),
  user_id == 5;

defines a predicate MagicComment, which includes precisely those comment_text values that appear in rows of the comments table where user_id == 5. In SQL this would read:

SELECT comment_text FROM comments WHERE user_id = 5;

Observe what happens if we replace the condition “user_id == 5” in our predicate with MagicNumber(x: user_id):

MagicComment(comment_text:) :-
  `comments`(user_id:, comment_text:),
  MagicNumber(x: user_id);

Here, we are querying for comments of users whose ID is one of the “magic numbers” we just defined above. Note how easily we could reuse a previously defined piece of code without having to copy anything around. We could now even extract the MagicNumber to a common module and import it in wherever it is needed:

import my_project.magic.MagicNumber;

As a final example, let us mock the comments table in a unit test of a query.

import my_project.magic.MagicComment;

MockComments(user_id: 1, comment_text: "Hello");
MockComments(user_id: 2, comment_text: "Logic");
MockComments(user_id: 3, comment_text: "Programming");

MagicCommentTest := MagicComment(`comments`: MockComments);

If we query the MagicCommentTest predicate here, it will not try to read the comments table in the database. Instead, it will use the MockComments predicate we just defined, thus letting us verify its correctness by testing the output (it must include two rows, “Logic” and “Programming”). Observe how natural and frictionless many of the good programming practices become with Logica, and compare that to what you would have to do to achieve the same using bare SQL.
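For comparison, here is roughly what the same check looks like when done by hand against a SQL engine. This is a sketch using Python's built-in sqlite3 module with hand-written SQL. It is not the output of the Logica compiler, and the in-memory table simply plays the role of the mocked comments table.

```python
import sqlite3

# Build an in-memory stand-in for the comments table with the mock rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE comments (user_id INTEGER, comment_text TEXT)")
conn.executemany(
    "INSERT INTO comments VALUES (?, ?)",
    [(1, "Hello"), (2, "Logic"), (3, "Programming")],
)

# Hand-written SQL equivalent of MagicComment, with MagicNumber inlined as a CTE.
query = """
WITH magic_number AS (
  SELECT 2 AS x UNION ALL SELECT 3 AS x UNION ALL SELECT 5 AS x
)
SELECT comment_text
FROM comments
JOIN magic_number ON comments.user_id = magic_number.x
ORDER BY comment_text
"""
result = [row[0] for row in conn.execute(query)]
conn.close()

print(result)
```

Note that the magic numbers have to be repeated inside the query and the test table has to be created under the real table's name, which is exactly the kind of copying and rearranging that the Logica version avoids.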

There is much more to Logica, so make sure you give it a try—chances are, you will love it! Start with this tutorial to learn Logica. Even if you do not end up using it in your next project, learning a new powerful language may open your mind to new ideas and perspectives on data processing and computing in general.

The simple examples above are only a small sample of how much more concise Logica code can be than SQL for complex queries. In particular, we did not even touch the topic of aggregations in this article. For all of this, see the examples section of the Logica open source repository.

We also hope that some of the readers consider contributing to Logica development. That’s what open source is all about!

By Konstantin Tretyakov and Evgeny Skvortsov – Logica Open Source Project