The library wraps the Java APIs in Scala, thereby providing:
- much better type inference in Scala
- less boilerplate in application code
- the usual builder-style composition that developers get with the original Java API
The design of the library was inspired by the work started by Alexis Seigneurin in this repository.
`kafka-streams-scala` is published and cross-built for Scala 2.11 and 2.12, so you can just add the following to your build:
```scala
val kafka_streams_scala_version = "0.1.2"

libraryDependencies ++= Seq(
  "com.lightbend" %% "kafka-streams-scala" % kafka_streams_scala_version
)
```

Note: `kafka-streams-scala` supports Kafka Streams 1.0.0 onwards.
The API docs for `kafka-streams-scala` are available here for Scala 2.12 and here for Scala 2.11.
The library comes with an embedded Kafka server. To run the tests, simply run `sbt test` and all tests will run on the local embedded server.

The embedded server is started and stopped for every test and takes up quite a bit of resources. Hence it's recommended that you allocate more heap space to `sbt` when running the tests, e.g. `sbt -mem 1500`.
```bash
$ sbt -mem 1500
> +clean
> +test
```

Here's a sample code fragment using the Scala wrapper library. Compare this with the Scala code from the same example in Confluent's repository.
```scala
// Compute the total per region by summing the individual click counts per region.
val clicksPerRegion: KTableS[String, Long] = userClicksStream

  // Join the stream against the table.
  .leftJoin(userRegionsTable,
    (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

  // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
  .map((_, regionWithClicks) => regionWithClicks)

  // Compute the total per region by summing the individual click counts per region.
  .groupByKey(Serialized.`with`(stringSerde, longSerde))
  .reduce(_ + _)
```

Notes:
- The backticks around `with` are there because `with` is a Scala keyword. This is the mechanism you use to "escape" a Scala keyword when it's used as a normal identifier in a Java library.
- Note that some methods, like `map`, take a two-argument function for key-value pairs, rather than the more typical single argument (see the sketch after these notes).
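Here is a minimal sketch of that two-argument style, assuming the wrapper's `KStreamS` type is in scope and a hypothetical stream of user clicks (neither is taken from the example above):

```scala
// Hypothetical stream of user -> click count, for illustration only.
val userClicks: KStreamS[String, Long] = ???

// `map` receives the key and the value as two separate parameters and
// returns the new key-value pair as a Scala tuple.
val normalized: KStreamS[String, Long] =
  userClicks.map((user: String, clicks: Long) => (user.toLowerCase, clicks))
```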
The wrapped Scala APIs also reduce boilerplate by taking advantage of Scala function literals, which are converted to Java objects in the implementation of the API. Hence the surface syntax of the client API is simpler and less noisy.
Here's an example of a snippet built using the Java API from Scala:
```scala
val approximateWordCounts: KStream[String, Long] = textLines
  .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable.asJava)
  .transform(
    new TransformerSupplier[Array[Byte], String, KeyValue[String, Long]] {
      override def get() = new ProbabilisticCounter
    },
    cmsStoreName)

approximateWordCounts.to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
```

And here's the corresponding snippet using the Scala library. Note how the noise of `TransformerSupplier` has been abstracted out by the function literal syntax of Scala.
```scala
textLines
  .flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
  .transform(() => new ProbabilisticCounter, cmsStoreName)
  .to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
```

Also, the explicit conversion `asJava` from a Scala `Iterable` to a Java `Iterable` is done for you by the Scala library.
One of the areas where the Java APIs' verbosity can be reduced is the passing of serializers and de-serializers to the various functions. The library offers type-safe implicit serdes to provide the serializers and de-serializers. In doing so, the Scala library avoids configuration-based default serdes, which are not type safe and are prone to runtime errors.
The implementation allows implicits either for the `Serde`s or for `Serialized`, `Consumed`, `Produced` and `Joined`. The test examples demonstrate both, though the implicits for `Serde`s make for a cleaner implementation.
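As a quick illustration, here is a minimal sketch of the two styles; the key/value types are assumed for illustration, and the resolution of `Serialized`, `Consumed`, `Produced` and `Joined` from implicit serdes works as described below for `DefaultSerdes`:

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.kstream.Serialized

// Style 1: an implicit instance of the specific abstraction an API needs,
// e.g. a Serialized[String, Long] built from explicit serdes.
implicit val serialized: Serialized[String, Long] =
  Serialized.`with`(Serdes.String(), Serdes.Long().asInstanceOf[Serde[Long]])

// Style 2 (cleaner): implicit serdes only; the Serialized / Consumed /
// Produced / Joined instances are then resolved from these.
implicit val stringSerde: Serde[String] = Serdes.String()
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
```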
The library offers a module that contains all the default serdes for the primitive types. Importing the object brings all of them into scope and helps reduce implicit hell.
```scala
object DefaultSerdes {
  implicit val stringSerde: Serde[String] = Serdes.String()
  implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
  implicit val byteArraySerde: Serde[Array[Byte]] = Serdes.ByteArray()
  implicit val bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = Serdes.Bytes()
  implicit val floatSerde: Serde[Float] = Serdes.Float().asInstanceOf[Serde[Float]]
  implicit val doubleSerde: Serde[Double] = Serdes.Double().asInstanceOf[Serde[Double]]
  implicit val integerSerde: Serde[Int] = Serdes.Integer().asInstanceOf[Serde[Int]]
}
```

Not only the serdes: `DefaultSerdes` also brings into scope implicit `Serialized`, `Produced`, `Consumed` and `Joined` instances. So all APIs that accept `Serialized`, `Produced`, `Consumed` or `Joined` will get these instances automatically with an `import DefaultSerdes._`.
With just one `import DefaultSerdes._`, the following code needs no `Serialized`, `Produced`, `Consumed` or `Joined` to be specified explicitly or through the default config. And the best part is that for any missing instance you get a compilation error:
```scala
val clicksPerRegion: KTableS[String, Long] = userClicksStream

  // Join the stream against the table.
  .leftJoin(userRegionsTable,
    (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))

  // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
  .map((_, regionWithClicks) => regionWithClicks)

  // Compute the total per region by summing the individual click counts per region.
  .groupByKey
  .reduce(_ + _)

// Write the (continuously updating) results to the output topic.
clicksPerRegion.toStream.to(outputTopic)
```