DEV Community

Scott

Using Cap'n Proto on Rust

Introduction

Recently at work, I have been working a lot with gRPC and Golang, and I enjoy using gRPC as an application interface. It is simple to implement and works well. About two weeks ago, while reading about various technologies related to my work, I came across Cap'n Proto and was interested to learn that it was created by one of the primary authors of Protocol Buffers v2. Since I was already using Protocol Buffers with gRPC at work, I decided to try out Cap'n Proto and Cap'n Proto RPC with Rust.

What is Cap'n Proto?

Cap'n Proto is an incredibly fast data serialization format. If you want to know more about Cap'n Proto in general, please visit their website! If you are familiar with Protocol Buffers, then a majority of the following concepts will be familiar to you.

My Goal

My goal here is to create a simple API that supports two requests: one accepts a list of parameters and a source directory and returns a job ID, and the other accepts a job ID and returns the results. See the following diagrams.

[Figure: sequence diagrams of the two request flows]
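Before bringing Cap'n Proto into it, the two requests can be sketched in plain Rust. This is only an illustration of the intended behaviour, not part of the final service: the `Registry` type, the `job-N` ID format, and the generated link are all hypothetical stand-ins I made up for the example.

```rust
use std::collections::HashMap;

/// Outcome of a job: either a link to the output or an error message.
#[derive(Debug, Clone, PartialEq)]
pub enum JobOutcome {
    Link(String),
    Error(String),
}

/// Hypothetical in-memory registry standing in for the real service.
#[derive(Default)]
pub struct Registry {
    next_id: u64,
    jobs: HashMap<String, JobOutcome>,
}

impl Registry {
    /// Accepts a list of parameters and a source directory, returns a new job ID.
    pub fn register(&mut self, parms: &[&str], source: &str) -> String {
        self.next_id += 1;
        // Made-up ID format, for illustration only.
        let id = format!("job-{}", self.next_id);
        // Made-up outcome: a real service would run the job here.
        let outcome = if source.is_empty() {
            JobOutcome::Error("no source directory given".to_string())
        } else {
            JobOutcome::Link(format!("{}/output-{}.csv", source, parms.len()))
        };
        self.jobs.insert(id.clone(), outcome);
        id
    }

    /// Accepts a job ID and returns the stored outcome, if the ID is known.
    pub fn get_result(&self, id: &str) -> Option<&JobOutcome> {
        self.jobs.get(id)
    }
}
```

The rest of the article replaces these direct method calls with RPC calls, but the shape stays the same: one call hands out an ID, the other trades an ID for either a link or an error.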

NOTE: The example built in this article is split into the following files: main.rs, server.rs, client.rs, build.rs, and service.capnp.

Cap'n Proto Interface Definition

With a goal in mind, I set out and defined my service.capnp as follows:

@0xeb1bbbd418f18514;

interface Job {
  register @0 Request -> RegistryItem;
  getResult @1 RegistryItem -> Result;

  struct Request {
    parms @0 :List(Text);
    source @1 :Text;
  }

  struct Result {
    union {
      link @0 :Text;
      error @1 :Text;
    }
  }

  struct RegistryItem {
    # Single Job Identity
    id @0 :Text;
  }
}

NOTE: You can write your methods in a more verbose format; however, this is not recommended, as you will have to pry! multiple times in your server implementation.

Right away you can see we have defined a Job interface with two methods, register and getResult, and three data structures: Request, Result, and RegistryItem.

To actually use the defined RPC interface, we will need to generate Rust code from the above service.capnp file.

Generate the Code

In order to generate the code, you will need to create a build.rs file as follows.

extern crate capnpc;

fn main() {
    ::capnpc::CompilerCommand::new()
        .file("service.capnp")
        .run()
        .unwrap();
}

This will generate the RPC code from the service.capnp file defined in the previous section. The schema roughly translates into the following:

  • interface Job {...}

    • Creates pub mod job for client/server communications
  • register @0 Request -> RegistryItem

    • Adds register method to job::Server trait
    • Implements register_request for the job::Client struct to generate a Cap'n Proto Request object for the register method.
    • Creates type job::RegisterParams which controls access to incoming data on the server side.
    • Creates type job::RegisterResults which controls access to outgoing data on the server side.
  • getResult @1 RegistryItem -> Result

    • Adds get_result method to job::Server trait
    • Implements get_result_request for the job::Client struct to generate a Cap'n Proto Request object for the get_result method.
    • Creates type job::GetResultParams which controls access to incoming data on the server side.
    • Creates type job::GetResultResults which controls access to outgoing data on the server side.
  • struct Request {...}

    • Creates a pub mod request (job::request) submodule
    • Creates type job::request::Reader containing getters for each capnp parameter
    • Creates type job::request::Builder containing setters for each capnp parameter
  • struct Result {...}

    • Creates a pub mod result (job::result) submodule
    • Creates type job::result::Reader containing getters for each capnp parameter
    • Creates type job::result::Builder containing setters for each capnp parameter
  • struct RegistryItem {...}

    • Creates a pub mod registry_item (job::registry_item) submodule
    • Creates type job::registry_item::Reader containing getters for each capnp parameter
    • Creates type job::registry_item::Builder containing setters for each capnp parameter
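The naming pattern in the list above is regular enough to write down as code. The sketch below only mirrors the pattern as I have described it (camelCase schema names become snake_case methods and modules, plus capitalized Params/Results types). It is not how capnpc derives names internally, and `method_names`, `to_snake_case`, and `capitalize` are helper names I made up for illustration.

```rust
/// Converts a camelCase or PascalCase schema name to snake_case,
/// e.g. "getResult" -> "get_result", "RegistryItem" -> "registry_item".
pub fn to_snake_case(name: &str) -> String {
    let mut out = String::new();
    for (i, ch) in name.chars().enumerate() {
        if ch.is_ascii_uppercase() {
            // Separate words with an underscore, except at the very start.
            if i > 0 {
                out.push('_');
            }
            out.push(ch.to_ascii_lowercase());
        } else {
            out.push(ch);
        }
    }
    out
}

/// Upper-cases the first letter, e.g. "getResult" -> "GetResult".
pub fn capitalize(name: &str) -> String {
    let mut chars = name.chars();
    match chars.next() {
        Some(first) => first.to_ascii_uppercase().to_string() + chars.as_str(),
        None => String::new(),
    }
}

/// The generated item names for one RPC method, following the list above.
pub struct MethodNames {
    pub server_method: String,  // method on the job::Server trait
    pub client_request: String, // request constructor on job::Client
    pub params_type: String,    // incoming data on the server side
    pub results_type: String,   // outgoing data on the server side
}

/// Hypothetical helper: derives the names from a schema method name.
pub fn method_names(capnp_method: &str) -> MethodNames {
    let snake = to_snake_case(capnp_method);
    let pascal = capitalize(capnp_method);
    MethodNames {
        server_method: snake.clone(),
        client_request: format!("{}_request", snake),
        params_type: format!("{}Params", pascal),
        results_type: format!("{}Results", pascal),
    }
}
```

For example, `method_names("getResult")` yields `get_result`, `get_result_request`, `GetResultParams`, and `GetResultResults`, which are the names used in the server and client code later on.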

Importing the Generated Code

To use the code we just generated, we must import it into our source as follows.

use crate::service_capnp::{job}; 

To build our service we still need to create an RPC client and server.

Creating the Server

This requires implementing two things:

  • Create the local implementation of your interface
  • Create your interface methods

Implement the local interface

First things first, we need to create a local struct upon which we can implement our interface methods. This is easy.

struct Job; 

Once this is finished, we get to the interesting part: the implementation of our methods.

Implement the methods

Cap'n Proto server methods always follow the signature below, where the Params and Results type names use the method name in PascalCase.

impl <imported_rpc_interface>::Server for <local_struct> {
    fn <rpc_method_name>(
        &mut self,
        params: <imported_rpc_interface>::<PascalCaseRpcMethodName>Params,
        mut results: <imported_rpc_interface>::<PascalCaseRpcMethodName>Results,
    ) -> Promise<(), capnp::Error> {
        // Do Something
    }
}

Or, in our case, the following:

impl job::Server for Job {
    fn register(
        &mut self,
        params: job::RegisterParams,
        mut results: job::RegisterResults,
    ) -> Promise<(), capnp::Error> {
        // Do Something
    }

    fn get_result(
        &mut self,
        params: job::GetResultParams,
        mut results: job::GetResultResults,
    ) -> Promise<(), capnp::Error> {
        // Do Something
    }
}

For the purposes of this article the business logic for the methods is scaled back to printing out received values, and sending back static responses.

impl job::Server for Job {
    fn register(
        &mut self,
        params: job::RegisterParams,
        mut results: job::RegisterResults,
    ) -> Promise<(), capnp::Error> {
        println!("Processing Register");

        // get a reader object for the sent request
        let request_reader = pry!(params.get());

        // get the value for the sent parms parameter
        let parms = request_reader.get_parms().unwrap();

        // get the value for the sent source parameter
        let source = request_reader.get_source().unwrap();

        for item in parms.iter() {
            // print each parms value (each list item is itself a Result)
            println!("Parameter: {}", pry!(item));
        }

        // print the source value
        println!("DataSource: {}", source);

        // set return value for id
        results.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");
        Promise::ok(())
    }

    fn get_result(
        &mut self,
        params: job::GetResultParams,
        mut results: job::GetResultResults,
    ) -> Promise<(), capnp::Error> {
        println!("Processing Get Result");

        // get a reader object for the sent request
        let request_reader = pry!(params.get());

        // get the sent ID (get_id returns a Result, so unwrap it with pry!)
        let id = pry!(request_reader.get_id());

        // print the sent ID
        println!("ID: {}", id);

        // set return value for link
        results.get().set_link("https://link.to.your/output.csv");
        Promise::ok(())
    }
}

NOTE: pry! is a macro from the capnp_rpc crate that acts like try!(), but for functions that return a Promise rather than a Result.
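To make the early-return behaviour concrete, here is a minimal sketch of a pry!-style macro using only the standard library. The `Promise` enum below is a simplified stand-in I wrote for this example (the real capnp::capability::Promise can also wrap work that has not finished yet), and `parse_id` is a hypothetical handler, not part of our service.

```rust
/// Simplified stand-in for capnp::capability::Promise (assumption: only the
/// already-resolved cases are modelled here).
#[derive(Debug, PartialEq)]
pub enum Promise<T, E> {
    Ok(T),
    Err(E),
}

impl<T, E> Promise<T, E> {
    pub fn ok(value: T) -> Self {
        Promise::Ok(value)
    }

    pub fn err(error: E) -> Self {
        Promise::Err(error)
    }
}

/// Unwraps a Result, or returns early from the enclosing function with a
/// failed Promise, converting the error type with From.
macro_rules! pry {
    ($expr:expr) => {
        match $expr {
            Ok(value) => value,
            Err(error) => return Promise::err(From::from(error)),
        }
    };
}

/// Hypothetical handler: parses a numeric ID and fails the Promise on bad input.
pub fn parse_id(raw: &str) -> Promise<u32, String> {
    // On a parse error, pry! returns Promise::Err from parse_id right here.
    let id: u32 = pry!(raw.parse::<u32>().map_err(|e| e.to_string()));
    Promise::ok(id)
}
```

The point is that a function returning a Promise cannot use the ? operator directly, because ? expects the function itself to return a Result; pry! fills that gap.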

We wrap it all together and build our server::main() with the following.

use capnp::capability::Promise;
use capnp_rpc::{pry, RpcSystem};
use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::rpc_twoparty_capnp::Side;
use async_std::net::TcpListener;
use futures::{AsyncReadExt, FutureExt, StreamExt};
use futures::task::LocalSpawn;

use crate::service_capnp::job;

struct Job;

impl job::Server for Job {
    fn register(
        &mut self,
        params: job::RegisterParams,
        mut results: job::RegisterResults,
    ) -> Promise<(), capnp::Error> {
        println!("Processing Register");

        // get a reader object for the sent request
        let request_reader = pry!(params.get());

        // get the value for the sent parms parameter
        let parms = request_reader.get_parms().unwrap();

        // get the value for the sent source parameter
        let source = request_reader.get_source().unwrap();

        for item in parms.iter() {
            // print each parms value (each list item is itself a Result)
            println!("Parameter: {}", pry!(item));
        }

        // print the source value
        println!("DataSource: {}", source);

        // set return value for id
        results.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");
        Promise::ok(())
    }

    fn get_result(
        &mut self,
        params: job::GetResultParams,
        mut results: job::GetResultResults,
    ) -> Promise<(), capnp::Error> {
        println!("Processing Get Result");

        // get a reader object for the sent request
        let request_reader = pry!(params.get());

        // get the sent ID (get_id returns a Result, so unwrap it with pry!)
        let id = pry!(request_reader.get_id());

        // print the sent ID
        println!("ID: {}", id);

        // set return value for link
        results.get().set_link("https://link.to.your/output.csv");
        Promise::ok(())
    }
}

pub fn main(arg_addr: &String) {
    use std::net::ToSocketAddrs;

    // parse input address
    let addr = arg_addr.to_socket_addrs().unwrap().next().expect("Invalid Address");

    let mut exec = futures::executor::LocalPool::new();
    let spawner = exec.spawner();

    // Start Server
    let _server: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
        // Initialize TCP listener @ addr
        let listener = TcpListener::bind(&addr).await?;

        // Initialize local interface defined above
        let job_client = job::ToClient::new(Job).into_client::<capnp_rpc::Server>();

        // Initialize and execute async handler
        let mut incoming = listener.incoming();
        let handle_incoming = async move {
            while let Some(socket_result) = incoming.next().await {
                let sock = socket_result?;
                println!("Accepted connection from {:?}", sock.peer_addr());
                sock.set_nodelay(true)?;
                let (reader, writer) = sock.split();
                let network = VatNetwork::new(reader, writer, Side::Server, Default::default());
                let rpc_system = RpcSystem::new(Box::new(network), Some(job_client.clone().client));
                spawner.spawn_local_obj(Box::pin(rpc_system.map(|_| ())).into())?;
            }
            Ok::<(), Box<dyn std::error::Error>>(())
        };
        handle_incoming.await?;
        Ok(())
    });
}
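The address handling at the top of server::main() uses unwrap() and expect(), which is fine for a demo. If you would rather report a bad address than panic, the same lookup can be sketched as a small function that returns a Result. The name `resolve_addr` and the error message are my own choices for this example.

```rust
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};

/// Resolves a "host:port" string to the first matching socket address.
/// Returns an error instead of panicking when the input is malformed or
/// resolves to no addresses at all.
pub fn resolve_addr(arg_addr: &str) -> io::Result<SocketAddr> {
    arg_addr
        .to_socket_addrs()? // fails on malformed input or a failed lookup
        .next()             // take the first resolved address, if any
        .ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "address resolved to nothing")
        })
}
```

A caller could then match on the result, print the error, and exit cleanly. Both the client and the server resolve their address the same way, so one helper like this would cover both.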

Creating the Client

Creating the client only requires a main function that connects to the server and sends our requests, as follows.

use capnp_rpc::RpcSystem;
use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::rpc_twoparty_capnp::Side;
use async_std::net::TcpStream;
use futures::{AsyncReadExt, FutureExt};
use futures::task::LocalSpawn;

use crate::service_capnp::job;

pub fn main(arg_addr: &String) {
    use std::net::ToSocketAddrs;

    // Resolve input server address
    let addr = arg_addr.to_socket_addrs().unwrap().next().expect("Invalid Address");

    // Create a pool for executing async requests
    let mut exec = futures::executor::LocalPool::new();
    let spawner = exec.spawner();

    // Execute the client
    let _client: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
        // Create RPC connection to the server
        let stream = TcpStream::connect(&addr).await?;
        stream.set_nodelay(true)?;
        let (reader, writer) = stream.split();
        let network = Box::new(
            VatNetwork::new(reader, writer, Side::Client, Default::default())
        );
        let mut rpc_system = RpcSystem::new(network, None);

        // Setup the client interface
        let job_client: job::Client = rpc_system.bootstrap(Side::Server);
        spawner.spawn_local_obj(Box::pin(rpc_system.map(|_| ())).into())?;

        {
            println!("Sending Register Job Request...");

            // Create a register request object
            let mut request = job_client.register_request();

            // Set parms - list parameters require initialization
            let mut parms_setter = request.get().init_parms(2);
            parms_setter.set(0, "dark");
            parms_setter.set(1, "world");

            // Set source - strings do not require prior initialization
            request.get().set_source("/link/to/the/path");

            // Send request, and await response
            let response = request.send().promise.await?;
            println!("Received JobID: {}", response.get().unwrap().get_id().unwrap());
        }

        {
            println!("Sending Get Result Request...");

            // Create get_result request object
            let mut request = job_client.get_result_request();

            // Set JobID
            request.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");

            // Send request, and await response
            let response = request.send().promise.await?;

            // The result is of the union type, so we must determine which type it is
            match response.get()?.which()? {
                job::result::Link(t) => {
                    println!("Received Link: {}", t?);
                }
                job::result::Error(t) => {
                    println!("Received Error: {}", t?);
                }
            }
        }
        Ok(())
    });
}

Creating The main()

For the purposes of this article, both the client and server will be part of the same binary. As such, we have a dedicated main.rs which looks as follows.

// Include capnp generated code (service.capnp generates service_capnp.rs)
pub mod service_capnp {
    include!(concat!(env!("OUT_DIR"), "/service_capnp.rs"));
}

pub mod client;
pub mod server;

fn main() {
    let args: Vec<String> = ::std::env::args().collect();
    // Both the mode and the address are required, so expect at least three arguments
    if args.len() >= 3 {
        match &args[1][..] {
            "client" => return client::main(&args[2]),
            "server" => return server::main(&args[2]),
            _ => (),
        }
    }
    println!("usage: {} [client | server] ADDRESS", args[0]);
}
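If you want to unit test the argument handling, the dispatch can also be sketched as a pure function that returns what to run instead of running it. The `Mode` enum and `parse_args` are hypothetical names for this example; main() would call client::main or server::main based on the returned value and print the usage line on None.

```rust
/// Which half of the binary to run, plus the address it should use.
#[derive(Debug, PartialEq)]
pub enum Mode {
    Client(String),
    Server(String),
}

/// Parses `[program, "client" | "server", ADDRESS, ...]`.
/// Returns None when the mode is unknown or the address is missing,
/// so the caller can fall through to the usage message.
pub fn parse_args(args: &[String]) -> Option<Mode> {
    match args {
        // Extra trailing arguments are ignored.
        [_, mode, addr, ..] => match mode.as_str() {
            "client" => Some(Mode::Client(addr.clone())),
            "server" => Some(Mode::Server(addr.clone())),
            _ => None,
        },
        _ => None,
    }
}
```

Matching on the slice shape means there is no indexing at all, so a missing ADDRESS cannot cause an out-of-bounds panic.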

Running the server

We will run our server on localhost:2020, and should receive the following output (before any client connects):

16:46:22 🖎 export-rust master* ± cargo run server localhost:2020
   Compiling export-rust v0.1.0 (/home/procyclinsur/test/export-rust)
    Finished dev [unoptimized + debuginfo] target(s) in 2.90s
     Running `target/debug/export-rust server 'localhost:2020'`

Running the client

Similarly, the client will run against the server at localhost:2020. When the client is run, however, there should be output in both the client terminal and the server terminal, as follows.

Client

16:50:53 🖎 export-rust master* ± cargo run client localhost:2020
    Finished dev [unoptimized + debuginfo] target(s) in 0.04s
     Running `target/debug/export-rust client 'localhost:2020'`
Sending Register Job Request...
Received JobID: 01E3S4Q9SN3VHVXB2KCAGD9P62
Sending Get Result Request...
https://link.to.your/output.csv

Server

16:46:22 🖎 export-rust master* ± cargo run server localhost:2020
   Compiling export-rust v0.1.0 (/home/procyclinsur/test/export-rust)
    Finished dev [unoptimized + debuginfo] target(s) in 2.90s
     Running `target/debug/export-rust server 'localhost:2020'`
Accepted connection from Ok(V6([::1]:36374))
Processing Register
Parameter: dark
Parameter: world
DataSource: /link/to/the/path
Processing Get Result
ID: 01E3S4Q9SN3VHVXB2KCAGD9P62

As you can see, we are able to send and receive data via our API powered by Cap'n Proto RPC. If you love it, hate it, have any questions, or suggestions for improvement please let me know!

Top comments (1)

Dan

Thanks for this! Great resource.

Can I ask what you use to make your sequence diagrams? I really like the aesthetic :)