Introducing the R2DBC connector: Reactive Programming with MariaDB
Who is this guy? Rob Hedgpeth Developer Evangelist robh@mariadb.com @probablyrealrob rhedgpeth
Agenda
● Reactive Programming
  ○ What is it?
  ○ Why is it important?
● R2DBC
  ○ What is it?
  ○ How can you use it?
● Demo
Non-blocking Database Euphoria
Reactive Programming
What is it?
@mariadb reactive programming /rēˈaktiv/ /ˈprōˌgramiNG/ A declarative programming paradigm concerned with data streams and the propagation of change.
Data Streams A B Stream of Data C
What is a stream?
Variables: changes in data
Starting event: initiates stream of data
Error: disruption in data
Completion: all data is processed
(horizontal axis: Time)
Reactive Streams
● An initiative started in 2013 by several companies (e.g. Netflix, Pivotal, Lightbend and many others)
● A specification
  ○ API types
  ○ Technology Compatibility Kit (TCK)
● The goal was to provide a standard for asynchronous stream processing with non-blocking back pressure.
Back Pressure Data
Back Pressure
Subscriber (B) to Publisher (A): Request N data items
Publisher (A) to Subscriber (B): Push stream of N data items
Back Pressure
Publisher / Subscriber exchange: Request (10), Send, Request (1), Send, Send, Request (3)
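The request/send exchange on this slide can be sketched with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams types. This is a minimal, single-threaded sketch under stated assumptions, not a spec-compliant publisher: BackPressureDemo, RangePublisher, and the batch size are illustrative names and values that do not come from the talk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class; names are illustrative and not from the talk.
class BackPressureDemo {

    /** Synchronous publisher of 1..count that emits only as many items as were requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;      // next value to emit
                private long demand = 0;   // items requested but not yet delivered
                private boolean emitting;  // true while the emit loop is running
                private boolean done;      // set on completion, error, or cancel

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    // Accumulate demand, capping at Long.MAX_VALUE on overflow.
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the running loop sees the new demand
                    }
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a fixed batch size and returns the sequence of signals observed. */
    static List<String> run(int total, int batchSize) {
        List<String> log = new ArrayList<>();
        new RangePublisher(total).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batchSize + ")");
                s.request(batchSize);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++receivedInBatch == batchSize) {
                    // The batch is consumed, so signal readiness for more.
                    receivedInBatch = 0;
                    log.add("request(" + batchSize + ")");
                    subscription.request(batchSize);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

The point of the sketch is that the publisher never calls onNext without outstanding demand: the subscriber, not the publisher, sets the pace.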
Reactive Streams API
Publisher, Subscriber, Subscription
Signals to the Subscriber: onSubscribe onNext* (onError | onComplete)
Calls on the Subscription: request(n), cancel()
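The signal order on this slide (onSubscribe, then zero or more onNext, then exactly one of onError or onComplete) can be observed with the JDK's own java.util.concurrent.Flow types, which mirror the Reactive Streams interfaces. The sketch below uses the JDK's SubmissionPublisher; SignalOrderDemo and the items "A", "B", "C" are illustrative and not from the talk.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative and not from the talk.
class SignalOrderDemo {

    /** Publishes three items and returns the signals in the order the subscriber saw them. */
    static List<String> run() {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch finished = new CountDownLatch(1);

        // SubmissionPublisher delivers asynchronously (by default on the common ForkJoinPool).
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription s) {
                    signals.add("onSubscribe");
                    s.request(Long.MAX_VALUE); // effectively unbounded demand
                }

                @Override
                public void onNext(String item) {
                    signals.add("onNext(" + item + ")");
                }

                @Override
                public void onError(Throwable t) {
                    signals.add("onError");
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    signals.add("onComplete");
                    finished.countDown();
                }
            });

            publisher.submit("A");
            publisher.submit("B");
            publisher.submit("C");
        } // close() issues onComplete once the buffered items have been delivered

        try {
            finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(signals);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```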
Reactive Streams API
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Why is it important?
Reactive Programming
● The next frontier in Java (and the JVM) for high-efficiency applications
● Fundamentally non-blocking
  ○ Often paired with asynchronous behaviors
  ○ ...but is completely agnostic to sync/async
● The key takeaway: back pressure
Async != Reactive
JDBC Limitations Reactive App
R2DBC
What is it?
Reactive Relational Database Connectivity R2 DB C
Design principles*
1. Be completely non-blocking, all the way to the database
2. Utilize Reactive Streams Types and Patterns
3. Provide a minimal set of operations that are implementation specific
4. Enable “humane” APIs to be built on top of the driver
* Ben Hale, Creator of R2DBC
Goals
✓ Fit seamlessly into Reactive JVM platforms
✓ Offer vendor-neutral access to standard features
✓ Embrace vendor-specific features
✓ Keep the focus on SQL
✓ Keep it simple
  ○ Provide a foundation for tools and higher-level APIs
  ○ Compliance should be unambiguous and easy to identify
Reactive App
Why a service-provider interface (SPI)?
● One of JDBC’s biggest failings was that the same API had to serve as both a humane API for users and an inhumane API for alternative clients like JPA, Jdbi, etc.
  ○ Users didn’t like using the API
  ○ Driver authors didn’t like implementing it
● It also led to drivers duplicating effort
  ○ ? binding
  ○ URL parsing
URL Parsing
r2dbc:a-driver:pipes://localhost:3306/my_database?locale=en_US
scheme: r2dbc
driver: a-driver
protocol: pipes
authority: localhost:3306
path: my_database
query: locale=en_US
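To make the URL components concrete, here is a minimal sketch that splits an R2DBC URL with a regular expression. It is only an illustration of the structure shown on the slide: R2dbcUrlSketch and its parse method are hypothetical names, the pattern ignores details such as user info and percent-encoding, and real applications would normally let the SPI do this (for example through ConnectionFactoryOptions.parse) rather than parse by hand.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the R2DBC SPI or the MariaDB connector.
class R2dbcUrlSketch {

    // scheme : driver [: protocol] :// authority [/ path] [? query]
    private static final Pattern URL = Pattern.compile(
        "^(r2dbcs?):([^:]+)(?::([^:/]+))?://([^/?]+)(?:/([^?]*))?(?:\\?(.*))?$");

    /** Splits an R2DBC URL into its named parts; absent optional parts are omitted from the map. */
    static Map<String, String> parse(String url) {
        Matcher m = URL.matcher(url);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not an R2DBC URL: " + url);
        }
        Map<String, String> parts = new LinkedHashMap<>();
        putIfPresent(parts, "scheme", m.group(1));
        putIfPresent(parts, "driver", m.group(2));
        putIfPresent(parts, "protocol", m.group(3));
        putIfPresent(parts, "authority", m.group(4));
        putIfPresent(parts, "path", m.group(5));
        putIfPresent(parts, "query", m.group(6));
        return parts;
    }

    private static void putIfPresent(Map<String, String> parts, String key, String value) {
        if (value != null) {
            parts.put(key, value);
        }
    }

    public static void main(String[] args) {
        parse("r2dbc:a-driver:pipes://localhost:3306/my_database?locale=en_US")
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Because every driver shares this URL shape, the parsing can live in one place instead of being reimplemented per driver, which is the duplication the SPI is meant to remove.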
R2DBC SPI Compliance
Fully Support
io.r2dbc.spi.ConnectionFactory
io.r2dbc.spi.ConnectionFactoryMetadata
io.r2dbc.spi.ConnectionFactoryProvider
io.r2dbc.spi.Result
io.r2dbc.spi.Row
io.r2dbc.spi.RowMetadata
io.r2dbc.spi.Batch
Partially Support
io.r2dbc.spi.Connection
io.r2dbc.spi.Statement
io.r2dbc.spi.ColumnMetadata
https://github.com/r2dbc/r2dbc-spi
SPI - Connection Factory
package io.r2dbc.spi;

import org.reactivestreams.Publisher;

public interface ConnectionFactory {
    Publisher<? extends Connection> create();
    ConnectionFactoryMetadata getMetadata();
}
SPI - Connection
package io.r2dbc.spi;

import org.reactivestreams.Publisher;

public interface Connection {
    Publisher<Void> beginTransaction();
    Publisher<Void> close();
    Publisher<Void> commitTransaction();
    Batch createBatch();
    Statement createStatement(String sql);
    ConnectionMetadata getMetadata();
    // ... lots of other methods
}
SPI - Statement
package io.r2dbc.spi;

import org.reactivestreams.Publisher;

public interface Statement {
    Publisher<? extends Result> execute();
    Statement add();
    Statement bind(int index, Object value);
    Statement bind(String name, Object value);
    Statement bindNull(int index, Class<?> type);
    Statement bindNull(String name, Class<?> type);
}
SPI - Result
package io.r2dbc.spi;

import org.reactivestreams.Publisher;
import java.util.function.BiFunction;

public interface Result {
    Publisher<Integer> getRowsUpdated();
    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);
}
SPI - Row
package io.r2dbc.spi;

import java.util.function.BiFunction;

public interface Row {
    @Nullable
    <T> T get(int index, Class<T> type);
    @Nullable
    <T> T get(String name, Class<T> type);
    // ... a couple of default methods
}
How can you use it?
Establishing a connection
import org.mariadb.r2dbc.MariadbConnectionConfiguration;

// Configure the connection (props is assumed to be a java.util.Properties loaded elsewhere)
MariadbConnectionConfiguration config = MariadbConnectionConfiguration.builder()
    .host(props.getProperty("host"))
    .port(Integer.parseInt(props.getProperty("port")))
    .username(props.getProperty("username"))
    .password(props.getProperty("password"))
    .build();
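Establishing a connection
import io.r2dbc.spi.Connection;
import org.mariadb.r2dbc.MariadbConnectionFactory;

// Instantiate the connection factory
MariadbConnectionFactory connFactory = new MariadbConnectionFactory(config);

// Obtain a connection; block() waits for the result, so keep it out of reactive pipelines
Connection conn = connFactory.create().block();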
SELECT
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Flux;

Statement select = conn.createStatement("select * from todo.tasks");

// Map each row of each result to a Task; nothing runs until the Flux is subscribed to
Flux<Task> tasks = Flux.from(select.execute())
    .flatMap(res -> res.map((row, metadata) -> {
        int id = row.get(0, Integer.class);
        String description = row.get(1, String.class);
        Boolean completed = row.get(2, Boolean.class);
        return new Task(id, description, completed);
    }));
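INSERT (Prepared)
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Mono;

Statement insert = conn.createStatement("insert into tasks (description) values (?)");

// Bind the first (zero-based) parameter, then subscribe to trigger execution
insert.bind(0, task.getDescription());
Mono.from(insert.execute()).subscribe();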
But...
Demo: A hands-on look at the new MariaDB R2DBC Connector
Roadmap
● Performance
● Fast batch using MariaDB bulk (processing)
● GeoJSON data type
● Pluggable types for MariaDB 10.5 (JSON, INET4, INET6, BOOLEAN, ...)
https://mariadb.com/products/skysql/get-started/ $500 credit to get started
developers@mariadb.com @mariadb mariadb-corporation Thank you! https://github.com/mariadb-corporation/mariadb-connector-r2dbc https://github.com/mariadb-corporation/developer-examples Open Source Developer Examples
Introducing the R2DBC async Java connector
