Scala(e) to the large Concurrent programming in Scala and relevant Frameworks Gianluca Aguzzi gianluca.aguzzi@unibo.it Dipartimento di Informatica – Scienza e Ingegneria (DISI) Alma Mater Studiorum – Università di Bologna Talk @ Paradigmi di Progettazione e Sviluppo 03/06/2022 Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 1 / 58
Introduction Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 2 / 58
Introduction Why Scala? Several languages address concurrency by design (e.g. Erlang, Go, ...) Scala is used for several distributed & concurrent computing frameworks Both in research (ScalaLoci ®, ScaFi ®) and in industry (Akka ®, Spark ®, Flink ®) Reasons: 1 Scala syntactic flexibility: enable crafting of embedded domain-specific languages, with Scala serving as a host language, emulating several programming models (e.g., actor) 2 Scala is a safe language: static type safety reduces the amount of possible errors that developers have to care of 3 Interoperability: Scala programs can seamlessly use existing Java libraries, and interact with Java’s rich ecosystem (recently with JS and Native too) Today topics: 1 Recall on low-level concurrency API (JVM model) 2 Future & Promise as platform-independent concurrency mechanisms 3 Functional concurrency management: Monad effects (IO & Task) and (Functional) Reactive streams 4 An application example leveraging Effects & Functional Reactive Programming (FRP) Code repositories: Examples of concurrency in Scala ® FRP game ® “Monadic” GUI ® Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 3 / 58
Concurrency Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 4 / 58
Concurrency Low-level API Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 5 / 58
Concurrency Low-level API JVM concurrency API in Scala I The low-level Scala API is similar to the JVM counterpart Thread Independent computations occurring in the same process Creation & starting of concurrent computation val thread = Thread(() => println("Here") //OOP style class MyThread extends Thread: override def run: Unit = ... thread.start() // side effect, the computation starts Explicit synchornization through join val threadA = ... val threadB = ... threadA.start() threadA.join() // synchornization point, A andThen B threadB.start() Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 6 / 58
Concurrency Low-level API JVM concurrency API in Scala II Monitor Ensure exclusive access to resources Synchronization and communication among users Implemented through synchronized (not a keyword!!), wait, notify and notifyAll Or by using high-level API (java.util.concurrent) trait Counter: def tick(): Unit def value: Int object ThreadSafeCounter: def apply(): Counter = new Counter: private var count = 0 // encapsulated state // synchronized is not a keyword.. // so you cannot write synchronized def tick ... def tick(): Unit = this.synchronized(count += 1) def value = this.synchronized(count) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 7 / 58
Concurrency Low-level API Considerations Typically, Threads (and low-level APIs) are not used directly in Scala Threads computations do not compose well (we will see what we mean with composition) . . . . . . and Scala loves composition (e.g., for-comprehension & monads) Threads are not declarative (“encapsulate” the effect of computations) scala.concurrent exposes several high-level patterns to handle concurrency & synchronization JVM API cannot be used in different platforms (i.e., Scala.js) Use these APIs when: you need to build very high-performant application you create an application that targets only the JVM ecosystem in all the other cases, you should use scala.concurrent API Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 8 / 58
Concurrency Low-level API Execution Context I An ExecutionContext can execute program logic asynchronously, typically but not necessarily on a thread pool First layer of abstraction over Threads used for large number of small concurrent tasks with high throughput Similar to the Executors Java framework, but platform-independent (works bot from JVM, JS, Native) Bring to pass the execution context “implictily” (i.e., using given & using) def sayGreet()(using context: ExecutionContext): Unit = context.execute(() => println("run!")) @main def executeTask: Unit = val context = ExecutionContext.global // global execution context context.execute(() => println("Task done!")) val fromExecutors = ExecutionContext .fromExecutor(Executors.newSingleThreadExecutor()) fromExecutors.execute(() => println("Java wrapper")) //sayGreet() // error given ExecutionContext = fromExecutors // enrich the context sayGreet() Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 9 / 58
Concurrency Low-level API Execution Context II Used as an “execution” context platform for high level concurrent abstraction (e.g., Future) We can built a “context”-oriented DSL for async computation: /* In Scala, a natural way to "enrich" the new "language" consists in using contexts + entrypoints. */ def async(any: => Unit)(using ExecutionContext): Unit = summon[ExecutionContext].execute(() => any) @main def tryDsl(): Unit = // express the context ==> enrich the language given ExecutionContext = ExecutionContext.global println("Do somethings") async { // I can use async like a language API println("order??") } println("After") Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 10 / 58
Concurrency Low-level API Parallell collections example: toward declative parallel computations Scala collections can be transformed into their parallel counterparts by calling the par method (as standard library for Scala < 2.13, an external dependecy for Scala >= 2.13 ®) It exists solely in order to improve the running time of the program Idea: I express a data pipeline computation and then the runtime optimizes it using several cores (0 until 100000).par.map(_.toString).filter(x => x == x.reverse) Can I reach the same declarative approach for a general concurrent program? Yes, with Future Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 11 / 58
Concurrency Asynchronous Programming (scala.concurrent) Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 12 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future Definition “A Future represents a value which may or may not currently be available, but will be available at some point, or as an exception if that value could not be made available.” It is the idiomatic way to express concurrent computations You can handle the result of a Future with callback and transformation methods Future is a monad that models latency as an effect Futures are one-shot: the value contained by a future will be eventually available Futures are eager: the computation starts on Future creation Futures memoize the value: once the computation is over, the data will be always the same Examples: HTTP requests, File writing & reading, TCP connections, ... On notation A future value: trait Future[T] A future computation is an asynchronous computation that produces a future value: def something[T](...)(using ExecutionContext): Future[T] Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 13 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future API I Future Creation // To create a future computation, you have to enrich the context using ExecutionContext = scala.concurrent.ExecutionContext.global Future { 1 } // Future are eager, i.e., the computation starts on creation val fututers = (1 to 10).map(Future.apply) // create sequence of future // standard Monad sequence, applied to Future :) Future.sequential(futures) // Map F[Future[_]] in Future[F[_]] Future Callbacks Future(10) .onComplete { case Success(value) => println(s"Hurray! $value") case Failure(exception) => println("Oh no..") } Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 14 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future API II Future Manipulation // Concurrent program expressed as sequence of map val build = Future(Source.fromFile("build.sbt")) .map(source => source.getLines()) .map(lines => lines.mkString("n")) val scalafmt = Future(Source.fromFile(".scalafmt.conf")) .map(source => source.getLines()) .map(lines => lines.mkString("n")) // NB! scalafmt and build are concurrent here.. // Concurrent Program can be composed val combine = build.flatMap(data => scalafmt.map(other => data + other)) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 15 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future API III For Comprehension def extractLines(source: Source): String = source.getLines().mkString("n") def readFile(name: String): Future[Source] = Future { Source.fromFile(name) } val concurrentManipulation = for { buildSbt <- readFile("build.sbt") // synchornization point, scalafmt has to wait buildSbt future // NB! these two futures are sequential scalafmt <- readFile(".scalafmt.conf") // I can map the "lazy" data inside a Future "for-comprehension".. fileSbt = extractLines(buildSbt) fileFmt = extractLines(scalafmt) } yield (fileFmt + fileSbt) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 16 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future API IV Context management val myContext = ExecutionContext .fromExecutor(Executors.newSingleThreadExecutor()) def createFuture(using ExecutionContext) = Future { println(s"Hey! The thread is: ${Thread.currentThread().getName}") } /* I can pass explicitly the context (i.e., when I want a fine control for execution) */ createFuture(using global) given ExecutionContext = myContext // I can overwrite the execution context createFuture Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 17 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future API V Exception Management val failed = Future.fromTry(Failure(new Exception("fail"))) failed.onComplete { case Failure(exception) => println("error..") } // Special operator that waits for a Future.. Sync point (to avoid) Await.ready(failed, Duration.Inf) // creates a future that completes if the future fails val fail = Future(1).failed fail.foreach(println) Await.ready(fail, Duration.Inf) val empty = Future(1).filter(_ => false) // You can consume the future using foreach (like a Traversable) empty.failed.foreach(println) // failed Await.ready(empty, Duration.Inf) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 18 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future API VI Recover from Failure val fail = Future("hello") .filter(_ == "ciao") // the future fails since "hello" != "ciao" // but I can recover it with another future (composition) .recoverWith(_ => Future("ciao")) ) fail.foreach(println) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 19 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future API VII Blocking & synchronization val blocks = 20 val latch = CountDownLatch(blocks) val all = (1 to blocks).map(i => Future { println(s"Turn $i") blocking { // NB! Does not work for thread pool latch.countDown() latch.await() } } ) Await.ready(Future.sequence(all), Duration.Inf) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 20 / 58
Concurrency Asynchronous Programming (scala.concurrent) Promise A Factory of Future A Promise can be thought of as a writable, single-assignment container, which completes a future Single assignment means that the Promise can be completed once (both with a value or with an exception) Use promises to bridge the gap between callback-based APIs and futures Use promises to extend futures with additional functional combinators Promise API: in a nutshell val promise = Promise[Int]() // Promise created.. promise.future.foreach(println) // "interface" to read the value promise.success(10) // Everything ok!! try promise.success(10) // Exception!! catch case ex: IllegalStateException => println("Already completed!!") Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 21 / 58
Concurrency Asynchronous Programming (scala.concurrent) Future Consideration about FP Futures are eager ú we cannot express lazy computation Future { println("Start!!") } // the computation starts on creation Futures are memoized ú we cannot restart a computation that brings to a certain value val result = Future { math.random } result.foreach { println(_) } result.foreach { pritnln(_) } // same result! Futures are not referential transparent ú we cannot build pure FP programs val random = Random(0) def future(random: Random): Future[Int] = Future { random.nextInt(10) } for a <- future(random); b <- future(random) yield (a + b) // !== val futureRandom = future(Random(0)) for a <- futureRandom; b <- futureRandom yield (a + b) Futures combine the execution and the computation Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 22 / 58
Concurrency Task (IO) Monad Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 23 / 58
Concurrency Task (IO) Monad Effects (in FP) An effect is a description of an action (or actions) that will be taken when evaluation happens In Category Theory term, each Monad has an effect Option is a monad that models the effect of optionality (of being something optional) Future is a monad that models the impact of latency IO (in Cats ® term, Task in Monix ®) is a monad that models the effect of computation Therefore, effects 6= side-effects when a piece of code contains (express with effect) a side-effect, that action just happens IO (and Task, we will use this name hereafter) can be executed asynchronously, in parallel, in different threads, ... NB! When side-effecting code is wrapped in one Task... ... the code itself still contains side-effects but we can reason about the whole thing as an effect, rather than as a side-effect // this has a side-effect, // but I can reason about this computation as Task[String] Task { Source.fromFile(...) } NB! IO is more general then Task, but today we consider these abstractions as the same Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 24 / 58
Concurrency Task (IO) Monad Task ® Definition Task represents a specification for a possibly lazy or asynchronous computation, which when executed will produce a data as a result, along with possible side effects. Does not trigger the execution, or any effects Task does not represent a running computation or a value detached from time (like Future) Allows for cancelling of a running computation Tasks can be memoized (i.e., more control about the execution) It models a producer pushing only one value to one or multiple consumers Tasks can express parallel computations It is referential transparent for most of the API, it contains unpure method marked with @UnsafeBecauseImpure they should be used in the “end-of-the-world” (i.e., the main method) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 25 / 58
Concurrency Task (IO) Monad Task API I Task Creation val now = Task.now { Math.random() } // it evaluates the value asap and wraps it inside the task //Task.apply =:= Task.eval // lazy.. the evaluation happens when the task is created val standard = Task(Math.random()) // Similar to a lazy val, the value is evaluated // once and then memoized (like Future) val once = Task.evalOnce { Math.random() } val future = Future(Math.random()) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 26 / 58
Concurrency Task (IO) Monad Task API II Task Manipulation val file = Task(Source.fromFile("build.sbt")) def linesFromSource(source: Source): Task[List[String]] = Task(source.getLines().toList).map(_.take(5)) // Sequence evaluation val result = for { build <- file lines <- linesFromSource(build) } yield lines.mkString("n") Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 27 / 58
Concurrency Task (IO) Monad Task API III Task Parallel Execution given Scheduler = monix.execution.Scheduler.Implicits.global // Combine tasks to express parallel evaluation // (it needs the Scheduler) for _ <- Task.parZip3(wait, wait, wait) do println("Parallel.") Task .sequence(wait :: wait :: wait :: Nil) .foreach(_ => println("sequential...")) Task .parSequence(wait :: wait :: wait :: Nil) .foreach(_ => println("parallel..")) Task Memoization // This data expresses a computation that produces a random value val random = Task.eval(Math.random()) // This data expresses a computation that produces the same random value val memo = random.memoize Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 28 / 58
Concurrency Task (IO) Monad Task API IV Task synchornization val semaphore = Semaphore[Task](1) // like a Lock // Just an example, indeed you should not use vars... // Consider shared resources (e.g. a file)... var shared = 0 def effect: Task[Unit] = Task { shared += 1 println(Thread.currentThread().getName) } val syncComputation = for { synch <- semaphore tasks = (1 to 1000).map(_ => synch.withPermit(effect)) par <- Task.parSequence(tasks) } yield par syncComputation.runSyncUnsafe() assert(shared == 1000) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 29 / 58
Concurrency Task (IO) Monad Task API V Task error management // Tasks can fail... val fail = Task.raiseError(new IllegalStateException("...")) // I can handle error like Future // (but with more flexibility, e.g., restarting the computation) val recovered = fail.onErrorHandle(_ => 10) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 30 / 58
Concurrency Task (IO) Monad Task API VI An Entire application expressed with Task enum State: case Continue, End val input = Task.eval(StdIn.readLine()) val hello = Task.evalOnce(println("Hello!! welcome to this beautiful game!!")) lazy val parse: Task[Int] = input .map(_.toInt) .onErrorHandleWith(_ => Task(println("Insert a number!!")) .flatMap(_ => parse) ) val toGuess = Task.evalOnce(Random.nextInt(10)) val toHigh = Task(println("The number is wrong!! (high)")).map(_ => State.Continue) val toLow = Task(println("The number is wrong!! (low)")).map(_ => State.Continue) val correct = Task(println("You won!!")).map(_ => State.End) // The main, a typical way to express IO in functional program, through flatmap operations.. val game = for { _ <- hello number <- toGuess user <- parse state <- if (user < number) toLow else if (user > number) toHigh else correct } yield state Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 31 / 58
Concurrency Observable (Functional Reactive Programming) Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 32 / 58
Concurrency Observable (Functional Reactive Programming) Observable ® Definition The Observable is a data type for modeling and processing asynchronous and reactive streaming of (possibly infinite) events with non-blocking back-pressure. At its simplest, an Observable is a replacement for Iterable or Scala LazyList, but with the ability to process asynchronous events without blocking Enables Functional Reactive Programming (FRP) Basically the Observer pattern on steroids async events management (decoupling of control flow) backpressure error management Expresses lazy computations Inspired by ReacitveX ® Observable vs. Task Single Multiple Synchronous A Iterable[A] Asynchronous Future[A] / Task[A] Observable[A] Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 33 / 58
Concurrency Observable (Functional Reactive Programming) Observable API I Observable Creation // print hey! because the value is evaluated asap val single = Observable.pure { println("hey!"); 10 } // hey is printed each time the observable is consumed val delay = Observable.delay { println("hey"); 10 } // hey is printed once, then the value is memoized val lazyObservable = Observable.evalOnce { println("hey"); 10 } val sideEffects = Observable.suspend { val effect = Source.fromFile("build.sbt") Observable.fromIterator(Task(effect.getLines())) } Thread.sleep(500) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 34 / 58
Concurrency Observable (Functional Reactive Programming) Observable API II Observable Composition (Sequential) // I can express infinite computation... val stepper = Observable.fromIterable(LazyList.continually(10)) .delayOnNext(500 milliseconds) val greeter = Observable.repeatEval("Hello!!").delayOnNext(100 milliseconds) val combined = for { // I can combine observable like list, option,.... number <- stepper greet <- greeter } yield (number, greet) combined.foreach(println(_)) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 35 / 58
Concurrency Observable (Functional Reactive Programming) Observable API III Observable Facade from other libraries val button = JButton("Click me!!") // Using create api val buttonObservable = Observable.create[Long](OverflowStrategy.Unbounded) { subject => button.addActionListener((e: ActionEvent) => subject.onNext(e.getWhen)) Cancelable.empty } // Or through subjects (i.e., tuple of observer and observable) val subject = ConcurrentSubject[Long](MulticastStrategy.replay) button.addActionListener((e: ActionEvent) => subject.onNext(e.getWhen)) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 36 / 58
Concurrency Observable (Functional Reactive Programming) Observable API IV Observables Backpressure val obs = Observable("A", "B", "C", "D", "E", "F", "G") val syncObs = obs // By default, the obs manipulation are sequential .mapEval(elem => Task { println(s"Processing (1) : $elem"); elem + elem }) // change the observable evaluation to another execution logic .asyncBoundary(OverflowStrategy.Unbounded) .mapEval(elem => Task { println(s"Processing (2) : $elem"); elem } .delayExecution(200 milliseconds)) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 37 / 58
Concurrency Observable (Functional Reactive Programming) Observable API V Observables Parallell (similar to par) val element = Observable.fromIterable((0 to 10)) val parallelMap = element.mapParallelOrdered(8)(elem => Task(elem * 5) .tapEval(id => Task(println(id + " " + Thread.currentThread().getName))) ) val parallelUnorderedMap = element.mapParallelUnordered(8)(elem => Task(elem * 5) .tapEval(id => Task(println(id + " " + Thread.currentThread().getName))) ) val mergeParallel = element.delayOnNext(100 milliseconds) .mergeMap(_ => Observable(1) .delayOnNext(100 milliseconds)) The Observable API is very Rich Scala Doc ® Documentation ® Code example repository (about buffering, error handling, rate emission) ® Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 38 / 58
Concurrency Observable (Functional Reactive Programming) FRP Application: A tentative design ® I The model is described using Algebraic Data Type and functional manipulation Rendering is wrapped on a Task (it expresses a side-effect) The events (e.g., input from View, time evolution) are expressed as Observable (i.e., event sources) The Model data evolves following the event Observable (creating an Observable of models) On top of Model Observable, the rendering Task is triggered (e.g. for each new Model data) Potential benefits clean model as an immutable data (the manipulation is typically performed with lens® ) model evolution expressed as a composition between Tasks simplifies the interaction between View and Model (i.e., the Controller) side effects happen only at the end of the world Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 39 / 58
Concurrency Observable (Functional Reactive Programming) FRP Application: A tentative design ® II UI (defines Input and Output boundary interface) // a general UI: trait UI[Event, Data] ==> it contains side effects!! //(but they are wrapper with monadic abstractions) // even more general: UI[Event, Data, F[_], R[_]] trait UI: // Event source produced by this UI // (outside of the "functional" world, unsafe, unpure) def events: Observable[Event] // Render the world representation. // It is an effect, therefore it returns a Task def render(world: World): Task[Unit] Model: set of data that expresses a snapshot of the "world" enum Entity(...): case Player(...), case class World(...) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 40 / 58
Concurrency Observable (Functional Reactive Programming) FRP Application: A tentative design ® III Update: namely how world should change // This should be expressed as a function, // (event, world) => Task[(Ager, Control)] // Or Controller[E, M, F[_]] => (E, M) => F[(M, Control[E, M, F])] trait Update extends ((Event, World) => Task[(World, Update)]): def andThen(control: Update): Update = ... // We cannot use type because it became a cyclic reference object Update: extension (function: (Event, World) => (World, Update)) def lift: Update = .. // Creation API def same(function: (Event, World) => World): Update = ... def on[E <: Event]( control: (E, World) => Task[World] )(using ev: ClassTag[E]): Update = ... def combineTwo(engineA: Update, engineB: Update): Update = ... def combine(engines: Update*): Update = ... Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 41 / 58
Concurrency Observable (Functional Reactive Programming) FRP Application: A tentative design ® IV Main Loop def start(ui: UI): Task[Unit] = val time: Observable[Event] = TimeFlow // time progression .tickEach(33 milliseconds).map(_.toDouble).map(Event.TimePassed.apply) val world = World.empty // Initial condition of the game val controls: Update = ... // set of controls that update the world state // Initial "loop" condition, update and world can evolve with events val init = Task((world, controls)) val events = Observable(time, ui.events.throttleLast(33 milliseconds)).merge events // "Main" loop // World evolve following the event .scanEval(init) { case ((world, logic), event) => logic(event, world) } // For each frame, a rendering is requested .doOnNext { case (world, _) => ui.render(world) } .completedL // Compute until the game is over Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 42 / 58
Concurrency Observable (Functional Reactive Programming) In Summary Use Task to express any potential computation Use Future to run a computation expressed through Task Use Observable & Subject to express computation with multiple values (e.g. event sources) In this way: your application becomes “reactive” side effects happen only at the “end-of-the-world” you can reason about effects, avoiding to care about side effects error handling cames for “free" your application works seamlessly in JVM & JS world Final remark “The IO monad does not make a function pure. It just makes it obvious that it’s impure” Martin Odersky Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 43 / 58
Frameworks Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 44 / 58
Frameworks Scala: Best choice to create complex & distributed Framework DSL hides complexity giving high-level API Contextual information (i.e. to use API) is passed through implicit/given giving the impression of “enrich” the language Scala’s strong type system enforces compile-time correctness Programmers focus on the intent (declarative approach) Future / Observable / Task example: intent: express possibly concurrent computations; DSL: monadic manipulation, how the data should be combined & transformed context: ExecutionContext, where the computation will be executed In the next slides: two relevant examples of industry-proven scala framework (Akka & Spark) two examples of reasearch oriented libraries & framework (ScaFi & ScalaLoci) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 45 / 58
Frameworks Industry-ready Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 46 / 58
Frameworks Industry-ready The Akka actor toolkit Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala It is the implementation of the Actor model on the JVM It extends the basic Actor model with more convenient & advanced features Akka supports and integrates into an ecosystem of tools for distributed systems Play (web framework), Spray (REST), Lagom (microservices), Apache Flink (stream/batch processing), Gatling (load-testing)... actors as a basic building block Production-proved: https://www.lightbend.com/case-studies Website: https://akka.io/ Docs: https://akka.io/docs/ Akka APIs and DSLs ® Akka provides APIs for developing actor-based systems with Java and Scala DSLs Akka Typed: new, type-safe API Akka Classic: original API (still fully supported) These may coexist in a single application Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 47 / 58
Frameworks Industry-ready Spark ® Apache Spark is a unified analytics engine for large-scale data processing. it provides high-level APIs in Java, Scala, Python Dataset as a primary abstraction that consists of a distributed collection of items The framework for big data management. Data processing executed on cluster it also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 48 / 58
Frameworks Industry-ready Spark – Example ® // Context lazy implicit val spark = SparkSession.builder() .master("local") .appName("spark_test") .getOrCreate() import spark.implicits._ // Required to call the .toDF function later val html = scala.io.Source .fromURL("http://files.grouplens.org/datasets/movielens/ml-100k/u.data") .mkString // Get all rows as one string val seqOfRecords = ... // Give whatever column names you want val df = seqOfRecords.toDF("userID", "movieID", "ratings", "timestamp") // Data manipulation (I do not care where this will be executre) df .select("ratings") // select the "ratings row" .groupBy("ratings") // group by on ratings .count // count the total number of row for each rating .sort(col("count").desc) // sort the ratings using the count .show() Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 49 / 58
Frameworks Research-Oriented Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 50 / 58
Frameworks Research-Oriented ScaFi (Scala Fields) ScaFi (Scala Fields) is a Scala-based library and framework for Aggregate Programming. A macro-programming approach to program large-scale distriubted system, formally grounded in Field Calculus developed by our research group (mainly by prof. Viroli & dott. Casadei, with contributions of several students ®) Goal: programming the collective behaviour of aggregates of devices Provides a DSL and API for writing, testing, and running aggregate programs ScaFi-Web: A web playground based on ScaFi ® ScaFI is a multi-module and cross platform scala project: scafi-commons | provides basic abstractions and utilities (e.g., spatial and temporal abstractions); scafi-core | represents the core of the project and provides an aggregate programming DSL (syntax, semantics, and a virtual machine for evaluation of programs), together with a “standard library” of reusable functions; scafi-simulator: provides a basic support for simulating aggregate systems; scafi-simulator-gui | provides a GUI for visualising and interacting with simulations of aggregate systems; spala (“spatial Scala”—i.e., a general Aggregate Computing platform | provides an actor-based aggregate computing middleware; scafi-distributed | ScaFi integration-layer for spala, which can be leveraged to set up actor-based deployments of ScaFi-programmed systems. Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 51 / 58
Frameworks Research-Oriented ScaFi (Scala Fields) – Architecture SCAFI-CORE AC PLATFORM SCAFI-TESTS AKKA-CORE AKKA-REMOTING SCAFI-SIMULATOR SCAFI-SIMULATOR-GUI SCAFI-STDLIB SCAFI-DISTRIBUTED SCAFI-COMMONS (space-time abstractions) DEMOS depends on Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 52 / 58
Frameworks Research-Oriented ScaFi (Scala Fields) – Usage example ® I Distributed sensing // Incarnation: the context of an aggregate program import it.unibo.scafi.incarnations.BasicSimulationIncarnation.* class DistributedTemperatureSensing extends AggregateProgram // Libraries, expressed as mix-in with BlockG with BlockC with BlockS with BlocksWithGC with StandardSensors { // "entry point", kind of "end-of-the-world" of effects override def main: Double = { // area size in which I want sense temperature val area = 50 // center of that area val leader = S(area, nbrRange) // used to send information to leader val potential = distanceTo(leader) // average temperature value val areaTemperature = average(leader, sense("temperature")) // share the temperature to devices broadcast(leader, areaTemperature) } } Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 53 / 58
Frameworks Research-Oriented ScaFi (Scala Fields) – Usage example ® II Self-healing channel class Channel extends AggregateProgram with StandardSensors with BlockG { override def main() = { // zone of the space that we want to link to target def source : Boolean = sense("source") // the target of the channel def target : Boolean = sense("target") def channel(source: Boolean, target: Boolean, width: Double): Boolean = { // All nodes that are inside the channel distanceTo(source) + distanceTo(target) <= distanceBetween(source, target) + width } val channelWidth = 1 channel(source, target, channelWidth) } } Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 54 / 58
Frameworks Research-Oriented ScaFi (Scala Fields) – Usage example ® III Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 55 / 58
Frameworks Research-Oriented ScalaLoci ScalaLoci is a distributed programming language embedded into Scala. based on typed-based multi-tier programming used to describe deployments & interaction among components simplifies developing distributed systems reduces error-prone communication code favors early detection of bugs Data are placed through distributed data flows, Support multiple software architectures (Client Server, P2P, ...) abstracting over low-level communication details and data conversions Based on Scala macro (<= 2.13) Support several communication protocl (TCP, WebSocket, ...) Multi-module & cross-platform Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 56 / 58
Frameworks Research-Oriented ScalaLoci – Example import loci._ // Contexts.. import loci.serializer.upickle._ // Serialization context import loci.communicator.tcp._ // Communication context import rescala.default._ // Observable (reactive stream) cotenxt @multitier object Chat { // a deployment definition @peer type Server <: { type Tie <: Multiple[Client] } // peers and connections @peer type Client <: { type Tie <: Single[Server] } // Message are placed on Deployments val message : Evt[String] on Client = on[Client] { Evt[String] } // To access to remote data, I must have a Tie val publicMessage = on[Server] sbj { client: Remote[Client] => message.asLocalFromAllSeq collect { case (remote, message) if remote == client => message } } def main() = on[Client] { // Main client logic publicMessage.asLocal observe println for (line <- scala.io.Source.stdin.getLines) message.fire(line) } } // Elsewhere... object Server extends App { multitier start new Instance[Chat.Server](listen[Chat.Client] { TCP(43053) }) } object Client extends App { multitier start new Instance[Chat.Client](connect[Chat.Server] { TCP("localhost", 43053) }) } Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 57 / 58
Frameworks Research-Oriented Scala(e) to the large Concurrent programming in Scala and relevant Frameworks Gianluca Aguzzi gianluca.aguzzi@unibo.it Dipartimento di Informatica – Scienza e Ingegneria (DISI) Alma Mater Studiorum – Università di Bologna Talk @ Paradigmi di Progettazione e Sviluppo 03/06/2022 Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 58 / 58
References References I [1] Alvin Alexander. Functional programming, simplified : simple, step-by-step approach to learning functional programming. Boulder, Colo: Alvin Alexander, 2017. isbn: 1979788782. [2] Stephen Blackheath. Functional reactive programming. Shelter Island, NY: Manning Publications Co, 2016. isbn: 9781633430105. [3] Cats Effect · The pure asynchronous runtime for Scala. https://typelevel.org/cats-effect/. (Accessed on 06/03/2022). [4] cats effect documentation - Cerca con Google. https: //www.google.com/search?q=cats+effect+documentation&oq=cats+effect+ documentation&aqs=chrome..69i57.3488j0j4&sourceid=chrome&ie=UTF-8. (Accessed on 06/03/2022). [5] Documentation | Akka. https://akka.io/docs/. (Accessed on 06/03/2022). [6] HaskellWiki. Functional programming — HaskellWiki. [Online; accessed 31-May-2021]. 2020. url: https://wiki.haskell.org/index.php?title= Functional_programming&oldid=63198. [7] Bartosz Milewski. Category theory for programmers. Bartosz Milewski, 2019. isbn: 9780464243878. Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022
References References II [8] Monix Documentation. https://monix.io/docs/current/. (Accessed on 06/03/2022). [9] Overview - Spark 3.2.1 Documentation. https://spark.apache.org/docs/latest/. (Accessed on 06/03/2022). [10] ScaFi Aggregate Programming Toolkit - Home. https://scafi.github.io/. (Accessed on 06/03/2022). [11] ScalaLoci – A Programming Language for Distributed Applications. https://scala-loci.github.io/. (Accessed on 06/03/2022). [12] Gurnell Welsh. Scala with Cats. Bartosz Milewski, 2017. Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022

Scala(e) to the large. Concurrent programming in Scala and relevant Frameworks

  • 1.
    Scala(e) to thelarge Concurrent programming in Scala and relevant Frameworks Gianluca Aguzzi gianluca.aguzzi@unibo.it Dipartimento di Informatica – Scienza e Ingegneria (DISI) Alma Mater Studiorum – Università di Bologna Talk @ Paradigmi di Progettazione e Sviluppo 03/06/2022 Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 1 / 58
  • 2.
    Introduction Contents 1 Introduction 2 Concurrency Low-levelAPI Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 2 / 58
  • 3.
    Introduction Why Scala? Several languagesaddress concurrency by design (e.g. Erlang, Go, ...) Scala is used for several distributed & concurrent computing frameworks Both in research (ScalaLoci ®, ScaFi ®) and in industry (Akka ®, Spark ®, Flink ®) Reasons: 1 Scala syntactic flexibility: enable crafting of embedded domain-specific languages, with Scala serving as a host language, emulating several programming models (e.g., actor) 2 Scala is a safe language: static type safety reduces the amount of possible errors that developers have to care of 3 Interoperability: Scala programs can seamlessly use existing Java libraries, and interact with Java’s rich ecosystem (recently with JS and Native too) Today topics: 1 Recall on low-level concurrency API (JVM model) 2 Future & Promise as platform-independent concurrency mechanisms 3 Functional concurrency management: Monad effects (IO & Task) and (Functional) Reactive streams 4 An application example leveraging Effects & Functional Reactive Programming (FRP) Code repositories: Examples of concurrency in Scala ® FRP game ® “Monadic” GUI ® Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 3 / 58
  • 4.
    Concurrency Contents 1 Introduction 2 Concurrency Low-levelAPI Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 4 / 58
  • 5.
    Concurrency Low-level API Contents 1Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 5 / 58
  • 6.
    Concurrency Low-level API JVMconcurrency API in Scala I The low-level Scala API is similar to the JVM counterpart Thread Independent computations occurring in the same process Creation & starting of concurrent computation val thread = Thread(() => println("Here") //OOP style class MyThread extends Thread: override def run: Unit = ... thread.start() // side effect, the computation starts Explicit synchornization through join val threadA = ... val threadB = ... threadA.start() threadA.join() // synchornization point, A andThen B threadB.start() Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 6 / 58
  • 7.
    Concurrency Low-level API JVMconcurrency API in Scala II Monitor Ensure exclusive access to resources Synchronization and communication among users Implemented through synchronized (not a keyword!!), wait, notify and notifyAll Or by using high-level API (java.util.concurrent) trait Counter: def tick(): Unit def value: Int object ThreadSafeCounter: def apply(): Counter = new Counter: private var count = 0 // encapsulated state // synchronized is not a keyword.. // so you cannot write synchronized def tick ... def tick(): Unit = this.synchronized(count += 1) def value = this.synchronized(count) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 7 / 58
  • 8.
    Concurrency Low-level API Considerations Typically,Threads (and low-level APIs) are not used directly in Scala Threads computations do not compose well (we will see what we mean with composition) . . . . . . and Scala loves composition (e.g., for-comprehension & monads) Threads are not declarative (“encapsulate” the effect of computations) scala.concurrent exposes several high-level patterns to handle concurrency & synchronization JVM API cannot be used in different platforms (i.e., Scala.js) Use these APIs when: you need to build very high-performant application you create an application that targets only the JVM ecosystem in all the other cases, you should use scala.concurrent API Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 8 / 58
  • 9.
    Concurrency Low-level API ExecutionContext I An ExecutionContext can execute program logic asynchronously, typically but not necessarily on a thread pool First layer of abstraction over Threads used for large number of small concurrent tasks with high throughput Similar to the Executors Java framework, but platform-independent (works bot from JVM, JS, Native) Bring to pass the execution context “implictily” (i.e., using given & using) def sayGreet()(using context: ExecutionContext): Unit = context.execute(() => println("run!")) @main def executeTask: Unit = val context = ExecutionContext.global // global execution context context.execute(() => println("Task done!")) val fromExecutors = ExecutionContext .fromExecutor(Executors.newSingleThreadExecutor()) fromExecutors.execute(() => println("Java wrapper")) //sayGreet() // error given ExecutionContext = fromExecutors // enrich the context sayGreet() Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 9 / 58
  • 10.
    Concurrency Low-level API ExecutionContext II Used as an “execution” context platform for high level concurrent abstraction (e.g., Future) We can built a “context”-oriented DSL for async computation: /* In Scala, a natural way to "enrich" the new "language" consists in using contexts + entrypoints. */ def async(any: => Unit)(using ExecutionContext): Unit = summon[ExecutionContext].execute(() => any) @main def tryDsl(): Unit = // express the context ==> enrich the language given ExecutionContext = ExecutionContext.global println("Do somethings") async { // I can use async like a language API println("order??") } println("After") Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 10 / 58
  • 11.
    Concurrency Low-level API Parallellcollections example: toward declative parallel computations Scala collections can be transformed into their parallel counterparts by calling the par method (as standard library for Scala < 2.13, an external dependecy for Scala >= 2.13 ®) It exists solely in order to improve the running time of the program Idea: I express a data pipeline computation and then the runtime optimizes it using several cores (0 until 100000).par.map(_.toString).filter(x => x == x.reverse) Can I reach the same declarative approach for a general concurrent program? Yes, with Future Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 11 / 58
  • 12.
    Concurrency Asynchronous Programming(scala.concurrent) Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 12 / 58
  • 13.
    Concurrency Asynchronous Programming(scala.concurrent) Future Definition “A Future represents a value which may or may not currently be available, but will be available at some point, or as an exception if that value could not be made available.” It is the idiomatic way to express concurrent computations You can handle the result of a Future with callback and transformation methods Future is a monad that models latency as an effect Futures are one-shot: the value contained by a future will be eventually available Futures are eager: the computation starts on Future creation Futures memoize the value: once the computation is over, the data will be always the same Examples: HTTP requests, File writing & reading, TCP connections, ... On notation A future value: trait Future[T] A future computation is an asynchronous computation that produces a future value: def something[T](...)(using ExecutionContext): Future[T] Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 13 / 58
  • 14.
    Concurrency Asynchronous Programming(scala.concurrent) Future API I Future Creation // To create a future computation, you have to enrich the context using ExecutionContext = scala.concurrent.ExecutionContext.global Future { 1 } // Future are eager, i.e., the computation starts on creation val fututers = (1 to 10).map(Future.apply) // create sequence of future // standard Monad sequence, applied to Future :) Future.sequential(futures) // Map F[Future[_]] in Future[F[_]] Future Callbacks Future(10) .onComplete { case Success(value) => println(s"Hurray! $value") case Failure(exception) => println("Oh no..") } Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 14 / 58
  • 15.
    Concurrency Asynchronous Programming(scala.concurrent) Future API II Future Manipulation // Concurrent program expressed as sequence of map val build = Future(Source.fromFile("build.sbt")) .map(source => source.getLines()) .map(lines => lines.mkString("n")) val scalafmt = Future(Source.fromFile(".scalafmt.conf")) .map(source => source.getLines()) .map(lines => lines.mkString("n")) // NB! scalafmt and build are concurrent here.. // Concurrent Program can be composed val combine = build.flatMap(data => scalafmt.map(other => data + other)) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 15 / 58
  • 16.
    Concurrency Asynchronous Programming(scala.concurrent) Future API III For Comprehension def extractLines(source: Source): String = source.getLines().mkString("n") def readFile(name: String): Future[Source] = Future { Source.fromFile(name) } val concurrentManipulation = for { buildSbt <- readFile("build.sbt") // synchornization point, scalafmt has to wait buildSbt future // NB! these two futures are sequential scalafmt <- readFile(".scalafmt.conf") // I can map the "lazy" data inside a Future "for-comprehension".. fileSbt = extractLines(buildSbt) fileFmt = extractLines(scalafmt) } yield (fileFmt + fileSbt) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 16 / 58
  • 17.
    Concurrency Asynchronous Programming(scala.concurrent) Future API IV Context management val myContext = ExecutionContext .fromExecutor(Executors.newSingleThreadExecutor()) def createFuture(using ExecutionContext) = Future { println(s"Hey! The thread is: ${Thread.currentThread().getName}") } /* I can pass explicitly the context (i.e., when I want a fine control for execution) */ createFuture(using global) given ExecutionContext = myContext // I can overwrite the execution context createFuture Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 17 / 58
  • 18.
    Concurrency Asynchronous Programming(scala.concurrent) Future API V Exception Management val failed = Future.fromTry(Failure(new Exception("fail"))) failed.onComplete { case Failure(exception) => println("error..") } // Special operator that waits for a Future.. Sync point (to avoid) Await.ready(failed, Duration.Inf) // creates a future that completes if the future fails val fail = Future(1).failed fail.foreach(println) Await.ready(fail, Duration.Inf) val empty = Future(1).filter(_ => false) // You can consume the future using foreach (like a Traversable) empty.failed.foreach(println) // failed Await.ready(empty, Duration.Inf) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 18 / 58
  • 19.
    Concurrency Asynchronous Programming(scala.concurrent) Future API VI Recover from Failure val fail = Future("hello") .filter(_ == "ciao") // the future fails since "hello" != "ciao" // but I can recover it with another future (composition) .recoverWith(_ => Future("ciao")) ) fail.foreach(println) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 19 / 58
  • 20.
    Concurrency Asynchronous Programming(scala.concurrent) Future API VII Blocking & synchronization val blocks = 20 val latch = CountDownLatch(blocks) val all = (1 to blocks).map(i => Future { println(s"Turn $i") blocking { // NB! Does not work for thread pool latch.countDown() latch.await() } } ) Await.ready(Future.sequence(all), Duration.Inf) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 20 / 58
  • 21.
    Concurrency Asynchronous Programming(scala.concurrent) Promise A Factory of Future A Promise can be thought of as a writable, single-assignment container, which completes a future Single assignment means that the Promise can be completed once (both with a value or with an exception) Use promises to bridge the gap between callback-based APIs and futures Use promises to extend futures with additional functional combinators Promise API: in a nutshell val promise = Promise[Int]() // Promise created.. promise.future.foreach(println) // "interface" to read the value promise.success(10) // Everything ok!! try promise.success(10) // Exception!! catch case ex: IllegalStateException => println("Already completed!!") Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 21 / 58
  • 22.
    Concurrency Asynchronous Programming(scala.concurrent) Future Consideration about FP Futures are eager ú we cannot express lazy computation Future { println("Start!!") } // the computation starts on creation Futures are memoized ú we cannot restart a computation that brings to a certain value val result = Future { math.random } result.foreach { println(_) } result.foreach { pritnln(_) } // same result! Futures are not referential transparent ú we cannot build pure FP programs val random = Random(0) def future(random: Random): Future[Int] = Future { random.nextInt(10) } for a <- future(random); b <- future(random) yield (a + b) // !== val futureRandom = future(Random(0)) for a <- futureRandom; b <- futureRandom yield (a + b) Futures combine the execution and the computation Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 22 / 58
  • 23.
    Concurrency Task (IO)Monad Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 23 / 58
  • 24.
    Concurrency Task (IO)Monad Effects (in FP) An effect is a description of an action (or actions) that will be taken when evaluation happens In Category Theory term, each Monad has an effect Option is a monad that models the effect of optionality (of being something optional) Future is a monad that models the impact of latency IO (in Cats ® term, Task in Monix ®) is a monad that models the effect of computation Therefore, effects 6= side-effects when a piece of code contains (express with effect) a side-effect, that action just happens IO (and Task, we will use this name hereafter) can be executed asynchronously, in parallel, in different threads, ... NB! When side-effecting code is wrapped in one Task... ... the code itself still contains side-effects but we can reason about the whole thing as an effect, rather than as a side-effect // this has a side-effect, // but I can reason about this computation as Task[String] Task { Source.fromFile(...) } NB! IO is more general then Task, but today we consider these abstractions as the same Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 24 / 58
  • 25.
    Concurrency Task (IO)Monad Task ® Definition Task represents a specification for a possibly lazy or asynchronous computation, which when executed will produce a data as a result, along with possible side effects. Does not trigger the execution, or any effects Task does not represent a running computation or a value detached from time (like Future) Allows for cancelling of a running computation Tasks can be memoized (i.e., more control about the execution) It models a producer pushing only one value to one or multiple consumers Tasks can express parallel computations It is referential transparent for most of the API, it contains unpure method marked with @UnsafeBecauseImpure they should be used in the “end-of-the-world” (i.e., the main method) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 25 / 58
  • 26.
    Concurrency Task (IO)Monad Task API I Task Creation val now = Task.now { Math.random() } // it evaluates the value asap and wraps it inside the task //Task.apply =:= Task.eval // lazy.. the evaluation happens when the task is created val standard = Task(Math.random()) // Similar to a lazy val, the value is evaluated // once and then memoized (like Future) val once = Task.evalOnce { Math.random() } val future = Future(Math.random()) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 26 / 58
  • 27.
    Concurrency Task (IO)Monad Task API II Task Manipulation val file = Task(Source.fromFile("build.sbt")) def linesFromSource(source: Source): Task[List[String]] = Task(source.getLines().toList).map(_.take(5)) // Sequence evaluation val result = for { build <- file lines <- linesFromSource(build) } yield lines.mkString("n") Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 27 / 58
  • 28.
    Concurrency Task (IO)Monad Task API III Task Parallel Execution given Scheduler = monix.execution.Scheduler.Implicits.global // Combine tasks to express parallel evaluation // (it needs the Scheduler) for _ <- Task.parZip3(wait, wait, wait) do println("Parallel.") Task .sequence(wait :: wait :: wait :: Nil) .foreach(_ => println("sequential...")) Task .parSequence(wait :: wait :: wait :: Nil) .foreach(_ => println("parallel..")) Task Memoization // This data expresses a computation that produces a random value val random = Task.eval(Math.random()) // This data expresses a computation that produces the same random value val memo = random.memoize Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 28 / 58
  • 29.
    Concurrency Task (IO)Monad Task API IV Task synchornization val semaphore = Semaphore[Task](1) // like a Lock // Just an example, indeed you should not use vars... // Consider shared resources (e.g. a file)... var shared = 0 def effect: Task[Unit] = Task { shared += 1 println(Thread.currentThread().getName) } val syncComputation = for { synch <- semaphore tasks = (1 to 1000).map(_ => synch.withPermit(effect)) par <- Task.parSequence(tasks) } yield par syncComputation.runSyncUnsafe() assert(shared == 1000) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 29 / 58
  • 30.
    Concurrency Task (IO)Monad Task API V Task error management // Tasks can fail... val fail = Task.raiseError(new IllegalStateException("...")) // I can handle error like Future // (but with more flexibility, e.g., restarting the computation) val recovered = fail.onErrorHandle(_ => 10) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 30 / 58
  • 31.
    Concurrency Task (IO)Monad Task API VI An Entire application expressed with Task enum State: case Continue, End val input = Task.eval(StdIn.readLine()) val hello = Task.evalOnce(println("Hello!! welcome to this beautiful game!!")) lazy val parse: Task[Int] = input .map(_.toInt) .onErrorHandleWith(_ => Task(println("Insert a number!!")) .flatMap(_ => parse) ) val toGuess = Task.evalOnce(Random.nextInt(10)) val toHigh = Task(println("The number is wrong!! (high)")).map(_ => State.Continue) val toLow = Task(println("The number is wrong!! (low)")).map(_ => State.Continue) val correct = Task(println("You won!!")).map(_ => State.End) // The main, a typical way to express IO in functional program, through flatmap operations.. val game = for { _ <- hello number <- toGuess user <- parse state <- if (user < number) toLow else if (user > number) toHigh else correct } yield state Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 31 / 58
  • 32.
    Concurrency Observable (FunctionalReactive Programming) Contents 1 Introduction 2 Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 32 / 58
  • 33.
    Concurrency Observable (FunctionalReactive Programming) Observable ® Definition The Observable is a data type for modeling and processing asynchronous and reactive streaming of (possibly infinite) events with non-blocking back-pressure. At its simplest, an Observable is a replacement for Iterable or Scala LazyList, but with the ability to process asynchronous events without blocking Enables Functional Reactive Programming (FRP) Basically the Observer pattern on steroids async events management (decoupling of control flow) backpressure error management Expresses lazy computations Inspired by ReacitveX ® Observable vs. Task Single Multiple Synchronous A Iterable[A] Asynchronous Future[A] / Task[A] Observable[A] Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 33 / 58
  • 34.
    Concurrency Observable (FunctionalReactive Programming) Observable API I Observable Creation // print hey! because the value is evaluated asap val single = Observable.pure { println("hey!"); 10 } // hey is printed each time the observable is consumed val delay = Observable.delay { println("hey"); 10 } // hey is printed once, then the value is memoized val lazyObservable = Observable.evalOnce { println("hey"); 10 } val sideEffects = Observable.suspend { val effect = Source.fromFile("build.sbt") Observable.fromIterator(Task(effect.getLines())) } Thread.sleep(500) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 34 / 58
  • 35.
    Concurrency Observable (FunctionalReactive Programming) Observable API II Observable Composition (Sequential) // I can express infinite computation... val stepper = Observable.fromIterable(LazyList.continually(10)) .delayOnNext(500 milliseconds) val greeter = Observable.repeatEval("Hello!!").delayOnNext(100 milliseconds) val combined = for { // I can combine observable like list, option,.... number <- stepper greet <- greeter } yield (number, greet) combined.foreach(println(_)) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 35 / 58
  • 36.
    Concurrency Observable (FunctionalReactive Programming) Observable API III Observable Facade from other libraries val button = JButton("Click me!!") // Using create api val buttonObservable = Observable.create[Long](OverflowStrategy.Unbounded) { subject => button.addActionListener((e: ActionEvent) => subject.onNext(e.getWhen)) Cancelable.empty } // Or through subjects (i.e., tuple of observer and observable) val subject = ConcurrentSubject[Long](MulticastStrategy.replay) button.addActionListener((e: ActionEvent) => subject.onNext(e.getWhen)) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 36 / 58
  • 37.
    Concurrency Observable (FunctionalReactive Programming) Observable API IV Observables Backpressure val obs = Observable("A", "B", "C", "D", "E", "F", "G") val syncObs = obs // By default, the obs manipulation are sequential .mapEval(elem => Task { println(s"Processing (1) : $elem"); elem + elem }) // change the observable evaluation to another execution logic .asyncBoundary(OverflowStrategy.Unbounded) .mapEval(elem => Task { println(s"Processing (2) : $elem"); elem } .delayExecution(200 milliseconds)) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 37 / 58
  • 38.
    Concurrency Observable (FunctionalReactive Programming) Observable API V Observables Parallell (similar to par) val element = Observable.fromIterable((0 to 10)) val parallelMap = element.mapParallelOrdered(8)(elem => Task(elem * 5) .tapEval(id => Task(println(id + " " + Thread.currentThread().getName))) ) val parallelUnorderedMap = element.mapParallelUnordered(8)(elem => Task(elem * 5) .tapEval(id => Task(println(id + " " + Thread.currentThread().getName))) ) val mergeParallel = element.delayOnNext(100 milliseconds) .mergeMap(_ => Observable(1) .delayOnNext(100 milliseconds)) The Observable API is very Rich Scala Doc ® Documentation ® Code example repository (about buffering, error handling, rate emission) ® Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 38 / 58
  • 39.
    Concurrency Observable (FunctionalReactive Programming) FRP Application: A tentative design ® I The model is described using Algebraic Data Type and functional manipulation Rendering is wrapped on a Task (it expresses a side-effect) The events (e.g., input from View, time evolution) are expressed as Observable (i.e., event sources) The Model data evolves following the event Observable (creating an Observable of models) On top of Model Observable, the rendering Task is triggered (e.g. for each new Model data) Potential benefits clean model as an immutable data (the manipulation is typically performed with lens® ) model evolution expressed as a composition between Tasks simplifies the interaction between View and Model (i.e., the Controller) side effects happen only at the end of the world Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 39 / 58
  • 40.
    Concurrency Observable (FunctionalReactive Programming) FRP Application: A tentative design ® II UI (defines Input and Output boundary interface) // a general UI: trait UI[Event, Data] ==> it contains side effects!! //(but they are wrapper with monadic abstractions) // even more general: UI[Event, Data, F[_], R[_]] trait UI: // Event source produced by this UI // (outside of the "functional" world, unsafe, unpure) def events: Observable[Event] // Render the world representation. // It is an effect, therefore it returns a Task def render(world: World): Task[Unit] Model: set of data that expresses a snapshot of the "world" enum Entity(...): case Player(...), case class World(...) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 40 / 58
  • 41.
    Concurrency Observable (FunctionalReactive Programming) FRP Application: A tentative design ® III Update: namely how world should change // This should be expressed as a function, // (event, world) => Task[(Ager, Control)] // Or Controller[E, M, F[_]] => (E, M) => F[(M, Control[E, M, F])] trait Update extends ((Event, World) => Task[(World, Update)]): def andThen(control: Update): Update = ... // We cannot use type because it became a cyclic reference object Update: extension (function: (Event, World) => (World, Update)) def lift: Update = .. // Creation API def same(function: (Event, World) => World): Update = ... def on[E <: Event]( control: (E, World) => Task[World] )(using ev: ClassTag[E]): Update = ... def combineTwo(engineA: Update, engineB: Update): Update = ... def combine(engines: Update*): Update = ... Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 41 / 58
  • 42.
    Concurrency Observable (FunctionalReactive Programming) FRP Application: A tentative design ® IV Main Loop def start(ui: UI): Task[Unit] = val time: Observable[Event] = TimeFlow // time progression .tickEach(33 milliseconds).map(_.toDouble).map(Event.TimePassed.apply) val world = World.empty // Initial condition of the game val controls: Update = ... // set of controls that update the world state // Initial "loop" condition, update and world can evolve with events val init = Task((world, controls)) val events = Observable(time, ui.events.throttleLast(33 milliseconds)).merge events // "Main" loop // World evolve following the event .scanEval(init) { case ((world, logic), event) => logic(event, world) } // For each frame, a rendering is requested .doOnNext { case (world, _) => ui.render(world) } .completedL // Compute until the game is over Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 42 / 58
  • 43.
    Concurrency Observable (FunctionalReactive Programming) In Summary Use Task to express any potential computation Use Future to run a computation expressed through Task Use Observable & Subject to express computation with multiple values (e.g. event sources) In this way: your application becomes “reactive” side effects happen only at the “end-of-the-world” you can reason about effects, avoiding to care about side effects error handling cames for “free" your application works seamlessly in JVM & JS world Final remark “The IO monad does not make a function pure. It just makes it obvious that it’s impure” Martin Odersky Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 43 / 58
  • 44.
    Frameworks Contents 1 Introduction 2 Concurrency Low-levelAPI Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 44 / 58
  • 45.
    Frameworks Scala: Best choiceto create complex & distributed Framework DSL hides complexity giving high-level API Contextual information (i.e. to use API) is passed through implicit/given giving the impression of “enrich” the language Scala’s strong type system enforces compile-time correctness Programmers focus on the intent (declarative approach) Future / Observable / Task example: intent: express possibly concurrent computations; DSL: monadic manipulation, how the data should be combined & transformed context: ExecutionContext, where the computation will be executed In the next slides: two relevant examples of industry-proven scala framework (Akka & Spark) two examples of reasearch oriented libraries & framework (ScaFi & ScalaLoci) Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 45 / 58
  • 46.
    Frameworks Industry-ready Contents 1 Introduction 2Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 46 / 58
  • 47.
    Frameworks Industry-ready The Akkaactor toolkit Akka is a toolkit for building highly concurrent, distributed, and resilient message-driven applications for Java and Scala It is the implementation of the Actor model on the JVM It extends the basic Actor model with more convenient & advanced features Akka supports and integrates into an ecosystem of tools for distributed systems Play (web framework), Spray (REST), Lagom (microservices), Apache Flink (stream/batch processing), Gatling (load-testing)... actors as a basic building block Production-proved: https://www.lightbend.com/case-studies Website: https://akka.io/ Docs: https://akka.io/docs/ Akka APIs and DSLs ® Akka provides APIs for developing actor-based systems with Java and Scala DSLs Akka Typed: new, type-safe API Akka Classic: original API (still fully supported) These may coexist in a single application Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 47 / 58
  • 48.
    Frameworks Industry-ready Spark ® ApacheSpark is a unified analytics engine for large-scale data processing. it provides high-level APIs in Java, Scala, Python Dataset as a primary abstraction that consists of a distributed collection of items The framework for big data management. Data processing executed on cluster it also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 48 / 58
  • 49.
    Frameworks Industry-ready Spark –Example ® // Context lazy implicit val spark = SparkSession.builder() .master("local") .appName("spark_test") .getOrCreate() import spark.implicits._ // Required to call the .toDF function later val html = scala.io.Source .fromURL("http://files.grouplens.org/datasets/movielens/ml-100k/u.data") .mkString // Get all rows as one string val seqOfRecords = ... // Give whatever column names you want val df = seqOfRecords.toDF("userID", "movieID", "ratings", "timestamp") // Data manipulation (I do not care where this will be executre) df .select("ratings") // select the "ratings row" .groupBy("ratings") // group by on ratings .count // count the total number of row for each rating .sort(col("count").desc) // sort the ratings using the count .show() Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 49 / 58
  • 50.
    Frameworks Research-Oriented Contents 1 Introduction 2Concurrency Low-level API Asynchronous Programming (scala.concurrent) Task (IO) Monad Observable (Functional Reactive Programming) 3 Frameworks Industry-ready Research-Oriented Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 50 / 58
  • 51.
    Frameworks Research-Oriented ScaFi (ScalaFields) ScaFi (Scala Fields) is a Scala-based library and framework for Aggregate Programming. A macro-programming approach to program large-scale distriubted system, formally grounded in Field Calculus developed by our research group (mainly by prof. Viroli & dott. Casadei, with contributions of several students ®) Goal: programming the collective behaviour of aggregates of devices Provides a DSL and API for writing, testing, and running aggregate programs ScaFi-Web: A web playground based on ScaFi ® ScaFI is a multi-module and cross platform scala project: scafi-commons | provides basic abstractions and utilities (e.g., spatial and temporal abstractions); scafi-core | represents the core of the project and provides an aggregate programming DSL (syntax, semantics, and a virtual machine for evaluation of programs), together with a “standard library” of reusable functions; scafi-simulator: provides a basic support for simulating aggregate systems; scafi-simulator-gui | provides a GUI for visualising and interacting with simulations of aggregate systems; spala (“spatial Scala”—i.e., a general Aggregate Computing platform | provides an actor-based aggregate computing middleware; scafi-distributed | ScaFi integration-layer for spala, which can be leveraged to set up actor-based deployments of ScaFi-programmed systems. Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 51 / 58
  • 52.
    Frameworks Research-Oriented ScaFi (ScalaFields) – Architecture SCAFI-CORE AC PLATFORM SCAFI-TESTS AKKA-CORE AKKA-REMOTING SCAFI-SIMULATOR SCAFI-SIMULATOR-GUI SCAFI-STDLIB SCAFI-DISTRIBUTED SCAFI-COMMONS (space-time abstractions) DEMOS depends on Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 52 / 58
  • 53.
    Frameworks Research-Oriented ScaFi (ScalaFields) – Usage example ® I Distributed sensing // Incarnation: the context of an aggregate program import it.unibo.scafi.incarnations.BasicSimulationIncarnation.* class DistributedTemperatureSensing extends AggregateProgram // Libraries, expressed as mix-in with BlockG with BlockC with BlockS with BlocksWithGC with StandardSensors { // "entry point", kind of "end-of-the-world" of effects override def main: Double = { // area size in which I want sense temperature val area = 50 // center of that area val leader = S(area, nbrRange) // used to send information to leader val potential = distanceTo(leader) // average temperature value val areaTemperature = average(leader, sense("temperature")) // share the temperature to devices broadcast(leader, areaTemperature) } } Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 53 / 58
  • 54.
    Frameworks Research-Oriented ScaFi (ScalaFields) – Usage example ® II Self-healing channel class Channel extends AggregateProgram with StandardSensors with BlockG { override def main() = { // zone of the space that we want to link to target def source : Boolean = sense("source") // the target of the channel def target : Boolean = sense("target") def channel(source: Boolean, target: Boolean, width: Double): Boolean = { // All nodes that are inside the channel distanceTo(source) + distanceTo(target) <= distanceBetween(source, target) + width } val channelWidth = 1 channel(source, target, channelWidth) } } Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 54 / 58
  • 55.
    Frameworks Research-Oriented ScaFi (ScalaFields) – Usage example ® III Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 55 / 58
  • 56.
    Frameworks Research-Oriented ScalaLoci ScalaLoci isa distributed programming language embedded into Scala. based on typed-based multi-tier programming used to describe deployments & interaction among components simplifies developing distributed systems reduces error-prone communication code favors early detection of bugs Data are placed through distributed data flows, Support multiple software architectures (Client Server, P2P, ...) abstracting over low-level communication details and data conversions Based on Scala macro (<= 2.13) Support several communication protocl (TCP, WebSocket, ...) Multi-module & cross-platform Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 56 / 58
  • 57.
    Frameworks Research-Oriented ScalaLoci –Example import loci._ // Contexts.. import loci.serializer.upickle._ // Serialization context import loci.communicator.tcp._ // Communication context import rescala.default._ // Observable (reactive stream) cotenxt @multitier object Chat { // a deployment definition @peer type Server <: { type Tie <: Multiple[Client] } // peers and connections @peer type Client <: { type Tie <: Single[Server] } // Message are placed on Deployments val message : Evt[String] on Client = on[Client] { Evt[String] } // To access to remote data, I must have a Tie val publicMessage = on[Server] sbj { client: Remote[Client] => message.asLocalFromAllSeq collect { case (remote, message) if remote == client => message } } def main() = on[Client] { // Main client logic publicMessage.asLocal observe println for (line <- scala.io.Source.stdin.getLines) message.fire(line) } } // Elsewhere... object Server extends App { multitier start new Instance[Chat.Server](listen[Chat.Client] { TCP(43053) }) } object Client extends App { multitier start new Instance[Chat.Client](connect[Chat.Server] { TCP("localhost", 43053) }) } Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 57 / 58
  • 58.
    Frameworks Research-Oriented Scala(e) tothe large Concurrent programming in Scala and relevant Frameworks Gianluca Aguzzi gianluca.aguzzi@unibo.it Dipartimento di Informatica – Scienza e Ingegneria (DISI) Alma Mater Studiorum – Università di Bologna Talk @ Paradigmi di Progettazione e Sviluppo 03/06/2022 Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022 58 / 58
  • 59.
    References References I [1] AlvinAlexander. Functional programming, simplified : simple, step-by-step approach to learning functional programming. Boulder, Colo: Alvin Alexander, 2017. isbn: 1979788782. [2] Stephen Blackheath. Functional reactive programming. Shelter Island, NY: Manning Publications Co, 2016. isbn: 9781633430105. [3] Cats Effect · The pure asynchronous runtime for Scala. https://typelevel.org/cats-effect/. (Accessed on 06/03/2022). [4] cats effect documentation - Cerca con Google. https: //www.google.com/search?q=cats+effect+documentation&oq=cats+effect+ documentation&aqs=chrome..69i57.3488j0j4&sourceid=chrome&ie=UTF-8. (Accessed on 06/03/2022). [5] Documentation | Akka. https://akka.io/docs/. (Accessed on 06/03/2022). [6] HaskellWiki. Functional programming — HaskellWiki. [Online; accessed 31-May-2021]. 2020. url: https://wiki.haskell.org/index.php?title= Functional_programming&oldid=63198. [7] Bartosz Milewski. Category theory for programmers. Bartosz Milewski, 2019. isbn: 9780464243878. Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022
  • 60.
    References References II [8] MonixDocumentation. https://monix.io/docs/current/. (Accessed on 06/03/2022). [9] Overview - Spark 3.2.1 Documentation. https://spark.apache.org/docs/latest/. (Accessed on 06/03/2022). [10] ScaFi Aggregate Programming Toolkit - Home. https://scafi.github.io/. (Accessed on 06/03/2022). [11] ScalaLoci – A Programming Language for Distributed Applications. https://scala-loci.github.io/. (Accessed on 06/03/2022). [12] Gurnell Welsh. Scala with Cats. Bartosz Milewski, 2017. Aguzzi (DISI, Univ. Bologna) Scala(e) to the large 03/06/2022