Reactive Microservices using RSocket
New reactive features in Spring 5.2
Reactive ?
Reactive Programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Wikipedia
RSocket ?
S-Rocket ?
WebSockets ?
A binary application protocol providing Reactive Streams semantics, using (a)sync message passing over a single connection, for use on byte-stream transports (such as TCP, WebSockets and Aeron)
Who ?
Supported by
How ?
4 Symmetric Interaction Models
● Request-Response
  ○ Stream of one (1 to 1)
  ○ Mono<Payload> response = client.requestResponse(Payload request)
● Fire-and-Forget
  ○ No response (1 to 0)
  ○ Mono<Void> response = client.fireAndForget(Payload request)
● Request-Stream
  ○ Finite stream of many (1 to N)
  ○ Flux<Payload> out = client.requestStream(Payload request)
● Channel
  ○ Bi-directional streams (M to N)
  ○ Flux<Payload> out = client.requestChannel(Flux<Payload> in)
Transport Agnostic
● TCP
● WebSocket
● Aeron UDP
Implementations
● Java
● Kotlin
● Go
● JavaScript
● Python
● .NET
● C++
Features
● Bi-directional
● Multiplexed
● Message based
● Binary
● Protocol
● Back Pressure
● Session Resumption
Bi-Directional
● Both partners can be client and server
● No client/server distinction
● No requester/responder distinction
● Even HTTP/2 makes this distinction
Multiplexed
● HTTP/1.0: one connection per request => very inefficient
● HTTP/1.1: pipelining (responses must come back in request order) => head-of-line blocking
● RSocket: multiple “logical streams” within one connection
● Messages are tagged with a stream-id
● Messages of different streams can be interleaved in any order
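The stream-id tagging above can be illustrated with a small JDK-only sketch. It is not RSocket code: `TaggedFrame` and `Demultiplexer` are hypothetical names, and the payloads are plain strings. It only shows how frames that arrive interleaved on one connection can be sorted back into their logical streams.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical frame: a stream id plus an opaque payload chunk. */
final class TaggedFrame {
    final int streamId;
    final String data;

    TaggedFrame(int streamId, String data) {
        this.streamId = streamId;
        this.data = data;
    }
}

/** Groups interleaved frames from one connection back into logical streams. */
final class Demultiplexer {
    private final Map<Integer, List<String>> streams = new LinkedHashMap<>();

    /** Routes one incoming frame to the list that belongs to its stream id. */
    void onFrame(TaggedFrame frame) {
        streams.computeIfAbsent(frame.streamId, id -> new ArrayList<>()).add(frame.data);
    }

    /** Returns the payloads received so far for one logical stream, in arrival order. */
    List<String> stream(int streamId) {
        return streams.getOrDefault(streamId, List.of());
    }

    /** Feeds a fixed, interleaved sequence of frames through a fresh demultiplexer. */
    static Demultiplexer demo() {
        Demultiplexer demux = new Demultiplexer();
        // Frames of streams 1 and 3 interleave on the wire; neither waits for the other.
        List<TaggedFrame> wire = List.of(
                new TaggedFrame(1, "a1"), new TaggedFrame(3, "b1"),
                new TaggedFrame(3, "b2"), new TaggedFrame(1, "a2"));
        wire.forEach(demux::onFrame);
        return demux;
    }
}
```

Calling `Demultiplexer.demo().stream(1)` yields the payloads of stream 1 in arrival order, regardless of how the frames of stream 3 were interleaved with them.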
Message Driven Binary Protocol
● Message split into discrete binary frames
● Machine-to-machine efficiency
● Payload can be anything
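To make "discrete binary frames" concrete, here is a simplified JDK-only sketch of a frame header. It assumes the layout described in the RSocket protocol specification as far as I understand it: a 31-bit stream id, followed by a 6-bit frame type and 10 bits of flags. `FrameCodec` is a hypothetical name, the type value `0x04` for a request-response frame is an assumption to check against the spec, and the sketch leaves out metadata and the transport-level frame length.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Simplified frame codec: 4 bytes stream id, 2 bytes type + flags, then the payload. */
final class FrameCodec {
    static final int HEADER_BYTES = 6;

    static byte[] encode(int streamId, int frameType, int flags, byte[] payload) {
        if (streamId < 0) throw new IllegalArgumentException("stream id must fit in 31 bits");
        if (frameType < 0 || frameType > 0x3F) throw new IllegalArgumentException("frame type must fit in 6 bits");
        if (flags < 0 || flags > 0x3FF) throw new IllegalArgumentException("flags must fit in 10 bits");
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + payload.length); // big-endian by default
        buf.putInt(streamId);
        buf.putShort((short) ((frameType << 10) | flags)); // type in the upper 6 bits, flags in the lower 10
        buf.put(payload);
        return buf.array();
    }

    static int streamId(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(0) & 0x7FFFFFFF;
    }

    static int frameType(byte[] frame) {
        return (ByteBuffer.wrap(frame).getShort(4) & 0xFFFF) >>> 10;
    }

    static int flags(byte[] frame) {
        return ByteBuffer.wrap(frame).getShort(4) & 0x3FF;
    }

    /** Decodes the payload as UTF-8 text; a real payload can be any bytes. */
    static String payloadUtf8(byte[] frame) {
        return new String(frame, HEADER_BYTES, frame.length - HEADER_BYTES, StandardCharsets.UTF_8);
    }
}
```

For example, `FrameCodec.encode(1, 0x04, 0, "hi".getBytes(StandardCharsets.UTF_8))` produces an 8-byte frame: six header bytes followed by the two payload bytes.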
Reactive Streams Back Pressure
● TCP already queues data to some extent
● Reactive Streams (pull-push) backpressure:
  ○ Materialize data
  ○ Transfer data
  ○ Only when receiver is ready
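The pull-push idea can be sketched with the JDK's own Reactive Streams interfaces in `java.util.concurrent.Flow`. This is a deliberately simplified, single-threaded illustration, not RSocket or Reactor code: `CountingPublisher` and `BatchSubscriber` are hypothetical names, and the publisher skips the checks a compliant implementation needs (for example rejecting non-positive demand). The point is that an item is only created once the receiver has asked for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Publisher that materializes items lazily, only when the subscriber requests them. */
final class CountingPublisher implements Flow.Publisher<Integer> {
    int produced = 0; // number of items created so far

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean cancelled = false;

            @Override
            public void request(long n) {
                // Create and hand over exactly as many items as were requested.
                for (long i = 0; i < n && !cancelled; i++) {
                    subscriber.onNext(++produced);
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}

/** Subscriber that pulls bounded batches when it is ready for more data. */
final class BatchSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription; // no demand is signalled yet
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
    }

    @Override
    public void onError(Throwable error) {
        throw new IllegalStateException(error);
    }

    @Override
    public void onComplete() {
    }

    /** Signals that the receiver is ready for n more items. */
    void pull(long n) {
        subscription.request(n);
    }
}
```

After `publisher.subscribe(subscriber)` nothing has been produced yet; each call to `subscriber.pull(n)` causes exactly `n` further items to be created and delivered.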
Session Resumption
● Long-lived streams
● Across different transport connections
● E.g. mobile re-connection
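One way to picture resumption is a sender that remembers what it has sent until the peer confirms receipt, and replays the rest after a reconnect. The sketch below is a loose, JDK-only model of that idea, not the RSocket wire protocol: `ResumableSender` is a hypothetical name, frames are plain strings, and positions are counted in characters rather than bytes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Keeps unconfirmed frames so they can be replayed on a new transport connection. */
final class ResumableSender {
    private static final class Sent {
        final long endPosition; // stream position right after this frame
        final String frame;

        Sent(long endPosition, String frame) {
            this.endPosition = endPosition;
            this.frame = frame;
        }
    }

    private final Deque<Sent> unconfirmed = new ArrayDeque<>();
    private long position = 0;

    /** Records a frame as sent and advances the local position by its length. */
    void send(String frame) {
        position += frame.length();
        unconfirmed.addLast(new Sent(position, frame));
    }

    /** The peer reports how far it has received; everything up to there can be dropped. */
    void acknowledge(long peerPosition) {
        while (!unconfirmed.isEmpty() && unconfirmed.peekFirst().endPosition <= peerPosition) {
            unconfirmed.removeFirst();
        }
    }

    /** After reconnecting, returns the frames the peer has not seen yet, in order. */
    List<String> resume(long peerPosition) {
        acknowledge(peerPosition);
        List<String> replay = new ArrayList<>();
        for (Sent sent : unconfirmed) {
            replay.add(sent.frame);
        }
        return replay;
    }
}
```

If a mobile client drops its connection after receiving the first of three frames, calling `resume` with that frame's end position returns only the two frames that still need to be delivered.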
Why ?
It removes some of the roadblocks to using reactive everywhere
Motivation
● Support for interaction models beyond request/response, such as streaming responses and push
● Application-level flow control semantics across network boundaries (async pull/push of bounded batch sizes)
● Binary, multiplexed use of a single connection
● Support for resumption of long-lived subscriptions across transport connections
● Need for an application protocol in order to use transport protocols such as WebSockets and Aeron
gRPC vs RSocket
gRPC, in contrast to RSocket:
● Framework
● HTTP/2
● Protobuf
● No native web component
● No reactive semantics
Future ?
Other Incubation Projects
● Spring R2DBC
● Spring Reactive Gateway
Netifi
● Cloud-native application platform built on RSocket
● Dramatically reduces operational overhead
● AI-driven load balancing and management
● Reactive software components
Netifi https://www.netifi.com/ ● Netifi ● RSocket
Demo
On GitLab: https://gitlab.com/tothepoint/tutorials/rsocket-demo.git
Client Properties

    server:
      port: 7001
    rsocket:
      port: 7000

Server Properties

    spring:
      rsocket:
        server:
          port: 7000
      main:
        lazy-initialization: true

RSocket Dependency

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>
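Client Beans

    // Raw RSocket connection to the server port configured as rsocket.port.
    // Note the escaped "\$": an unescaped "${...}" is a Kotlin string template.
    @Bean
    fun rSocket(@Value("\${rsocket.port}") port: Int) = RSocketFactory
        .connect()
        .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
        .frameDecoder(PayloadDecoder.ZERO_COPY)
        .transport(TcpClientTransport.create(port))
        .start()
        .block()

    // Spring's requester wraps the raw connection and adds routing and JSON encoding.
    @Bean
    fun requester(rSocketStrategies: RSocketStrategies, @Value("\${rsocket.port}") port: Int) = RSocketRequester
        .wrap(rSocket(port)!!, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies)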
Client Fire-Forget

    @GetMapping("/fire-forget")
    fun fireForget(): Mono<Void> = requester
        .route("fire-forget")
        .data(Request())
        .send()

Server Fire-Forget

    @MessageMapping("fire-forget")
    fun fireForget(request: Request): Mono<Void> {
        logger.debug { "fireForget($request)" }
        return Mono.empty()
    }
Client Request-Response

    @GetMapping("/request-response")
    fun requestResponse(): Mono<Response> = requester
        .route("request-response")
        .data(Request())
        .retrieveMono(Response::class.java)

Server Request-Response

    @MessageMapping("request-response")
    fun requestResponse(request: Request): Mono<Response> {
        logger.debug { "requestResponse($request)" }
        val response = Response.random()
        return Mono.just(response)
    }
Client Request-Stream

    @GetMapping(value = ["/request-stream"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun requestStream(): Flux<Response> = requester
        .route("request-stream")
        .data(Request())
        .retrieveFlux(Response::class.java)

Server Request-Stream

    @MessageMapping("request-stream")
    fun requestStream(request: Request): Flux<Response> {
        logger.debug { "requestStream($request)" }
        val responseStream = Stream.generate { Response.random() }
        return Flux
            .fromStream(responseStream)
            .delayElements(Duration.ofSeconds(5))
    }
Links
● https://rsocket.io/
● https://www.slideshare.net/Pivotal/welcome-to-the-reactive-revolutionrsocket-and-spring-cloud-gateway-spencer-gibb
● https://www.youtube.com/watch?v=dGNv-Djm7h0
