Skip to content
14 changes: 14 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
version = "3.5.3"
runner.dialect = scala213
maxColumn = 120
fileOverride {
"glob:**/fs2/src/**" {
runner.dialect = scala213source3
}
"glob:**/fs2/test/src/**" {
runner.dialect = scala213source3
}
"glob:**/core/test/src-jvm-native/**" {
runner.dialect = scala213source3
}
"glob:**/core/src/**" {
runner.dialect = scala213source3
}
}
40 changes: 29 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,41 @@
[![CI](https://github.com/neandertech/jsonrpclib/actions/workflows/ci.yml/badge.svg)](https://github.com/neandertech/jsonrpclib/actions/workflows/ci.yml)

[![jsonrpclib-fs2 Scala version support](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2/latest-by-scala-version.svg?platform=jvm](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2)

[![jsonrpclib-fs2 Scala version support](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2/latest-by-scala-version.svg?platform=sjs1](https://index.scala-lang.org/neandertech/jsonrpclib/jsonrpclib-fs2)


# jsonrpclib

This is a cross-platform, cross-scala-version [jsonrpc](https://www.jsonrpc.org/) library that provides construct for bidirectional communication using
the jsonrpc protocol.
This is a cross-platform, cross-scala-version library that provides construct for bidirectional communication using the [jsonrpc](https://www.jsonrpc.org/) protocol. It is built on top of [fs2](https://fs2.io/#/) and [jsoniter-scala](https://github.com/plokhotnyuk/jsoniter-scala)

This library does not enforce any transport, and can work on top of stdin/stdout or other channels.

## Installation

The dependencies below are following [cross-platform semantics](http://youforgotapercentagesignoracolon.com/).
Adapt according to your needs

### SBT

This library does not enforce any transport, and works as long as you can provide input/output byte streams.
```scala
libraryDependencies += "tech.neander" %%% "jsonrpclib-fs2" % version
```

### Mill

## Dev Notes
```scala
override def ivyDeps = super.ivyDeps() ++ Agg(ivy"tech.neander::jsonrpclib-fs2::$version")
```

### Scala-native
### Scala-cli

See
* https://github.com/scala-native/scala-native/blob/63d07093f6d0a6e9de28cd8f9fb6bc1d6596c6ec/test-interface/src/main/scala/scala/scalanative/testinterface/NativeRPC.scala
```scala
//> using lib "tech.neander::jsonrpclib-fs2:<VERSION>"
```

## Usage

### Scala-js
**/!\ Please be aware that this library is in its early days and offers strictly no guarantee with regards to backward compatibility**

See
* https://github.com/scala-js/scala-js-js-envs/blob/main/nodejs-env/src/main/scala/org/scalajs/jsenv/nodejs/ComSupport.scala#L245
* https://github.com/scala-js/scala-js/blob/0708917912938714d52be1426364f78a3d1fd269/test-bridge/src/main/scala/org/scalajs/testing/bridge/JSRPC.scala
See the examples folder
25 changes: 24 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import mill.define.Target
import mill.util.Jvm
import $ivy.`com.lihaoyi::mill-contrib-bloop:$MILL_VERSION`
import $ivy.`io.github.davidgregory084::mill-tpolecat::0.3.1`
import $ivy.`de.tototec::de.tobiasroeser.mill.vcs.version::0.1.4`
Expand All @@ -20,6 +22,7 @@ object versions {
val scalaNativeVersion = "0.4.4"
val munitVersion = "0.7.29"
val munitNativeVersion = "1.0.0-M6"
val fs2 = "3.2.11"

val scala213 = "2.13"
val scala212 = "2.12"
Expand Down Expand Up @@ -59,7 +62,7 @@ object fs2 extends RPCCrossPlatformModule { cross =>

override def crossPlatformModuleDeps = Seq(core)
def crossPlatformIvyDeps: T[Agg[Dep]] = Agg(
ivy"co.fs2::fs2-core::3.2.8"
ivy"co.fs2::fs2-core::${versions.fs2}"
)

object jvm extends mill.Cross[JvmModule](scala213, scala3)
Expand All @@ -74,6 +77,26 @@ object fs2 extends RPCCrossPlatformModule { cross =>

}

object examples extends mill.define.Module {

object server extends ScalaModule {
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
def moduleDeps = Seq(fs2.jvm(versions.scala213))
def scalaVersion = versions.scala213Version
}

object client extends ScalaModule {
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}", ivy"eu.monniot::fs2-process:0.4.4")
def moduleDeps = Seq(fs2.jvm(versions.scala213))
def scalaVersion = versions.scala213Version
def forkEnv: Target[Map[String, String]] = T {
val assembledServer = server.assembly()
super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString())
}
}

}

// #############################################################################
// COMMON SETUP
// #############################################################################
Expand Down
15 changes: 15 additions & 0 deletions devnotes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Dev Notes

In case somebody wants to implement future-based channels, here are some source of inspiration :

### Scala-native

See
* https://github.com/scala-native/scala-native/blob/63d07093f6d0a6e9de28cd8f9fb6bc1d6596c6ec/test-interface/src/main/scala/scala/scalanative/testinterface/NativeRPC.scala


### Scala-js

See
* https://github.com/scala-js/scala-js-js-envs/blob/main/nodejs-env/src/main/scala/org/scalajs/jsenv/nodejs/ComSupport.scala#L245
* https://github.com/scala-js/scala-js/blob/0708917912938714d52be1426364f78a3d1fd269/test-bridge/src/main/scala/org/scalajs/testing/bridge/JSRPC.scala
106 changes: 106 additions & 0 deletions examples/client/src/examples/client/ClientMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package examples.server

import jsonrpclib.CallId
import jsonrpclib.fs2._
import cats.effect._
import fs2.io._
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import jsonrpclib.Endpoint
import cats.syntax.all._
import fs2.Stream
import jsonrpclib.StubTemplate
import cats.effect.std.Dispatcher
import scala.sys.process.ProcessIO
import cats.effect.implicits._
import scala.sys.process.{Process => SProcess}
import java.io.OutputStream
import java.io.InputStream

object ClientMain extends IOApp.Simple {

// Reserving a method for cancelation.
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)

// Creating a datatype that'll serve as a request (and response) of an endpoint
case class IntWrapper(value: Int)
object IntWrapper {
implicit val jcodec: JsonValueCodec[IntWrapper] = JsonCodecMaker.make
}

type IOStream[A] = fs2.Stream[IO, A]
def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str))

def run: IO[Unit] = {
import scala.concurrent.duration._
// Using errorln as stdout is used by the RPC channel
val run = for {
_ <- log("Starting client")
serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist"))
// Starting the server
(serverStdin, serverStdout, serverStderr) <- Stream.resource(process("java", "-jar", serverJar))
pipeErrors = serverStderr.through(fs2.io.stderr)
// Creating a channel that will be used to communicate to the server
fs2Channel <- FS2Channel
.lspCompliant[IO](serverStdout, serverStdin, cancelTemplate = cancelEndpoint.some)
.concurrently(pipeErrors)
// Opening the stream to be able to send and receive data
_ <- fs2Channel.openStream
// Creating a `IntWrapper => IO[IntWrapper]` stub that can call the server
increment = fs2Channel.simpleStub[IntWrapper, IntWrapper]("increment")
result <- Stream.eval(increment(IntWrapper(0)))
_ <- log(s"Client received $result")
_ <- log("Terminating client")
} yield ()
run.compile.drain.timeout(2.second)
}

/** Wraps the spawning of a subprocess into fs2 friendly semantics
*/
import scala.concurrent.duration._
def process(command: String*) = for {
dispatcher <- Dispatcher[IO]
stdinPromise <- IO.deferred[fs2.Pipe[IO, Byte, Unit]].toResource
stdoutPromise <- IO.deferred[fs2.Stream[IO, Byte]].toResource
stderrPromise <- IO.deferred[fs2.Stream[IO, Byte]].toResource
makeProcessBuilder = IO(sys.process.stringSeqToProcess(command))
makeProcessIO = IO(
new ProcessIO(
in = { (outputStream: OutputStream) =>
val pipe = writeOutputStreamFlushingChunks(IO(outputStream))
val fulfil = stdinPromise.complete(pipe)
dispatcher.unsafeRunSync(fulfil)
},
out = { (inputStream: InputStream) =>
val stream = fs2.io.readInputStream(IO(inputStream), 512)
val fulfil = stdoutPromise.complete(stream)
dispatcher.unsafeRunSync(fulfil)
},
err = { (inputStream: InputStream) =>
val stream = fs2.io.readInputStream(IO(inputStream), 512)
val fulfil = stderrPromise.complete(stream)
dispatcher.unsafeRunSync(fulfil)
}
)
)
makeProcess = (makeProcessBuilder, makeProcessIO).flatMapN { case (b, io) => IO.blocking(b.run(io)) }
_ <- Resource.make(makeProcess)((runningProcess) => IO.blocking(runningProcess.destroy()))
pipes <- (stdinPromise.get, stdoutPromise.get, stderrPromise.get).tupled.toResource
} yield pipes

/** Adds a flush after each chunk
*/
def writeOutputStreamFlushingChunks[F[_]](
fos: F[OutputStream],
closeAfterUse: Boolean = true
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] =
s => {
def useOs(os: OutputStream): Stream[F, Nothing] =
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush()))

val os =
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close()))
else Stream.eval(fos)
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush())))
}
}
41 changes: 41 additions & 0 deletions examples/server/src/examples/server/ServerMain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package examples.server

import jsonrpclib.CallId
import jsonrpclib.fs2._
import cats.effect._
import fs2.io._
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import jsonrpclib.Endpoint
import cats.syntax.all._

object ServerMain extends IOApp.Simple {

// Reserving a method for cancelation.
val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity)

// Creating a datatype that'll serve as a request (and response) of an endpoint
case class IntWrapper(value: Int)
object IntWrapper {
implicit val jcodec: JsonValueCodec[IntWrapper] = JsonCodecMaker.make
}

// Implementing an incrementation endpoint
val increment = Endpoint[IO]("increment").simple { in: IntWrapper =>
IO.consoleForIO.errorln(s"Server received $in") >>
IO.pure(in.copy(value = in.value + 1))
}

def run: IO[Unit] = {
// Using errorln as stdout is used by the RPC channel
IO.consoleForIO.errorln("Starting server") >>
FS2Channel
.lspCompliant[IO](fs2.io.stdin[IO](bufSize = 512), fs2.io.stdout[IO], cancelTemplate = cancelEndpoint.some)
.flatMap(_.withEndpointStream(increment)) // mounting an endpoint onto the channel
.flatMap(_.openStreamForever) // starts the communication
.compile
.drain
.guarantee(IO.consoleForIO.errorln("Terminating server"))
}

}
37 changes: 24 additions & 13 deletions fs2/src/jsonrpclib/fs2/FS2Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,53 @@ import scala.util.Try
import _root_.fs2.concurrent.SignallingRef

trait FS2Channel[F[_]] extends Channel[F] {
def withEndpoint(endpoint: Endpoint[F])(implicit F: Functor[F]): Resource[F, Unit] =
Resource.make(mountEndpoint(endpoint))(_ => unmountEndpoint(endpoint.method))
def withEndpoint(endpoint: Endpoint[F])(implicit F: Functor[F]): Resource[F, FS2Channel[F]] =
Resource.make(mountEndpoint(endpoint))(_ => unmountEndpoint(endpoint.method)).map(_ => this)

def withEndpoints(endpoint: Endpoint[F], rest: Endpoint[F]*)(implicit F: Monad[F]): Resource[F, Unit] =
(endpoint :: rest.toList).traverse_(withEndpoint)
def withEndpointStream(endpoint: Endpoint[F])(implicit F: MonadCancelThrow[F]): Stream[F, FS2Channel[F]] =
Stream.resource(withEndpoint(endpoint))

def withEndpoints(endpoint: Endpoint[F], rest: Endpoint[F]*)(implicit F: Monad[F]): Resource[F, FS2Channel[F]] =
(endpoint :: rest.toList).traverse_(withEndpoint).as(this)

def withEndpointStream(endpoint: Endpoint[F], rest: Endpoint[F]*)(implicit
F: MonadCancelThrow[F]
): Stream[F, FS2Channel[F]] =
Stream.resource(withEndpoints(endpoint, rest: _*))

def open: Resource[F, Unit]
def openStream: Stream[F, Unit]
def openStreamForever: Stream[F, Nothing]
}

object FS2Channel {

def lspCompliant[F[_]: Concurrent](
byteStream: Stream[F, Byte],
byteSink: Pipe[F, Byte, Nothing],
byteSink: Pipe[F, Byte, Unit],
bufferSize: Int = 512,
maybeCancelTemplate: Option[CancelTemplate] = None
cancelTemplate: Option[CancelTemplate] = None
): Stream[F, FS2Channel[F]] = internals.LSP.writeSink(byteSink, bufferSize).flatMap { sink =>
apply[F](internals.LSP.readStream(byteStream), sink, maybeCancelTemplate)
apply[F](internals.LSP.readStream(byteStream), sink, cancelTemplate)
}

def apply[F[_]: Concurrent](
payloadStream: Stream[F, Payload],
payloadSink: Payload => F[Unit],
maybeCancelTemplate: Option[CancelTemplate] = None
cancelTemplate: Option[CancelTemplate] = None
): Stream[F, FS2Channel[F]] = {
for {
supervisor <- Stream.resource(Supervisor[F])
ref <- Ref[F].of(State[F](Map.empty, Map.empty, Map.empty, 0)).toStream
isOpen <- SignallingRef[F].of(false).toStream
awaitingSink = isOpen.waitUntil(identity) >> payloadSink(_: Payload)
impl = new Impl(awaitingSink, ref, isOpen, supervisor, maybeCancelTemplate)
impl = new Impl(awaitingSink, ref, isOpen, supervisor, cancelTemplate)

// Creating a bespoke endpoint to receive cancelation requests
maybeCancelEndpoint: Option[Endpoint[F]] = maybeCancelTemplate.map { cancelTemplate =>
implicit val codec: Codec[cancelTemplate.C] = cancelTemplate.codec
Endpoint[F](cancelTemplate.method).notification[cancelTemplate.C] { request =>
val callId = cancelTemplate.toCallId(request)
maybeCancelEndpoint: Option[Endpoint[F]] = cancelTemplate.map { ct =>
implicit val codec: Codec[ct.C] = ct.codec
Endpoint[F](ct.method).notification[ct.C] { request =>
val callId = ct.toCallId(request)
impl.cancel(callId)
}
}
Expand Down Expand Up @@ -120,6 +129,7 @@ object FS2Channel {

def open: Resource[F, Unit] = Resource.make[F, Unit](isOpen.set(true))(_ => isOpen.set(false))
def openStream: Stream[F, Unit] = Stream.resource(open)
def openStreamForever: Stream[F, Nothing] = openStream.evalMap(_ => F.never)

protected[fs2] def cancel(callId: CallId): F[Unit] = state.get.map(_.runningCalls.get(callId)).flatMap {
case None => F.unit
Expand All @@ -138,6 +148,7 @@ object FS2Channel {
protected def reportError(params: Option[Payload], error: ProtocolError, method: String): F[Unit] = ???
protected def getEndpoint(method: String): F[Option[Endpoint[F]]] = state.get.map(_.endpoints.get(method))
protected def sendMessage(message: Message): F[Unit] = sink(Codec.encode(message))

protected def nextCallId(): F[CallId] = state.modify(_.nextCallId)
protected def createPromise[A](callId: CallId): F[(Try[A] => F[Unit], () => F[A])] = Deferred[F, Try[A]].map {
promise =>
Expand Down
2 changes: 1 addition & 1 deletion fs2/src/jsonrpclib/fs2/internals/LSP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import java.nio.charset.StandardCharsets
object LSP {

def writeSink[F[_]: Concurrent](
writePipe: fs2.Pipe[F, Byte, Nothing],
writePipe: fs2.Pipe[F, Byte, Unit],
bufferSize: Int
): Stream[F, Payload => F[Unit]] =
Stream.eval(Queue.bounded[F, Payload](bufferSize)).flatMap { queue =>
Expand Down