Developing I/O connectors for Java
IMPORTANT: Use Splittable DoFn
to develop your new I/O. For more details, read the new I/O connector overview.
To connect to a data store that isn’t supported by Beam’s existing I/O connectors, you must create a custom I/O connector that usually consist of a source and a sink. All Beam sources and sinks are composite transforms; however, the implementation of your custom I/O depends on your use case. Before you start, read the new I/O connector overview for an overview of developing a new I/O connector, the available implementation options, and how to choose the right option for your use case.
This guide covers using the Source
and FileBasedSink
interfaces using Java. The Python SDK offers the same functionality, but uses a slightly different API. See Developing I/O connectors for Python for information specific to the Python SDK.
Basic code requirements
Beam runners use the classes you provide to read and/or write data using multiple worker instances in parallel. As such, the code you provide for Source
and FileBasedSink
subclasses must meet some basic requirements:
Serializability: Your
Source
orFileBasedSink
subclass, whether bounded or unbounded, must be Serializable. A runner might create multiple instances of yourSource
orFileBasedSink
subclass to be sent to multiple remote workers to facilitate reading or writing in parallel.Immutability: Your
Source
orFileBasedSink
subclass must be effectively immutable. All private fields must be declared final, and all private variables of collection type must be effectively immutable. If your class has setter methods, those methods must return an independent copy of the object with the relevant field modified.You should only use mutable state in your
Source
orFileBasedSink
subclass if you are using lazy evaluation of expensive computations that you need to implement the source or sink; in that case, you must declare all mutable instance variables transient.Thread-Safety: Your code must be thread-safe. If you build your source to work with dynamic work rebalancing, it is critical that you make your code thread-safe. The Beam SDK provides a helper class to make this easier. See Using Your BoundedSource with dynamic work rebalancing for more details.
Testability: It is critical to exhaustively unit test all of your
Source
andFileBasedSink
subclasses, especially if you build your classes to work with advanced features such as dynamic work rebalancing. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard to detect.To assist in testing
BoundedSource
implementations, you can use the SourceTestUtils class.SourceTestUtils
contains utilities for automatically verifying some of the properties of yourBoundedSource
implementation. You can useSourceTestUtils
to increase your implementation’s test coverage using a wide range of inputs with relatively few lines of code. For examples that useSourceTestUtils
, see the AvroSourceTest and TextIOReadTest source code.
In addition, see the PTransform style guide for Beam’s transform style guidance.
Implementing the Source interface
To create a data source for your pipeline, you must provide the format-specific logic that tells a runner how to read data from your input source, and how to split your data source into multiple parts so that multiple worker instances can read your data in parallel. If you’re creating a data source that reads unbounded data, you must provide additional logic for managing your source’s watermark and optional checkpointing.
Supply the logic for your source by creating the following classes:
A subclass of
BoundedSource
if you want to read a finite (batch) data set, or a subclass ofUnboundedSource
if you want to read an infinite (streaming) data set. These subclasses describe the data you want to read, including the data’s location and parameters (such as how much data to read).A subclass of
Source.Reader
. Each Source must have an associated Reader that captures all the state involved in reading from thatSource
. This can include things like file handles, RPC connections, and other parameters that depend on the specific requirements of the data format you want to read.The
Reader
class hierarchy mirrors the Source hierarchy. If you’re extendingBoundedSource
, you’ll need to provide an associatedBoundedReader
. if you’re extendingUnboundedSource
, you’ll need to provide an associatedUnboundedReader
.One or more user-facing wrapper composite transforms (
PTransform
) that wrap read operations. PTransform wrappers discusses why you should avoid exposing your sources.
Implementing the Source subclass
You must create a subclass of either BoundedSource
or UnboundedSource
, depending on whether your data is a finite batch or an infinite stream. In either case, your Source
subclass must override the abstract methods in the superclass. A runner might call these methods when using your data source. For example, when reading from a bounded source, a runner uses these methods to estimate the size of your data set and to split it up for parallel reading.
Your Source
subclass should also manage basic information about your data source, such as the location. For example, the example Source
implementation in Beam’s DatastoreIO class takes host, datasetID, and query as arguments. The connector uses these values to obtain data from Cloud Datastore.
BoundedSource
BoundedSource
represents a finite data set from which a Beam runner may read, possibly in parallel. BoundedSource
contains a set of abstract methods that the runner uses to split the data set for reading by multiple workers.
To implement a BoundedSource
, your subclass must override the following abstract methods:
split
: The runner uses this method to split your finite data into bundles of a given size.getEstimatedSizeBytes
: The runner uses this method to estimate the total size of your data, in bytes.createReader
: Creates the associatedBoundedReader
for thisBoundedSource
.
You can see a model of how to implement BoundedSource
and the required abstract methods in Beam’s implementations for Cloud BigTable (BigtableIO.java) and BigQuery (BigQuerySourceBase.java).
UnboundedSource
UnboundedSource
represents an infinite data stream from which the runner may read, possibly in parallel. UnboundedSource
contains a set of abstract methods that the runner uses to support streaming reads in parallel; these include checkpointing for failure recovery, record IDs to prevent data duplication, and watermarking for estimating data completeness in downstream parts of your pipeline.
To implement an UnboundedSource
, your subclass must override the following abstract methods:
split
: The runner uses this method to generate a list ofUnboundedSource
objects which represent the number of sub-stream instances from which the service should read in parallel.getCheckpointMarkCoder
: The runner uses this method to obtain the Coder for the checkpoints for your source (if any).requiresDeduping
: The runner uses this method to determine whether the data requires explicit removal of duplicate records. If this method returns true, the runner will automatically insert a step to remove duplicates from your source’s output. This should return true if and only if your source provides record IDs for each record. SeeUnboundedReader.getCurrentRecordId
for when this should be done.createReader
: Creates the associatedUnboundedReader
for thisUnboundedSource
.
Implementing the Reader subclass
You must create a subclass of either BoundedReader
or UnboundedReader
to be returned by your source subclass’s createReader
method. The runner uses the methods in your Reader
(whether bounded or unbounded) to do the actual reading of your dataset.
BoundedReader
and UnboundedReader
have similar basic interfaces, which you’ll need to define. In addition, there are some additional methods unique to UnboundedReader
that you’ll need to implement for working with unbounded data, and an optional method you can implement if you want your BoundedReader
to take advantage of dynamic work rebalancing. There are also minor differences in the semantics for the start()
and advance()
methods when using UnboundedReader
.
Reader methods common to both BoundedReader and UnboundedReader
A runner uses the following methods to read data using BoundedReader
or UnboundedReader
:
start
: Initializes theReader
and advances to the first record to be read. This method is called exactly once when the runner begins reading your data, and is a good place to put expensive operations needed for initialization.advance
: Advances the reader to the next valid record. This method must return false if there is no more input available.BoundedReader
should stop reading once advance returns false, butUnboundedReader
can return true in future calls once more data is available from your stream.getCurrent
: Returns the data record at the current position, last read by start or advance.getCurrentTimestamp
: Returns the timestamp for the current data record. You only need to overridegetCurrentTimestamp
if your source reads data that has intrinsic timestamps. The runner uses this value to set the intrinsic timestamp for each element in the resulting outputPCollection
.
Reader methods unique to UnboundedReader
In addition to the basic Reader
interface, UnboundedReader
has some additional methods for managing reads from an unbounded data source:
getCurrentRecordId
: Returns a unique identifier for the current record. The runner uses these record IDs to filter out duplicate records. If your data has logical IDs present in each record, you can have this method return them; otherwise, you can return a hash of the record contents, using at least a 128-bit hash. It is incorrect to use Java’sObject.hashCode()
, as a 32-bit hash is generally insufficient for preventing collisions, andhasCode()
is not guaranteed to be stable across processes.Implementing
getCurrentRecordId
is optional if your source uses a checkpointing scheme that uniquely identifies each record. For example, if your splits are files and the checkpoints are file positions up to which all data has been read, you do not need record IDs. However, record IDs can still be useful if upstream systems writing data to your source occasionally produce duplicate records that your source might then read.getWatermark
: Returns a watermark that yourReader
provides. The watermark is the approximate lower bound on timestamps of future elements to be read by yourReader
. The runner uses the watermark as an estimate of data completeness. Watermarks are used in windowing and triggers.getCheckpointMark
: The runner uses this method to create a checkpoint in your data stream. The checkpoint represents the progress of theUnboundedReader
, which can be used for failure recovery. Different data streams may use different checkpointing methods; some sources might require received records to be acknowledged, while others might use positional checkpointing. You’ll need to tailor this method to the most appropriate checkpointing scheme. For example, you might have this method return the most recently acked record(s).getCheckpointMark
is optional; you don’t need to implement it if your data does not have meaningful checkpoints. However, if you choose not to implement checkpointing in your source, you may encounter duplicate data or data loss in your pipeline, depending on whether your data source tries to re-send records in case of errors.
You can read a bounded PCollection
from an UnboundedSource
by specifying either .withMaxNumRecords
or .withMaxReadTime
when you read from your source. .withMaxNumRecords
reads a fixed maximum number of records from your unbounded source, while .withMaxReadTime
reads from your unbounded source for a fixed maximum time duration.
Using your BoundedSource with dynamic work rebalancing
If your source provides bounded data, you can have your BoundedReader
work with dynamic work rebalancing by implementing the method splitAtFraction
. The runner may call splitAtFraction
concurrently with start or advance on a given reader so that the remaining data in your Source
can be split and redistributed to other workers.
When you implement splitAtFraction
, your code must produce a mutually-exclusive set of splits where the union of those splits matches the total data set.
If you implement splitAtFraction
, you must implement both splitAtFraction
and getFractionConsumed
in a thread-safe manner, or data loss is possible. You should also unit-test your implementation exhaustively to avoid data duplication or data loss.
To ensure that your code is thread-safe, use the RangeTracker
thread-safe helper object to manage positions in your data source when implementing splitAtFraction
and getFractionConsumed
.
We highly recommended that you unit test your implementations of splitAtFraction
using the SourceTestUtils
class. SourceTestUtils
contains a number of methods for testing your implementation of splitAtFraction
, including exhaustive automatic testing.
Convenience Source and Reader base classes
The Beam SDK contains some convenient abstract base classes to help you create Source
and Reader
classes that work with common data storage formats, like files.
FileBasedSource
If your data source uses files, you can derive your Source
and Reader
classes from the FileBasedSource
and FileBasedReader
abstract base classes. FileBasedSource
is a bounded source subclass that implements code common to Beam sources that interact with files, including:
- File pattern expansion
- Sequential record reading
- Split points
Using the FileBasedSink abstraction
If your data source uses files, you can implement the FileBasedSink
abstraction to create a file-based sink. For other sinks, use ParDo
, GroupByKey
, and other transforms offered by the Beam SDK for Java. See the developing I/O connectors overview for more details.
When using the FileBasedSink
interface, you must provide the format-specific logic that tells the runner how to write bounded data from your pipeline’s PCollection
s to an output sink. The runner writes bundles of data in parallel using multiple workers.
Supply the logic for your file-based sink by implementing the following classes:
A subclass of the abstract base class
FileBasedSink
.FileBasedSink
describes a location or resource that your pipeline can write to in parallel. To avoid exposing your sink to end-users, yourFileBasedSink
subclass should be protected or private.A user-facing wrapper
PTransform
that, as part of the logic, calls WriteFiles and passes yourFileBasedSink
as a parameter. A user should not need to callWriteFiles
directly.
The FileBasedSink
abstract base class implements code that is common to Beam sinks that interact with files, including:
- Setting file headers and footers
- Sequential record writing
- Setting the output MIME type
FileBasedSink
and its subclasses support writing files to any Beam-supported FileSystem
implementations. See the following Beam-provided FileBasedSink
implementations for examples:
PTransform wrappers
When you create a source or sink that end-users will use, avoid exposing your source or sink code. To avoid exposing your sources and sinks to end-users, your new classes should be protected or private. Then, implement a user-facing wrapper PTransform
. By exposing your source or sink as a transform, your implementation is hidden and can be arbitrarily complex or simple. The greatest benefit of not exposing implementation details is that later on, you can add additional functionality without breaking the existing implementation for users.
For example, if your users’ pipelines read from your source using read
and you want to insert a reshard into the pipeline, all users would need to add the reshard themselves (using the GroupByKey
transform). To solve this, we recommended that you expose the source as a composite PTransform
that performs both the read operation and the reshard.
See Beam’s PTransform style guide for additional information about wrapping with a PTransform
.
Last updated on 2025/10/10
Have you found everything you were looking for?
Was it all useful and clear? Is there anything that you would like to change? Let us know!