Skip to content

rsocket/rsocket-rust

rsocket-rust

GitHub Workflow Status Build Status Crates.io Crates.io License GitHub Release

rsocket-rust is an implementation of the RSocket protocol in Rust(1.39+). It's an alpha version and still under active development. Do not use it in a production environment!

Example

Here are some example codes which show how RSocket works in Rust.

Dependencies

Add dependencies in your Cargo.toml.

[dependencies] tokio = "1.0.3" rsocket_rust = "0.7" # add transport dependencies: # rsocket_rust_transport_tcp = "0.7" # rsocket_rust_transport_websocket = "0.7"

Server

extern crate log; use futures::executor::block_on; use rsocket_rust::prelude::*; use rsocket_rust::utils::EchoRSocket; use rsocket_rust::Result; use rsocket_rust_transport_tcp::*; #[tokio::main] async fn main() -> Result<()> { env_logger::builder().format_timestamp_millis().init(); RSocketFactory::receive() .transport(TcpServerTransport::from("127.0.0.1:7979")) .acceptor(Box::new(|setup, _sending_socket| { info!("incoming socket: setup={:?}", setup); Ok(Box::new(block_on(async move { RSocketFactory::connect() .transport(TcpClientTransport::from("127.0.0.1:7878")) .acceptor(Box::new(|| Box::new(EchoRSocket))) .setup(Payload::from("I'm Rust!")) .start() .await .unwrap() }))) })) .serve() .await }

Client

extern crate log; use rsocket_rust::prelude::*; use rsocket_rust::utils::EchoRSocket; use rsocket_rust::Result; use rsocket_rust_transport_tcp::TcpClientTransport; #[tokio::main] async fn main() -> Result<()> { env_logger::builder().format_timestamp_millis().init(); let client = RSocketFactory::connect() .transport(TcpClientTransport::from("127.0.0.1:7878")) .acceptor(Box::new(|| { // Return a responder. Box::new(EchoRSocket) })) .start() .await .expect("Connect failed!"); let req = Payload::builder().set_data_utf8("Ping!").build(); match client.request_response(req).await { Ok(res) => info!("{:?}", res), Err(e) => error!("{}", e), } Ok(()) }

Implement RSocket trait

Example for access Redis(crates):

NOTICE: add dependency in Cargo.toml => redis = { version = "0.19.0", features = [ "aio" ] }

use std::str::FromStr; use redis::Client as RedisClient; use rsocket_rust::async_trait; use rsocket_rust::prelude::*; use rsocket_rust::Result; #[derive(Clone)] pub struct RedisDao { inner: RedisClient, } // Create RedisDao from str. // Example: RedisDao::from_str("redis://127.0.0.1").expect("Connect redis failed!"); impl FromStr for RedisDao { type Err = redis::RedisError; fn from_str(s: &str) -> std::result::Result<Self, Self::Err> { let client = redis::Client::open(s)?; Ok(RedisDao { inner: client }) } } #[async_trait] impl RSocket for RedisDao { async fn request_response(&self, req: Payload) -> Result<Option<Payload>> { let client = self.inner.clone(); let mut conn = client.get_async_connection().await?; let value: redis::RedisResult<Option<String>> = redis::cmd("GET") .arg(&[req.data_utf8()]) .query_async(&mut conn) .await; match value { Ok(Some(value)) => Ok(Some(Payload::builder().set_data_utf8(&value).build())), Ok(None) => Ok(None), Err(e) => Err(e.into()), } } async fn metadata_push(&self, _req: Payload) -> Result<()> { todo!() } async fn fire_and_forget(&self, _req: Payload) -> Result<()> { todo!() } fn request_stream(&self, _req: Payload) -> Flux<Result<Payload>> { todo!() } fn request_channel(&self, _reqs: Flux<Result<Payload>>) -> Flux<Result<Payload>> { todo!() } }

TODO

  • Operations
    • METADATA_PUSH
    • REQUEST_FNF
    • REQUEST_RESPONSE
    • REQUEST_STREAM
    • REQUEST_CHANNEL
  • More Operations
    • Error
    • Cancel
    • Fragmentation
    • Resume
    • Keepalive
  • QoS
    • RequestN
    • Lease
  • Transport
    • TCP
    • Websocket
    • WASM
  • Reactor
    • ...
  • High Level APIs
    • Client
    • Server

About

RSocket Rust Implementation using Tokio

Resources

License

Code of conduct

Contributing

Security policy

Stars

Watchers

Forks

Packages

No packages published

Contributors 9