Multi-Service Reactive Streams using RSocket, Reactor, and Spring Ben Hale, Cloud Foundry Java Experience Lead @nebhale Stephane Maldini, Project Reactor Lead @smaldini
Reactive Programming • Reactive programming is the next frontier in Java for high-efficiency applications • Fundamentally non-blocking and often paired with asynchronous behaviors • Reactive has no opinion on async and many flows are totally synchronous • Key differentiator from "async" is Reactive (pull-push) back pressure
WebFlux and WebClient @GetMapping("/health") Mono<Health> compositeHealth() { return Mono.zip( webClient.get().uri("https://alpha-service/health") .retrieve().bodyToMono(Health.class), webClient.get().uri("https://bravo-service/health") .retrieve().bodyToMono(Health.class)) .map(t -> composite(t.getT1(), t.getT2())); }
Roadblocks • But there are still some barriers to using Reactive everywhere* • Data Access • MongoDB, Apache Cassandra, and Redis • No relational database access (until R2DBC! Check out r2dbc.io) • Cross-process back pressure (networking)
http://rsocket.io
Message Driven Binary Protocol • Requester-Responder interaction is broken down into frames that encapsulate messages • The framing is binary (not human readable like JSON or XML) • Massive efficiencies for machine-to-machine communication • Downsides only manifest rarely and can be mitigated with tooling • Payloads are bags of bytes • Can be JSON or XML (it's all just 1's and 0's)
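To make "binary framing around a bag of bytes" concrete, here is a minimal sketch using only the JDK. The 6-byte header (31-bit stream id, 6-bit frame type, 10-bit flags) is loosely modeled on the RSocket frame header; the class name `FrameSketch`, the field widths as coded here, and the `0x04` type constant are assumptions for illustration, so check the protocol specification before relying on them.

```java
import java.nio.ByteBuffer;

/** Illustrative only: a simplified binary frame with an opaque payload. */
final class FrameSketch {
    // Assumed type code for a request/response frame.
    static final int TYPE_REQUEST_RESPONSE = 0x04;
    static final int HEADER_BYTES = 6;

    /** Writes a 4-byte stream id, 2 bytes holding type (6 bits) + flags (10 bits), then the payload. */
    static ByteBuffer encode(int streamId, int type, int flags, byte[] data) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + data.length); // big-endian by default
        buf.putInt(streamId & 0x7FFF_FFFF);
        buf.putShort((short) (((type & 0x3F) << 10) | (flags & 0x3FF)));
        buf.put(data); // the payload is just bytes: JSON, XML, Protobuf, anything
        buf.flip();
        return buf;
    }

    static int streamId(ByteBuffer frame) { return frame.getInt(0) & 0x7FFF_FFFF; }

    static int type(ByteBuffer frame) { return (frame.getShort(4) & 0xFFFF) >>> 10; }

    static int flags(ByteBuffer frame) { return frame.getShort(4) & 0x3FF; }

    /** Copies the payload bytes out without disturbing the frame's own position. */
    static byte[] data(ByteBuffer frame) {
        byte[] out = new byte[frame.limit() - HEADER_BYTES];
        ByteBuffer view = frame.duplicate();
        view.position(HEADER_BYTES);
        view.get(out);
        return out;
    }
}
```

The stream id in the header is what lets many logical requests share one connection: each frame says which stream it belongs to, and the payload stays opaque to the framing layer.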
4 defined interaction models • request/reply • request/void (fire & forget) • request/stream • stream/stream (channel)
request/reply: multiplexed
transport agnostic, e.g. WebSocket
request/stream: bidirectional
Bi-Directional • Many protocols (notably not TCP) have a distinction between the client and server for the lifetime of a connection • This division means that one side of the connection must initiate all requests, and the other side must initiate all responses • Even more flexible protocols like HTTP/2 do not fully drop the distinction • Servers cannot start an unrequested stream of data to the client • Once a client initiates a connection to a server, both parties can be requestors or responders to a logical stream
per-message flow-control
Reactive Streams Back Pressure • Network protocols generally send a single request, and receive an arbitrarily large response in return • There is nothing to stop the responder (or even the requestor) from sending an arbitrarily large amount of data and overwhelming the receiver • In cases where TCP back pressure throttles the responder, queues fill with large amounts of un-transferred data • Reactive Streams (pull-push) back pressure ensures that data is only materialized and transferred when receiver is ready to process it
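The pull-push contract described above can be sketched with the JDK's own Reactive Streams interfaces (`java.util.concurrent.Flow`). This is an illustrative, single-threaded sketch rather than RSocket or Reactor code: `RangePublisher`, `BatchSubscriber`, and the `materialized` counter are hypothetical names, and the publisher is meant for one subscriber only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Illustrative only: data is produced when, and only when, the receiver asks for it. */
final class BackPressureSketch {

    /** Synchronous publisher of 1..count that emits no more than has been requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        int materialized = 0; // how many items have actually been produced so far

        RangePublisher(int count) { this.count = count; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                    emitting = true;
                    while (demand > 0 && materialized < count && !done) {
                        demand--;
                        subscriber.onNext(++materialized);
                    }
                    emitting = false;
                    if (materialized == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Subscriber that pulls a fixed batch each time the application calls pull(). */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;
        private final int batch;
        private Flow.Subscription subscription;

        BatchSubscriber(int batch) { this.batch = batch; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }

        /** Signals readiness for the next batch; nothing is produced before this is called. */
        void pull() { subscription.request(batch); }
    }
}
```

Subscribing alone produces nothing; each `pull()` materializes at most one batch. RSocket carries the same demand signal across the network, so the responder does not fill queues with data the requester has not asked for.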
Resumption/Resumability • Starting as a client-to-edge-server protocol highlighted a common failing of existing options • Clients on unstable connections would often drop and need to re-establish current state • Led to inefficiencies in both network traffic and data-center compute • Resumability allows both parties in a "logical connection" to identify themselves on reconnection • On Resumption both parties handshake about the last frame received and all missed frames are re-transmitted • The frame caching needed to support this is not defined by the spec, so it can be very flexible
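The resumption handshake can be sketched as a sender-side cache of unconfirmed frames. This is a simplified illustration, not the RSocket implementation: positions here count whole frames, frames are plain strings, and `ResumableSender` with its methods is a hypothetical name. Since the spec leaves caching open, an unbounded in-memory queue is just one possible choice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Illustrative only: retransmit what the peer missed after a reconnect. */
final class ResumptionSketch {

    /** Sender side: remembers sent frames until the peer confirms them. */
    static final class ResumableSender {
        private final Deque<String> unconfirmed = new ArrayDeque<>();
        private long firstCachedPosition = 0; // position of the oldest frame still cached

        /** Records a frame as sent and returns its position in the logical connection. */
        long send(String frame) {
            unconfirmed.addLast(frame);
            return firstCachedPosition + unconfirmed.size() - 1;
        }

        /** Drops cached frames the peer has confirmed (positions below receivedCount). */
        void confirm(long receivedCount) {
            while (firstCachedPosition < receivedCount && !unconfirmed.isEmpty()) {
                unconfirmed.removeFirst();
                firstCachedPosition++;
            }
        }

        /** On reconnect the peer reports how many frames it received; returns the ones it missed. */
        List<String> resume(long receivedCount) {
            if (receivedCount < firstCachedPosition) {
                throw new IllegalStateException(
                        "frames before position " + firstCachedPosition + " are no longer cached");
            }
            confirm(receivedCount);
            return new ArrayList<>(unconfirmed);
        }
    }
}
```

If the peer asks for frames that were already evicted, resumption has to be rejected and the connection re-established from scratch, which is why the cache size is a real trade-off.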
language agnostic
compose with no semantics loss across ws, tcp, udp
RSocket Protocol • Transports: TCP, WebSocket, Aeron/UDP, HTTP/2 • Payloads: Protobuf, JSON, Custom Binary • Programming models: RPC-style, Messaging • Languages: Java, JavaScript, C++, Kotlin, Flow
Using the RSocket API
Java API public interface RSocket { Mono<Payload> requestResponse(Payload payload); Mono<Void> fireAndForget(Payload payload); Flux<Payload> requestStream(Payload payload); Flux<Payload> requestChannel(Flux<Payload> payloads); }
Interaction Models – Request-Response Mono<Payload> resp = client.requestResponse(requestPayload) • Standard Request-Response semantics • Likely to represent the majority of requests for the foreseeable future • Even this obvious interaction model surpasses HTTP because it is asynchronous and multiplexed • Request with account number, respond with account balance
Interaction Models – Fire-and-Forget Mono<Void> resp = client.fireAndForget(requestPayload) • An optimization of Request-Response when a response isn't necessary • Significant efficiencies • Networking (no ack) • Client/Server processing (immediate release of resources) • Non-critical event logging
Interaction Models – Request-Stream Flux<Payload> resp = client.requestStream(requestPayload) • Analogous to Request-Response returning a collection • The collection is streamed back instead of queuing until complete • RequestN semantics mean data is not materialized until ready to send • Request with account number, respond with real-time stream of account transactions
Interaction Models – Channel Flux<Payload> out = client.requestChannel(in) (where in is a Flux<Payload>) • A bi-directional stream of messages • An unstructured channel allows arbitrary interaction models • Request burst of initial state, listen for subsequent updates, client updates subscription without starting new connection • Request with account number, respond with real-time stream of account transactions, update subscription to filter certain transaction types, respond with filtered real-time stream of account transactions
Raw Client RSocket client = RSocketFactory.connect() .transport(TcpClientTransport.create("1.2.3.4", 80)) .start() .block(); client.requestResponse(new DefaultPayload(…)) .doOnSuccess(payload -> System.out.println("hooray")) .subscribe(); (The elided argument is the censored ByteBuffer creation.)
Raw Client: Too Low Level? Shift to the next gear with existing tech built on RSocket 😱
Building applications with RSocket API • Programming Model Agnostic • The RSocket interface is a serviceable programming model but not great • Designed to be a building block that multiple other programming models could build upon
Building applications with RSocket API • Making things simpler: • RPC-style (protobuf code generation) • Messaging-style (Spring message handlers/controllers)
RPC-style (Contract Driven) service RecordsService { rpc records (RecordsRequest) returns (stream Record) {} } RecordsServiceClient recordsService = new RecordsServiceClient(rsocket); recordsService.records(RecordsRequest.newBuilder() .setMaxResults(16) .build()) .subscribe(record -> System.out.println(record)); You still need to manage this part
RPC-style (Contract Driven) service RecordsService { rpc records (RecordsRequest) returns (stream Record) {} } let recordsServiceClient = new RecordsServiceClient(rsocket); let req = new RecordsRequest(); req.setMaxResults(16); recordsServiceClient.records(req) .subscribe();
Messaging-Style static class TestHandler implements RSocketHandler { @MessageMapping("/canonical") Mono<String> handleCanonical(String payload) { return Mono.delay(Duration.ofMillis(10)) .map(l -> createResponse(payload)); } @MessageMapping("/async-arg") Mono<String> handleMonoArg(Mono<String> payload) { return payload.map(ServerResponderApp::createResponse); } }
Additional Protocol Features
Metadata and Data in Frames • Each Frame has an optional metadata payload • The metadata payload has a MIME-Type but is otherwise unstructured • Very flexible • Can be used to carry metadata about the data payload • Can be used to carry metadata in order to decode the payload • Generally means that payloads can be heterogeneous and each message decoded uniquely
Fragmentation • Payload frames have no maximum size • The protocol is well suited to serving large payloads • Still Images (Facebook), Video (Netflix) • Both TCP MTUs and reliability on slow connections lead towards smaller payloads • Fragmentation provides a way to continue to reason about "logical frames" while ensuring that individual payloads are smaller • Applied transparently, after enforcement of RequestN semantics
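Splitting one "logical frame" into bounded pieces and joining them again can be sketched as follows. This is a JDK-only illustration under stated assumptions: `Fragment`, `fragment`, and `reassemble` are hypothetical names, and the `follows` marker (true when more fragments come after this one) is a simplification of how a real implementation would flag continuation.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: fragment a large payload and reassemble it on the other side. */
final class FragmentationSketch {

    /** One piece of a logical frame. */
    static final class Fragment {
        final byte[] bytes;
        final boolean follows; // true when at least one more fragment comes after this one

        Fragment(byte[] bytes, boolean follows) {
            this.bytes = bytes;
            this.follows = follows;
        }
    }

    /** Splits the payload into fragments of at most maxSize bytes, keeping their order. */
    static List<Fragment> fragment(byte[] payload, int maxSize) {
        if (maxSize <= 0) throw new IllegalArgumentException("maxSize must be positive");
        List<Fragment> out = new ArrayList<>();
        int offset = 0;
        do { // do/while so that an empty payload still yields one (empty) final fragment
            int end = Math.min(offset + maxSize, payload.length);
            out.add(new Fragment(Arrays.copyOfRange(payload, offset, end), end < payload.length));
            offset = end;
        } while (offset < payload.length);
        return out;
    }

    /** Concatenates fragments until the first one that is not followed by another. */
    static byte[] reassemble(List<Fragment> fragments) {
        ByteArrayOutputStream joined = new ByteArrayOutputStream();
        for (Fragment f : fragments) {
            joined.write(f.bytes, 0, f.bytes.length);
            if (!f.follows) break;
        }
        return joined.toByteArray();
    }
}
```

Because reassembly happens below the application, the receiver still sees a single payload and a single unit of demand, whatever the fragment size.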
Cancellation • All interaction types support cancellation • Cancellation is a signal by the requestor that any inflight processing should be terminated eagerly and aggressively • An obvious requirement for Request-Stream and Channel • But useful even in Request-Response where the response can be expensive to generate • Early termination can lead to significant improvement in efficiency
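Why cancellation pays off even for Request-Response can be shown with a responder that checks a cancel signal between units of work. This is a deliberately simple, single-threaded sketch: `Request`, `respond`, and the per-unit callback are hypothetical, and a real responder would receive the cancel signal from the network rather than from a callback.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

/** Illustrative only: cooperative early termination of an expensive response. */
final class CancellationSketch {

    /** Handle shared by requester and responder for one in-flight request. */
    static final class Request {
        private final AtomicBoolean cancelled = new AtomicBoolean();

        void cancel() { cancelled.set(true); }

        boolean isCancelled() { return cancelled.get(); }
    }

    /**
     * Performs up to totalUnits units of work, stopping as soon as the request is cancelled.
     * Returns the number of units actually performed.
     */
    static int respond(Request request, int totalUnits, IntConsumer afterEachUnit) {
        int done = 0;
        while (done < totalUnits && !request.isCancelled()) {
            done++;                      // stands in for one expensive step
            afterEachUnit.accept(done);  // hook where a cancel may arrive
        }
        return done;
    }
}
```

A requester that loses interest after the third unit saves the responder the remaining work; without a cancel signal the responder would compute a response nobody reads.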
Leasing • Reactive back pressure ensures that a responder (or either party in a Channel) cannot overwhelm the receiver • This does not prevent a requestor from overwhelming a responder • This commonly happens in server-to-server environments where throughput is high • Leasing enables responders to signal capacity to requestors • This signal is useful for client-side load-balancing • Without preventing server-side load-balancing
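Leasing can be pictured as a responder granting "N requests within a time window", which the requester consumes and also uses to choose among responders. The sketch below is an assumption-laden illustration, not the RSocket lease API: `Lease`, `tryUse`, `available`, and `pick` are hypothetical names, and the clock is passed in explicitly to keep the example deterministic.

```java
import java.util.List;

/** Illustrative only: responders advertise capacity, requesters respect it. */
final class LeaseSketch {

    /** A grant of a number of requests that must be used before it expires. */
    static final class Lease {
        private int remaining;
        private final long expiresAtMillis;

        Lease(int allowedRequests, long ttlMillis, long nowMillis) {
            this.remaining = allowedRequests;
            this.expiresAtMillis = nowMillis + ttlMillis;
        }

        /** Requests that may still be sent under this lease at the given time. */
        int available(long nowMillis) {
            return nowMillis < expiresAtMillis ? remaining : 0;
        }

        /** Consumes one permit if the lease is still valid; returns false otherwise. */
        boolean tryUse(long nowMillis) {
            if (available(nowMillis) == 0) return false;
            remaining--;
            return true;
        }
    }

    /** Client-side load balancing: index of the responder with the most capacity, or -1 if none. */
    static int pick(List<Lease> leases, long nowMillis) {
        int best = -1;
        int bestCapacity = 0;
        for (int i = 0; i < leases.size(); i++) {
            int capacity = leases.get(i).available(nowMillis);
            if (capacity > bestCapacity) {
                best = i;
                bestCapacity = capacity;
            }
        }
        return best;
    }
}
```

A requester holding no valid lease simply does not send, which is how the responder protects itself from being overwhelmed.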
RSocket • RSocket is a bi-directional, multiplexed, message-based, binary protocol • Utilizes Reactive Streams back pressure for efficiency and predictability • Provides primitives for the four common interaction models • Flexibility in transport, payload, language, and programming model • Let us know which programming model you prefer! • Myriad other features that make it great for modern application-to-application communication
Stay Connected. https://projectreactor.io http://rsocket.io
