Skip to content

Commit 376dea5

Browse files
committed
Using stream to manage lifecycles instead of Resource
When using concurrently, if a background stream fails, the foreground stream fails too.
1 parent c3f43e9 commit 376dea5

File tree

9 files changed

+82
-73
lines changed

9 files changed

+82
-73
lines changed

core/src/jsonrpclib/Payload.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package jsonrpclib
22

3-
import java.util.Base64
4-
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
53
import com.github.plokhotnyuk.jsoniter_scala.core.JsonReader
4+
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
65
import com.github.plokhotnyuk.jsoniter_scala.core.JsonWriter
76

7+
import java.util.Base64
8+
89
final case class Payload(array: Array[Byte]) {
910
override def equals(other: Any) = other match {
1011
case bytes: Payload => java.util.Arrays.equals(array, bytes.array)

core/src/jsonrpclib/internals/FutureBaseChannel.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package jsonrpclib
22

3+
import jsonrpclib.Endpoint.NotificationEndpoint
4+
import jsonrpclib.Endpoint.RequestResponseEndpoint
5+
import jsonrpclib.internals._
6+
7+
import java.util.concurrent.atomic.AtomicLong
38
import scala.concurrent.ExecutionContext
49
import scala.concurrent.Future
5-
import jsonrpclib.internals._
610
import scala.concurrent.Promise
7-
import java.util.concurrent.atomic.AtomicLong
8-
import jsonrpclib.Endpoint.NotificationEndpoint
9-
import jsonrpclib.Endpoint.RequestResponseEndpoint
1011
import scala.util.Try
1112

1213
abstract class FutureBasedChannel(endpoints: List[Endpoint[Future]])(implicit ec: ExecutionContext)

core/src/jsonrpclib/internals/LSPHeaders.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package jsonrpclib
22
package internals
33

4-
import java.io.InputStream
5-
import java.io.DataInputStream
6-
import java.io.Reader
74
import java.io.BufferedReader
8-
import java.io.OutputStream
9-
import java.io.DataOutputStream
105
import java.io.BufferedWriter
6+
import java.io.DataInputStream
7+
import java.io.DataOutputStream
8+
import java.io.InputStream
9+
import java.io.OutputStream
10+
import java.io.Reader
1111
import java.nio.charset.StandardCharsets
1212

1313
// Content-Length: ...\r\n

core/src/jsonrpclib/internals/Message.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package jsonrpclib
22
package internals
33

4-
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
54
import com.github.plokhotnyuk.jsoniter_scala.core.JsonReader
5+
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
66
import com.github.plokhotnyuk.jsoniter_scala.core.JsonWriter
77

88
sealed trait Message { def maybeCallId: Option[CallId] }

core/src/jsonrpclib/internals/RawMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import java.io.OutputStream
77
import java.io.InputStream
88

99
private[jsonrpclib] case class RawMessage(
10-
jsonRPC: String,
10+
jsonrpc: String,
1111
method: Option[String] = None,
1212
result: Option[Payload] = None,
1313
error: Option[ErrorPayload] = None,

fs2/src/jsonrpclib/fs2interop/FS2Channel.scala

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
11
package jsonrpclib
22
package fs2interop
33

4-
import jsonrpclib.internals.MessageDispatcher
5-
6-
import _root_.fs2.Stream
74
import _root_.fs2.Pipe
8-
import jsonrpclib.internals._
9-
import scala.util.Try
5+
import _root_.fs2.Stream
6+
import cats.Applicative
7+
import cats.Defer
8+
import cats.Functor
109
import cats.Monad
11-
import cats.syntax.all._
10+
import cats.MonadThrow
11+
import cats.data.Kleisli
1212
import cats.effect.implicits._
1313
import cats.effect.kernel._
14-
import scala.util.Failure
15-
import scala.util.Success
16-
import cats.Applicative
17-
import cats.data.Kleisli
18-
import cats.MonadThrow
14+
import cats.effect.std.Supervisor
15+
import cats.effect.std.syntax.supervisor
16+
import cats.syntax.all._
1917
import jsonrpclib.StubTemplate._
20-
import cats.Defer
21-
import cats.Functor
18+
import jsonrpclib.internals.MessageDispatcher
2219
import jsonrpclib.internals.OutputMessage._
23-
import cats.effect.std.syntax.supervisor
24-
import cats.effect.std.Supervisor
20+
import jsonrpclib.internals._
21+
22+
import scala.util.Failure
23+
import scala.util.Success
24+
import scala.util.Try
2525

2626
trait FS2Channel[F[_]] extends Channel[F] {
2727
def withEndpoint(endpoint: Endpoint[F])(implicit F: Functor[F]): Resource[F, Unit] =
@@ -38,21 +38,21 @@ object FS2Channel {
3838
byteSink: Pipe[F, Byte, Nothing],
3939
startingEndpoints: List[Endpoint[F]] = List.empty,
4040
bufferSize: Int = 512
41-
): Resource[F, FS2Channel[F]] = internals.LSP.writeSink(byteSink, bufferSize).flatMap { sink =>
41+
): Stream[F, FS2Channel[F]] = internals.LSP.writeSink(byteSink, bufferSize).flatMap { sink =>
4242
apply[F](internals.LSP.readStream(byteStream), sink, startingEndpoints)
4343
}
4444

4545
def apply[F[_]: Concurrent](
4646
payloadStream: Stream[F, Payload],
4747
payloadSink: Payload => F[Unit],
4848
startingEndpoints: List[Endpoint[F]] = List.empty[Endpoint[F]]
49-
): Resource[F, FS2Channel[F]] = {
49+
): Stream[F, FS2Channel[F]] = {
5050
val endpointsMap = startingEndpoints.map(ep => ep.method -> ep).toMap
5151
for {
52-
supervisor <- Supervisor[F]
53-
ref <- Ref[F].of(State[F](Map.empty, endpointsMap, 0)).toResource
52+
supervisor <- Stream.resource(Supervisor[F])
53+
ref <- Ref[F].of(State[F](Map.empty, endpointsMap, 0)).toStream
5454
impl = new Impl(payloadSink, ref, supervisor)
55-
_ <- payloadStream.evalMap(impl.handleReceivedPayload).compile.drain.background
55+
_ <- Stream(()).concurrently(payloadStream.evalMap(impl.handleReceivedPayload))
5656
} yield impl
5757
}
5858

fs2/src/jsonrpclib/fs2interop/internals/LSP.scala

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,27 @@
11
package jsonrpclib.fs2interop.internals
22

3-
import fs2.Chunk
4-
import fs2.Stream
5-
import java.nio.charset.Charset
6-
import java.nio.charset.StandardCharsets
7-
import jsonrpclib.Payload
83
import cats.MonadThrow
9-
import cats.effect.std.Queue
104
import cats.effect.Concurrent
11-
import cats.implicits._
125
import cats.effect.implicits._
136
import cats.effect.kernel.Resource
7+
import cats.effect.std.Queue
8+
import cats.implicits._
9+
import fs2.Chunk
10+
import fs2.Stream
11+
import jsonrpclib.Payload
12+
13+
import java.nio.charset.Charset
14+
import java.nio.charset.StandardCharsets
1415

1516
object LSP {
1617

1718
def writeSink[F[_]: Concurrent](
1819
writePipe: fs2.Pipe[F, Byte, Nothing],
1920
bufferSize: Int
20-
): Resource[F, Payload => F[Unit]] =
21-
Queue.bounded[F, Payload](bufferSize).toResource.flatMap { queue =>
21+
): Stream[F, Payload => F[Unit]] =
22+
Stream.eval(Queue.bounded[F, Payload](bufferSize)).flatMap { queue =>
2223
val payloads = fs2.Stream.fromQueueUnterminated(queue, bufferSize)
23-
payloads.map(writeChunk).flatMap(Stream.chunk(_)).compile.drain.background.void.as(queue.offer(_))
24+
Stream(queue.offer(_)).concurrently(payloads.map(writeChunk).flatMap(Stream.chunk(_)))
2425
}
2526

2627
/** Split a stream of bytes into payloads by extracting each frame based on information contained in the headers.

fs2/src/jsonrpclib/fs2interop/package.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package jsonrpclib
22

3+
import _root_.fs2.Stream
34
import cats.MonadThrow
45
import cats.Monad
56

67
package object fs2interop {
78

9+
private[jsonrpclib] implicit class EffectOps[F[_], A](private val fa: F[A]) extends AnyVal {
10+
def toStream: Stream[F, A] = Stream.eval(fa)
11+
}
12+
813
implicit def catsMonadic[F[_]: MonadThrow]: Monadic[F] = new Monadic[F] {
914
def doFlatMap[A, B](fa: F[A])(f: A => F[B]): F[B] = Monad[F].flatMap(fa)(f)
1015

fs2/test/src/jsonrpclib/fs2/FS2ChannelSpec.scala

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
package jsonrpclib.fs2interop
22

3-
import weaver._
4-
import jsonrpclib._
5-
import jsonrpclib.fs2interop.FS2Channel
3+
import cats.data.Chain
64
import cats.effect.IO
5+
import cats.effect.implicits._
76
import cats.effect.kernel.Ref
8-
import cats.data.Chain
7+
import cats.effect.kernel.Resource
98
import cats.effect.std.Queue
10-
import jsonrpclib.Payload
11-
import jsonrpclib.Endpoint
12-
import cats.effect.implicits._
13-
import scala.concurrent.duration._
149
import cats.syntax.all._
15-
16-
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
1710
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
11+
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
12+
import fs2.Stream
1813
import jsonrpclib.Codec
19-
import cats.effect.kernel.Resource
14+
import jsonrpclib.Endpoint
15+
import jsonrpclib.Payload
16+
import jsonrpclib._
17+
import jsonrpclib.fs2interop.FS2Channel
18+
import weaver._
19+
20+
import scala.concurrent.duration._
2021

2122
object FS2ChannelSpec extends SimpleIOSuite {
2223

@@ -25,20 +26,20 @@ object FS2ChannelSpec extends SimpleIOSuite {
2526
implicit val jcodec: JsonValueCodec[IntWrapper] = JsonCodecMaker.make
2627
}
2728

28-
def testRes(name: TestName)(run: Resource[IO, Expectations]): Unit =
29-
test(name)(run.use(e => IO.pure(e)))
29+
def testRes(name: TestName)(run: Stream[IO, Expectations]): Unit =
30+
test(name)(run.compile.lastOrError)
3031

3132
testRes("Round trip") {
3233
val endpoint: Endpoint[IO] = Endpoint[IO]("inc").simple((int: IntWrapper) => IO(IntWrapper(int.int + 1)))
3334

3435
for {
35-
stdout <- Queue.bounded[IO, Payload](10).toResource
36-
stdin <- Queue.bounded[IO, Payload](10).toResource
37-
serverSideChannel <- FS2Channel[IO](fs2.Stream.fromQueueUnterminated(stdin), stdout.offer)
38-
clientSideChannel <- FS2Channel[IO](fs2.Stream.fromQueueUnterminated(stdout), stdin.offer)
39-
_ <- serverSideChannel.withEndpoint(endpoint)
36+
stdout <- Queue.bounded[IO, Payload](10).toStream
37+
stdin <- Queue.bounded[IO, Payload](10).toStream
38+
serverSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdin), stdout.offer)
39+
clientSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdout), stdin.offer)
40+
_ <- Stream.resource(serverSideChannel.withEndpoint(endpoint))
4041
remoteFunction = clientSideChannel.simpleStub[IntWrapper, IntWrapper]("inc")
41-
result <- remoteFunction(IntWrapper(1)).toResource
42+
result <- remoteFunction(IntWrapper(1)).toStream
4243
} yield {
4344
expect.same(result, IntWrapper(2))
4445
}
@@ -47,12 +48,12 @@ object FS2ChannelSpec extends SimpleIOSuite {
4748
testRes("Endpoint not mounted") {
4849

4950
for {
50-
stdout <- Queue.bounded[IO, Payload](10).toResource
51-
stdin <- Queue.bounded[IO, Payload](10).toResource
52-
serverSideChannel <- FS2Channel[IO](fs2.Stream.fromQueueUnterminated(stdin), stdout.offer)
53-
clientSideChannel <- FS2Channel[IO](fs2.Stream.fromQueueUnterminated(stdout), stdin.offer)
51+
stdout <- Queue.bounded[IO, Payload](10).toStream
52+
stdin <- Queue.bounded[IO, Payload](10).toStream
53+
serverSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdin), stdout.offer)
54+
clientSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdout), stdin.offer)
5455
remoteFunction = clientSideChannel.simpleStub[IntWrapper, IntWrapper]("inc")
55-
result <- remoteFunction(IntWrapper(1)).attempt.toResource
56+
result <- remoteFunction(IntWrapper(1)).attempt.toStream
5657
} yield {
5758
expect.same(result, Left(ErrorPayload(-32601, "Method inc not found", None)))
5859
}
@@ -66,13 +67,13 @@ object FS2ChannelSpec extends SimpleIOSuite {
6667
}
6768

6869
for {
69-
stdout <- Queue.bounded[IO, Payload](10).toResource
70-
stdin <- Queue.bounded[IO, Payload](10).toResource
71-
serverSideChannel <- FS2Channel[IO](fs2.Stream.fromQueueUnterminated(stdin), payload => stdout.offer(payload))
72-
clientSideChannel <- FS2Channel[IO](fs2.Stream.fromQueueUnterminated(stdout), payload => stdin.offer(payload))
73-
_ <- serverSideChannel.withEndpoint(endpoint)
70+
stdout <- Queue.bounded[IO, Payload](10).toStream
71+
stdin <- Queue.bounded[IO, Payload](10).toStream
72+
serverSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdin), payload => stdout.offer(payload))
73+
clientSideChannel <- FS2Channel[IO](Stream.fromQueueUnterminated(stdout), payload => stdin.offer(payload))
74+
_ <- Stream.resource(serverSideChannel.withEndpoint(endpoint))
7475
remoteFunction = clientSideChannel.simpleStub[IntWrapper, IntWrapper]("inc")
75-
timedResults <- (1 to 10).toList.map(IntWrapper(_)).parTraverse(remoteFunction).toResource.timed
76+
timedResults <- (1 to 10).toList.map(IntWrapper(_)).parTraverse(remoteFunction).timed.toStream
7677
} yield {
7778
val (time, results) = timedResults
7879
expect.same(results, (2 to 11).toList.map(IntWrapper(_))) &&

0 commit comments

Comments
 (0)