Going Reactive with Relational Databases Ivaylo Pashov 20 Feb 2020
Reactive 101 What you might have missed so far
Architectural Shift The Reactive Manifesto
Architectural Shift Processor ProcessorPublisher
Internal Synchronous Processing Processor
Internal ImplementationInternal Asynchronous Processing Processor 2 1 3 5 4
Asynchronous Programming is Hard
Reactive Programming Flux.interval(Duration.ofSeconds(1)).take(5) .filter(i -> i % 3 != 0) .map( i -> i * i ) .subscribe(System.out::println); Subset of Asynchronous programming where availability of new information (events) drives the logic forward Declarative
Subscription demand/cancel Reactive Streams SubscriberPublisher subscribe 0 … * events Error Complete publish
Transform Filter ? Aggregate Combine
Reactive Streams - Backpressure Slow Operator Operator Slow Operator Operator BackpressureBackpressure Normal Scenario High Load Scenario
Reactive Streams Landscape Reactive Streams ClientsImpl
Blocking Web Stack HTTP Server Servlet API Web Framework Application Persistence Database Blocking I/O Blocking I/O
Blocking Web Stack HTTP Server Persistence T 1 T 2 T 3 T 4 T 5 T 6 T 7 T 8 T 9 T N … Thread Lifetime 1000 RPS  1 RPMS Database Blocking I/O 35 ms Blocking I/O (queries) 5 ms Server Processing ~45 Threads
Blocking I/O
Non-Blocking Web Stack HTTP Server Servlet API 3.1 Web Framework Application Persistence Tomcat, Jetty, Netty, Undertow Database Non-Blocking I/O Non-Blocking I/O Spring Web Flux, Vert.x, Micronaut, Quarkus Spring Data + Elastic Search, Mongo DB, Cassandra
Backpressure on Insert @PostMapping(path="/cars", consumes = "application/stream+json") public Mono<Void> loadCars(@RequestBody Flux<Car> cars) { return repository.insert(cars).then(); }
Backpressure on Get @GetMapping(path = "/cars", produces = "application/stream+json") public Flux<Car> getCars() { return this.repository.findCars(); }
R2DBC Mind the Gap between Reactive and Relational Bridge
Overview JDBC R2DBC • Non-humane API • Humane API • Non-blocking • Back-pressure • Declarative • Usually asynchronous ✓ ✓ ✓ ✓ ✓ ✓ ❌ ❌ ❌ ❌ ❌ ✓
Stack JDBC R2DBC JDBC API Driver Database Wire Protocol Blocking I/O Connection Pool Client R2DBC SPI Reactive Driver Database Client Wire Protocol Non-Blocking I/O R2DBC Pool
R2DBC – Connections String url = "r2dbc:pool:postgresql://localhost:5432/test"; ConnectionFactory connectionFactory = ConnectionFactories.get(url); ConnectionFactory Connection
R2DBC – Query Flux<String> mails = Mono.from(connectionFactory.create()) .flatMapMany(con -> con.createStatement("SELECT email FROM account") .execute() ) .flatMap(result -> result .map((row, metadata) -> row.get(0, String.class)) ); Statement Connection Result
R2DBC – Transactional Batch Insert Mono<Void> inserted = Mono.from(connectionFactory.create()) .flatMap(con -> Mono.from(con.beginTransaction()) .thenMany( con.createStatement("INSERT INTO TEST VALUES($1)") .bind("$1", "1").add() .bind("$1", "2").add() .bind("$1", "3").execute() ).then() .delayUntil(r -> con.commitTransaction()) .onErrorResume(e -> Mono.from(con.rollbackTransaction()) .then(Mono.error(e)) ) );
R2DBC Adoption Clients • Spring Data R2DBC • R2DBC Client • jOOQ (initial support) Drivers • Google Cloud Spanner • H2 • PostgreSQL • Microsoft SQL Server • MySQL • SAP Hana • Oracle coming soon -> OJDBC with Reactive Extensions
Alternatives • Vert.x Reactive SQL Client • jasync SQL • ADBA (discontinued) • Hibernate RX (experimental, limited features)
Upcoming Features • Stored Procedures • ExtendTransaction Support • Event Provider
Spring Data R2DBC The Humane API
Persistence Stacks – Spring Perspective JDBC API (Blocking) Hibernate/EclipseLink R2DBC SPI (Reactive) Spring Data R2DBC Spring Data JPA Dialect JPA Dialect
JPA Provider Features • Advanced Object Relational Mapping • Schema Generation andValidation • 2 Levels of Caching • Lazy Loading • Deferred Flushing and Dirty Checks • Batching
What Queries Would Be Executed?
Spring Data JDBC Persistence Stacks – Spring Perspective JDBC API (Blocking) R2DBC SPI (Reactive) Spring Data R2DBC Spring Data Relational Spring Data Relational JdbcAggregateTemplate Repositories DatabaseClient Reactive Repositories
Spring Data RDBC Models @Data @Table("movies") public class Movie { @Id private Long id; private String title; private String summary; private LocalDate releasedAt; }
Spring Data RDBC Repositories @Repository public interface MovieRepository extends ReactiveCrudRepository<Movie, Long> { @Query("SELECT * FROM movies WHERE title = :title") Mono<Movie> findByTitle(String title); @Modifying @Query("DELETE FROM movies WHERE title = :title") Mono<Integer> deleteByTitle(String title); }
Spring Data RDBC DatabaseClient private Mono<Movie> findMovie(long movieId) { return databaseClient.execute( "SELECT * FROM movies WHERE id = :movieId") .bind("movieId", movieId) .as(Movie.class) .fetch() .one(); }
How About Relations?
Relations and Entity Graph
Relations – Tables vs. Entities 1 N M 1 ∞1 1 ∞ N M 1 N 1 1 M 1 M 1 N 1 11 1 K L 1 1 ∞ 1 ∞ 1 K L1 1 1
Microservices and Bounded Context
Aggregates
Entities Lifecycle 1 N M 1 ∞1 1 ∞ N M 1 N 1 1 M 1 M 1 N 1 11 1 K L 1 1 ∞ 1 ∞ 1 K L1 1 1
Entities Lifecycle N M 1 M 1 N 1 11 1 K L 1 1 ∞ 1 ∞
Microservices and Bounded Context
Aggregates in Spring Data (JDBC) RDBC @Table("movies") public class Movie { @Id private Long id; … private Plot plot; private Set<Role> cast; } One-to-One  Embedded Object Bounded One-to-Many  Embedded Collection STILL NOT SUPPORTED EXISTING WORKAROUNDS
Aggregates in Spring Data (JDBC) RDBC @Table("movies") public class Movie { @Id private Long id; … private Director director; private Set<Actor> cast; } Many-to-One De-normalize  OR Many-to-Many Soft reference NOT ALLOWED
Reactive Transactions Spring 5.2 Way @Transactional public Mono<Movie> insert(Movie movie) { return repository.save(movie); } private final TransactionalOperator transactionalOperator; public Mono<Movie> insert(Movie movie) { return repository.save(movie) .as(transactionalOperator::transactional); }
Limitations • Derived Query Methods • Embedded Objects • Collections of arbitrary Objects • Versioning and Optimistic Locking • Paging and Sorting Repositories • Batching
N + 1 Query Problem / 1 1 1 1 1 1 1 1 1 2 1 1 1 1 3 1 1 1 1 4 1 1 1 1 5 1 1 1 2 6 1 1 1 2 7 1 1 1 2 8 1 1 1 2 9 Single Query with a Fetch Join: 10 (roots) * 5 * 5 * 5 * 5 = 6250 rows Join in Parallel: 10 + 10 * (5 * 5 * 5 * 5) = 210 rows
Query in Parallel Fetch Actors <<R2DBC>> Fetch Movie <<R2DBC>> Get Rating <<HTTP>> Get Biography <<HTTP>>
Q/A Thank You for Your Attention!

Going Reactive with Relational Databases

Editor's Notes

  • #5 Messaging: ensures loose coupling, isolation and location transparency Resiliency – Each component needs to be designed operates on its own (containment and isolation); delegate failures as messages Responsiveness -> Employ pipelining and introduce concurrency; Streaming allows new information to be received as soon as it becomes available Elastic - consume resources only while active
  • #6 Location Transparency - embrace the network and all its constraints—like partial failure, network splits, dropped messages, and its asynchronous and message-based nature by making them first class in the programming model
  • #27 Vert.x with Reactive Postgres is ranking top in TechEmpower
  • #38 Drupal