Skip to content

Commit f4ec399

Browse files
committed
Remove deps
1 parent c24e8fc commit f4ec399

File tree

3 files changed

+76
-58
lines changed

3 files changed

+76
-58
lines changed

build.sc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ object examples extends mill.define.Module {
8686
}
8787

8888
object client extends ScalaModule {
89-
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}", ivy"eu.monniot::fs2-process:0.4.4")
89+
def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}")
9090
def moduleDeps = Seq(fs2.jvm(versions.scala213))
9191
def scalaVersion = versions.scala213Version
9292
def forkEnv: Target[Map[String, String]] = T {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package examples.client
2+
3+
import fs2.Stream
4+
import cats.effect._
5+
import cats.syntax.all._
6+
import scala.jdk.CollectionConverters._
7+
import java.io.OutputStream
8+
9+
trait ChildProcess[F[_]] {
10+
def stdin: fs2.Pipe[F, Byte, Unit]
11+
def stdout: Stream[F, Byte]
12+
def stderr: Stream[F, Byte]
13+
}
14+
15+
object ChildProcess {
16+
17+
def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] =
18+
Stream.bracket(start[F](command))(_._2).map(_._1)
19+
20+
val readBufferSize = 512
21+
private def start[F[_]: Async](command: Seq[String]) = Async[F].interruptible {
22+
val p =
23+
new java.lang.ProcessBuilder(command.asJava)
24+
.start() // .directory(new java.io.File(wd)).start()
25+
val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit()))
26+
27+
val terminate: F[Unit] = Sync[F].interruptible(p.destroy())
28+
29+
import cats._
30+
val onGlobal = new (F ~> F) {
31+
def apply[A](fa: F[A]): F[A] = Async[F].evalOn(fa, scala.concurrent.ExecutionContext.global)
32+
}
33+
34+
val cp = new ChildProcess[F] {
35+
def stdin: fs2.Pipe[F, Byte, Unit] =
36+
writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream()))
37+
38+
def stdout: fs2.Stream[F, Byte] = fs2.io
39+
.readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize).translate(onGlobal)
40+
41+
def stderr: fs2.Stream[F, Byte] = fs2.io
42+
.readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize).translate(onGlobal)
43+
// Avoids broken pipe - we cut off when the program ends.
44+
// Users can decide what to do with the error logs using the exitCode value
45+
.interruptWhen(done.void.attempt)
46+
}
47+
(cp, terminate)
48+
}
49+
50+
/** Adds a flush after each chunk
51+
*/
52+
def writeOutputStreamFlushingChunks[F[_]](
53+
fos: F[OutputStream],
54+
closeAfterUse: Boolean = true
55+
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] =
56+
s => {
57+
def useOs(os: OutputStream): Stream[F, Nothing] =
58+
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush()))
59+
60+
val os =
61+
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close()))
62+
else Stream.eval(fos)
63+
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush())))
64+
}
65+
66+
}

examples/client/src/examples/client/ClientMain.scala

Lines changed: 9 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@ import cats.syntax.all._
1111
import fs2.Stream
1212
import jsonrpclib.StubTemplate
1313
import cats.effect.std.Dispatcher
14-
import scala.sys.process.ProcessIO
1514
import cats.effect.implicits._
16-
import scala.sys.process.{Process => SProcess}
1715
import java.io.OutputStream
1816
import java.io.InputStream
17+
import examples.client.ChildProcess
1918

2019
object ClientMain extends IOApp.Simple {
2120

@@ -38,69 +37,22 @@ object ClientMain extends IOApp.Simple {
3837
_ <- log("Starting client")
3938
serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist"))
4039
// Starting the server
41-
(serverStdin, serverStdout, serverStderr) <- Stream.resource(process("java", "-jar", serverJar))
42-
pipeErrors = serverStderr.through(fs2.io.stderr)
40+
rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar)
4341
// Creating a channel that will be used to communicate to the server
4442
fs2Channel <- FS2Channel
45-
.lspCompliant[IO](serverStdout, serverStdin, cancelTemplate = cancelEndpoint.some)
46-
.concurrently(pipeErrors)
43+
.lspCompliant[IO](rp.stdout, rp.stdin, cancelTemplate = cancelEndpoint.some)
44+
.concurrently(rp.stderr.through(fs2.io.stderr))
4745
// Opening the stream to be able to send and receive data
4846
_ <- fs2Channel.openStream
4947
// Creating a `IntWrapper => IO[IntWrapper]` stub that can call the server
5048
increment = fs2Channel.simpleStub[IntWrapper, IntWrapper]("increment")
51-
result <- Stream.eval(increment(IntWrapper(0)))
52-
_ <- log(s"Client received $result")
49+
result1 <- Stream.eval(increment(IntWrapper(0)))
50+
_ <- log(s"Client received $result1")
51+
result2 <- Stream.eval(increment(result1))
52+
_ <- log(s"Client received $result2")
5353
_ <- log("Terminating client")
5454
} yield ()
55-
run.compile.drain.timeout(2.second)
55+
run.compile.drain
5656
}
5757

58-
/** Wraps the spawning of a subprocess into fs2 friendly semantics
59-
*/
60-
import scala.concurrent.duration._
61-
def process(command: String*) = for {
62-
dispatcher <- Dispatcher[IO]
63-
stdinPromise <- IO.deferred[fs2.Pipe[IO, Byte, Unit]].toResource
64-
stdoutPromise <- IO.deferred[fs2.Stream[IO, Byte]].toResource
65-
stderrPromise <- IO.deferred[fs2.Stream[IO, Byte]].toResource
66-
makeProcessBuilder = IO(sys.process.stringSeqToProcess(command))
67-
makeProcessIO = IO(
68-
new ProcessIO(
69-
in = { (outputStream: OutputStream) =>
70-
val pipe = writeOutputStreamFlushingChunks(IO(outputStream))
71-
val fulfil = stdinPromise.complete(pipe)
72-
dispatcher.unsafeRunSync(fulfil)
73-
},
74-
out = { (inputStream: InputStream) =>
75-
val stream = fs2.io.readInputStream(IO(inputStream), 512)
76-
val fulfil = stdoutPromise.complete(stream)
77-
dispatcher.unsafeRunSync(fulfil)
78-
},
79-
err = { (inputStream: InputStream) =>
80-
val stream = fs2.io.readInputStream(IO(inputStream), 512)
81-
val fulfil = stderrPromise.complete(stream)
82-
dispatcher.unsafeRunSync(fulfil)
83-
}
84-
)
85-
)
86-
makeProcess = (makeProcessBuilder, makeProcessIO).flatMapN { case (b, io) => IO.blocking(b.run(io)) }
87-
_ <- Resource.make(makeProcess)((runningProcess) => IO.blocking(runningProcess.destroy()))
88-
pipes <- (stdinPromise.get, stdoutPromise.get, stderrPromise.get).tupled.toResource
89-
} yield pipes
90-
91-
/** Adds a flush after each chunk
92-
*/
93-
def writeOutputStreamFlushingChunks[F[_]](
94-
fos: F[OutputStream],
95-
closeAfterUse: Boolean = true
96-
)(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] =
97-
s => {
98-
def useOs(os: OutputStream): Stream[F, Nothing] =
99-
s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush()))
100-
101-
val os =
102-
if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close()))
103-
else Stream.eval(fos)
104-
os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush())))
105-
}
10658
}

0 commit comments

Comments
 (0)