@@ -14,13 +14,16 @@ import jsonrpclib.internals.MessageDispatcher
1414import jsonrpclib .internals ._
1515
1616import scala .util .Try
17+ import _root_ .fs2 .concurrent .SignallingRef
1718
1819trait FS2Channel [F [_]] extends Channel [F ] {
1920 def withEndpoint (endpoint : Endpoint [F ])(implicit F : Functor [F ]): Resource [F , Unit ] =
2021 Resource .make(mountEndpoint(endpoint))(_ => unmountEndpoint(endpoint.method))
2122
2223 def withEndpoints (endpoint : Endpoint [F ], rest : Endpoint [F ]* )(implicit F : Monad [F ]): Resource [F , Unit ] =
2324 (endpoint :: rest.toList).traverse_(withEndpoint)
25+
26+ def open : Resource [F , Unit ]
2427}
2528
2629object FS2Channel {
@@ -42,16 +45,22 @@ object FS2Channel {
4245 val endpointsMap = startingEndpoints.map(ep => ep.method -> ep).toMap
4346 for {
4447 supervisor <- Stream .resource(Supervisor [F ])
45- ref <- Ref [F ].of(State [F ](Map .empty, endpointsMap, 0 )).toStream
46- impl = new Impl (payloadSink, ref, supervisor)
47- _ <- Stream (()).concurrently(payloadStream.evalMap(impl.handleReceivedPayload))
48+ ref <- Ref [F ].of(State [F ](Map .empty, endpointsMap, 0 , false )).toStream
49+ isOpen <- SignallingRef [F ].of(false ).toStream
50+ impl = new Impl (payloadSink, ref, isOpen, supervisor)
51+ _ <- Stream (()).concurrently {
52+ // Gatekeeping the pull until the channel is actually marked as open
53+ val wait = isOpen.waitUntil(identity)
54+ payloadStream.evalTap(_ => wait).evalMap(impl.handleReceivedPayload)
55+ }
4856 } yield impl
4957 }
5058
5159 private case class State [F [_]](
5260 pendingCalls : Map [CallId , OutputMessage => F [Unit ]],
5361 endpoints : Map [String , Endpoint [F ]],
54- counter : Long
62+ counter : Long ,
63+ isOpen : Boolean
5564 ) {
5665 def nextCallId : (State [F ], CallId ) = (this .copy(counter = counter + 1 ), CallId .NumberId (counter))
5766 def storePendingCall (callId : CallId , handle : OutputMessage => F [Unit ]): State [F ] =
@@ -67,11 +76,15 @@ object FS2Channel {
6776 }
6877 def removeEndpoint (method : String ): State [F ] =
6978 copy(endpoints = endpoints.removed(method))
79+
80+ def open : State [F ] = copy(isOpen = true )
81+ def close : State [F ] = copy(isOpen = false )
7082 }
7183
7284 private class Impl [F [_]](
7385 private val sink : Payload => F [Unit ],
7486 private val state : Ref [F , FS2Channel .State [F ]],
87+ private val isOpen : SignallingRef [F , Boolean ],
7588 supervisor : Supervisor [F ]
7689 )(implicit F : Concurrent [F ])
7790 extends MessageDispatcher [F ]
@@ -88,6 +101,8 @@ object FS2Channel {
88101
89102 def unmountEndpoint (method : String ): F [Unit ] = state.update(_.removeEndpoint(method))
90103
104+ def open : Resource [F , Unit ] = Resource .make[F , Unit ](isOpen.set(true ))(_ => isOpen.set(false ))
105+
91106 protected def background [A ](fa : F [A ]): F [Unit ] = supervisor.supervise(fa).void
92107 protected def reportError (params : Option [Payload ], error : ProtocolError , method : String ): F [Unit ] = ???
93108 protected def getEndpoint (method : String ): F [Option [Endpoint [F ]]] = state.get.map(_.endpoints.get(method))
0 commit comments