@@ -10,8 +10,8 @@ import scala.concurrent.Promise
1010import java .util .concurrent .atomic .AtomicLong
1111import jsonrpclib .Endpoint .NotificationEndpoint
1212import jsonrpclib .Endpoint .RequestResponseEndpoint
13- import jsonrpclib .EndpointTemplate .NotificationTemplate
14- import jsonrpclib .EndpointTemplate .RequestResponseTemplate
13+ import jsonrpclib .StubTemplate .NotificationTemplate
14+ import jsonrpclib .StubTemplate .RequestResponseTemplate
1515import jsonrpclib .internals .OutputMessage .ErrorMessage
1616import jsonrpclib .internals .OutputMessage .ResponseMessage
1717import scala .util .Try
@@ -20,6 +20,7 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
2020
2121 import F ._
2222
23+ protected def background [A ](fa : F [A ]): F [Unit ]
2324 protected def reportError (params : Option [Payload ], error : ProtocolError , method : String ): F [Unit ]
2425 protected def getEndpoint (method : String ): F [Option [Endpoint [F ]]]
2526 protected def sendMessage (message : Message ): F [Unit ]
@@ -34,7 +35,7 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
3435 sendMessage(message)
3536 }
3637
37- def requestResponseStub [In , Err , Out ](
38+ def stub [In , Err , Out ](
3839 method : String
3940 )(implicit inCodec : Codec [In ], errCodec : ErrorCodec [Err ], outCodec : Codec [Out ]): In => F [Either [Err , Out ]] = {
4041 (input : In ) =>
@@ -43,7 +44,7 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
4344 val message = InputMessage .RequestMessage (method, callId, Some (encoded))
4445 doFlatMap(createPromise[Either [Err , Out ]]()) { case (fulfill, future) =>
4546 val pc = createPendingCall(method, errCodec, outCodec, fulfill)
46- doFlatMap(storePendingCall(callId, pc))(_ => future())
47+ doFlatMap(storePendingCall(callId, pc))(_ => doFlatMap(sendMessage(message))(_ => future() ))
4748 }
4849 }
4950 }
@@ -52,7 +53,7 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
5253 Codec .decode[Message ](Some (payload)).map {
5354 case im : InputMessage =>
5455 doFlatMap(getEndpoint(im.method)) {
55- case Some (ep) => executeInputMessage(im, ep)
56+ case Some (ep) => background( executeInputMessage(im, ep) )
5657 case None =>
5758 im.maybeCallId match {
5859 case None =>
@@ -63,7 +64,11 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
6364 sendProtocolError(callId, error)
6465 }
6566 }
66- case im : OutputMessage => doPure(())
67+ case om : OutputMessage =>
68+ doFlatMap(removePendingCall(om.callId)) {
69+ case Some (pendingCall) => pendingCall(om)
70+ case None => doPure(()) // TODO do something
71+ }
6772 } match {
6873 case Left (error) =>
6974 sendProtocolError(error)
@@ -117,8 +122,8 @@ private[jsonrpclib] abstract class MessageDispatcher[F[_]](implicit F: Monadic[F
117122 message match {
118123 case ErrorMessage (_, errorPayload) =>
119124 errCodec.decode(errorPayload) match {
120- case Left (decodeError) => fulfill(scala.util.Failure (decodeError ))
121- case Right (value) => fulfill(scala.util.Success (Left (value)))
125+ case Left (_) => fulfill(scala.util.Failure (errorPayload ))
126+ case Right (value) => fulfill(scala.util.Success (Left (value)))
122127 }
123128 case ResponseMessage (_, data) =>
124129 outCodec.decode(Some (data)) match {
0 commit comments