src/main/scala/com/lightbend/kafka/scala/streams/ImplicitConversions.scala
@@ -5,7 +5,8 @@
package com.lightbend.kafka.scala.streams

import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.{ KeyValue, Consumed }
import org.apache.kafka.common.serialization.Serde

import scala.language.implicitConversions

@@ -35,5 +36,28 @@ object ImplicitConversions {

implicit def Tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)

}
// technique for optional implicits adopted from
// http://missingfaktor.blogspot.in/2013/12/optional-implicit-trick-in-scala.html
[Reviewer] Awesome find! 👍

case class Perhaps[E](value: Option[E]) {

[Reviewer] It probably should be made final and extend AnyVal to reduce memory allocation and improve inlining.

[Author] Yeah true, but in the latest release we have got rid of this abstraction altogether.

def fold[F](ifAbsent: => F)(ifPresent: E => F): F = {
value.fold(ifAbsent)(ifPresent)
}
}

implicit def perhaps[E](implicit ev: E = null): Perhaps[E] = {
Perhaps(Option(ev))
}
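
For illustration only (not part of this diff), here is a minimal, self-contained sketch of the optional-implicit trick. The `Greeting` and `greet` names and the demo object are invented; the `final ... extends AnyVal` refinement follows the review suggestion above.

```scala
// Minimal sketch of the optional-implicit trick, outside this codebase.
final case class Perhaps[E](value: Option[E]) extends AnyVal {
  def fold[F](ifAbsent: => F)(ifPresent: E => F): F =
    value.fold(ifAbsent)(ifPresent)
}

object Perhaps {
  // Always resolves: if no implicit E is available, `ev` defaults to
  // null and Option(null) yields None.
  implicit def perhaps[E](implicit ev: E = null): Perhaps[E] =
    Perhaps(Option(ev))
}

final case class Greeting(text: String)

object PerhapsDemo extends App {
  def greet(name: String)(implicit g: Perhaps[Greeting]): String =
    g.fold(s"Hello, $name")(ev => s"${ev.text}, $name")

  def withoutImplicit: String = greet("kafka") // "Hello, kafka"

  def withImplicit: String = {
    implicit val g: Greeting = Greeting("Welcome")
    greet("streams") // "Welcome, streams"
  }

  println(withoutImplicit)
  println(withImplicit)
}
```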

// we would also like to allow users to supply implicit serdes
// and these implicits will convert them to `Serialized`, `Produced` or `Consumed`

implicit def SerializedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K,V] =
Serialized.`with`(keySerde, valueSerde)

implicit def ConsumedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K,V] =
Consumed.`with`(keySerde, valueSerde)

implicit def ProducedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K,V] =
Produced.`with`(keySerde, valueSerde)
}
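
As a hedged sketch of what these conversions enable (the demo object name is invented), two implicit serdes are enough for `Consumed`, `Serialized`, and `Produced` to resolve without any explicit `with(...)` construction at the call site:

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.Consumed
import org.apache.kafka.streams.kstream.{Produced, Serialized}
import com.lightbend.kafka.scala.streams.ImplicitConversions._

object SerdeResolutionDemo extends App {
  implicit val stringSerde: Serde[String] = Serdes.String()
  implicit val longSerde: Serde[Long]     = Serdes.Long().asInstanceOf[Serde[Long]]

  // Each resolves via the corresponding *FromSerde implicit above.
  val consumed: Consumed[String, Long]     = implicitly[Consumed[String, Long]]
  val serialized: Serialized[String, Long] = implicitly[Serialized[String, Long]]
  val produced: Produced[String, Long]     = implicitly[Produced[String, Long]]
}
```
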
49 changes: 29 additions & 20 deletions src/main/scala/com/lightbend/kafka/scala/streams/KStreamS.scala
@@ -57,15 +57,11 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream))
}

def through(topic: String): KStreamS[K, V] = inner.through(topic)
def through(topic: String)(implicit produced: Perhaps[Produced[K, V]]): KStreamS[K, V] =
produced.fold[KStreamS[K, V]] { inner.through(topic) } { ev => inner.through(topic, ev) }

def through(topic: String,
produced: Produced[K, V]): KStreamS[K, V] = inner.through(topic, produced)

def to(topic: String): Unit = inner.to(topic)

def to(topic: String,
produced: Produced[K, V]): Unit = inner.to(topic, produced)
def to(topic: String)(implicit produced: Perhaps[Produced[K, V]]): Unit =
produced.fold[Unit] { inner.to(topic) } { implicit ev => inner.to(topic, ev) }

def transform[K1, V1](transformerSupplier: () => Transformer[K, V, (K1, V1)],
stateStoreNames: String*): KStreamS[K1, V1] = {
@@ -110,18 +106,31 @@ class KStreamS[K, V](val inner: KStream[K, V]) {
inner.process(processorSupplierJ, stateStoreNames: _*)
}

def groupByKey(): KGroupedStreamS[K, V] =
inner.groupByKey()

def groupByKey(serialized: Serialized[K, V]): KGroupedStreamS[K, V] =
inner.groupByKey(serialized)

def groupBy[KR](selector: (K, V) => KR): KGroupedStreamS[KR, V] = {
inner.groupBy(selector.asKeyValueMapper)
}

def groupBy[KR](selector: (K, V) => KR, serialized: Serialized[KR, V]): KGroupedStreamS[KR, V] = {
inner.groupBy(selector.asKeyValueMapper, serialized)

[Reviewer] I'm not sure if I saw this, but do the tests explicitly confirm all three usage patterns? Specifically, I'm not sure I saw that #1 was being tested.

[Author] You are correct! I added that test.

[Reviewer] Great!

/**
* If `Serialized[K, V]` is found in the implicit scope, then use it, else
* use the API with the default serializers.
*
* Usage Pattern 1: No implicits in scope, use default serializers
* - .groupByKey
*
* Usage Pattern 2: Use implicit `Serialized` in scope
* implicit val serialized = Serialized.`with`(stringSerde, longSerde)
* - .groupByKey
*
* Usage Pattern 3: uses the implicit conversion from the serdes to `Serialized`
* implicit val stringSerde: Serde[String] = Serdes.String()
* implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
* - .groupByKey
*/
def groupByKey(implicit serialized: Perhaps[Serialized[K, V]]): KGroupedStreamS[K, V] =
serialized.fold[KGroupedStreamS[K, V]] { inner.groupByKey } { implicit ev => inner.groupByKey(ev) }

def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Perhaps[Serialized[KR, V]]): KGroupedStreamS[KR, V] = {
serialized.fold[KGroupedStreamS[KR, V]] {
inner.groupBy(selector.asKeyValueMapper)
} { implicit ev =>
inner.groupBy(selector.asKeyValueMapper, ev)
}
}
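
A sketch of the three usage patterns described in the scaladoc above, assuming the imports shown; the object and method names are invented:

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.kstream.Serialized
import com.lightbend.kafka.scala.streams.{KGroupedStreamS, KStreamS}
import com.lightbend.kafka.scala.streams.ImplicitConversions._

object GroupByKeyPatterns {
  // Pattern 1: no implicits in scope - the no-arg overload of the
  // underlying Java API is called, using the default serdes from config.
  def pattern1(stream: KStreamS[String, Long]): KGroupedStreamS[String, Long] =
    stream.groupByKey

  // Pattern 2: an explicit `Serialized` in implicit scope is used directly.
  def pattern2(stream: KStreamS[String, Long]): KGroupedStreamS[String, Long] = {
    implicit val serialized: Serialized[String, Long] =
      Serialized.`with`(Serdes.String(), Serdes.Long().asInstanceOf[Serde[Long]])
    stream.groupByKey
  }

  // Pattern 3: implicit serdes are lifted to a `Serialized` by
  // `SerializedFromSerde` from `ImplicitConversions`.
  def pattern3(stream: KStreamS[String, Long]): KGroupedStreamS[String, Long] = {
    implicit val stringSerde: Serde[String] = Serdes.String()
    implicit val longSerde: Serde[Long]     = Serdes.Long().asInstanceOf[Serde[Long]]
    stream.groupByKey
  }
}
```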

def join[VO, VR](otherStream: KStreamS[K, VO],
14 changes: 6 additions & 8 deletions src/main/scala/com/lightbend/kafka/scala/streams/KTableS.scala
@@ -48,14 +48,12 @@ class KTableS[K, V](val inner: KTable[K, V]) {
inner.toStream[KR](mapper.asKeyValueMapper)
}

def groupBy[KR, VR](selector: (K, V) => (KR, VR)): KGroupedTableS[KR, VR] = {
inner.groupBy(selector.asKeyValueMapper)
}

def groupBy[KR, VR](selector: (K, V) => (KR, VR),
serialized: Serialized[KR, VR]): KGroupedTableS[KR, VR] = {

inner.groupBy(selector.asKeyValueMapper, serialized)
def groupBy[KR, VR](selector: (K, V) => (KR, VR))(implicit serialized: Perhaps[Serialized[KR, VR]]): KGroupedTableS[KR, VR] = {
serialized.fold[KGroupedTableS[KR, VR]] {
inner.groupBy(selector.asKeyValueMapper)
} { implicit ev =>
inner.groupBy(selector.asKeyValueMapper, ev)
}
}
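
The same fallback applies to tables. A small sketch (invented names), re-keying a hypothetical user-to-region table, where `Serialized[String, String]` is derived from a single implicit serde used for both key and value:

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes}
import com.lightbend.kafka.scala.streams.{KGroupedTableS, KTableS}
import com.lightbend.kafka.scala.streams.ImplicitConversions._

object TableRegroupSketch {
  // `Serialized[String, String]` resolves via `SerializedFromSerde`.
  def byRegion(users: KTableS[String, String]): KGroupedTableS[String, String] = {
    implicit val stringSerde: Serde[String] = Serdes.String()
    users.groupBy((_, region) => (region, region))
  }
}
```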

def join[VO, VR](other: KTableS[K, VO],
src/main/scala/com/lightbend/kafka/scala/streams/StreamsBuilderS.scala
@@ -20,55 +20,33 @@ import scala.collection.JavaConverters._
*/
class StreamsBuilderS(inner: StreamsBuilder = new StreamsBuilder) {

def stream[K, V](topic: String): KStreamS[K, V] =
inner.stream[K, V](topic)
def stream[K, V](topic: String)(implicit consumed: Perhaps[Consumed[K, V]]): KStreamS[K, V] =
consumed.fold[KStreamS[K, V]] { inner.stream[K, V](topic) } { implicit ev => inner.stream[K, V](topic, ev) }

def stream[K, V](topic: String, consumed: Consumed[K, V]): KStreamS[K, V] =
inner.stream[K, V](topic, consumed)
def stream[K, V](topics: List[String])(implicit consumed: Perhaps[Consumed[K, V]]): KStreamS[K, V] =
consumed.fold[KStreamS[K, V]] { inner.stream[K, V](topics.asJava) } { implicit ev => inner.stream[K, V](topics.asJava, ev) }

def stream[K, V](topics: List[String]): KStreamS[K, V] =
inner.stream[K, V](topics.asJava)
def stream[K, V](topicPattern: Pattern)(implicit consumed: Perhaps[Consumed[K, V]]): KStreamS[K, V] =
consumed.fold[KStreamS[K, V]] { inner.stream[K, V](topicPattern) } { implicit ev => inner.stream[K, V](topicPattern, ev) }

def stream[K, V](topics: List[String], consumed: Consumed[K, V]): KStreamS[K, V] =
inner.stream[K, V](topics.asJava, consumed)
def table[K, V](topic: String)(implicit consumed: Perhaps[Consumed[K, V]]): KTableS[K, V] =
consumed.fold[KTableS[K, V]] { inner.table[K, V](topic) } { implicit ev => inner.table[K, V](topic, ev) }

def stream[K, V](topicPattern: Pattern): KStreamS[K, V] =
inner.stream[K, V](topicPattern)
def table[K, V](topic: String, materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]])
(implicit consumed: Perhaps[Consumed[K, V]]): KTableS[K, V] =
consumed.fold[KTableS[K, V]] { inner.table(topic, materialized) } { implicit ev => inner.table[K, V](topic, ev, materialized) }

def stream[K, V](topicPattern: Pattern, consumed: Consumed[K, V]): KStreamS[K, V] =
inner.stream[K, V](topicPattern, consumed)
def globalTable[K, V](topic: String)(implicit consumed: Perhaps[Consumed[K, V]]): GlobalKTable[K, V] =
consumed.fold[GlobalKTable[K, V]] { inner.globalTable(topic) } { implicit ev => inner.globalTable(topic, ev) }

def table[K, V](topic: String): KTableS[K, V] = inner.table[K, V](topic)

def table[K, V](topic: String, consumed: Consumed[K, V]): KTableS[K, V] =
inner.table[K, V](topic, consumed)

def table[K, V](topic: String, consumed: Consumed[K, V],
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
inner.table[K, V](topic, consumed, materialized)

def table[K, V](topic: String,
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): KTableS[K, V] =
inner.table[K, V](topic, materialized)

def globalTable[K, V](topic: String): GlobalKTable[K, V] =
inner.globalTable(topic)

def globalTable[K, V](topic: String, consumed: Consumed[K, V]): GlobalKTable[K, V] =
inner.globalTable(topic, consumed)

def globalTable[K, V](topic: String, consumed: Consumed[K, V],
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
inner.globalTable(topic, consumed, materialized)

def globalTable[K, V](topic: String,
materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]]): GlobalKTable[K, V] =
inner.globalTable(topic, materialized)
def globalTable[K, V](topic: String, materialized: Materialized[K, V, KeyValueStore[Bytes, Array[Byte]]])
(implicit consumed: Perhaps[Consumed[K, V]]): GlobalKTable[K, V] =
consumed.fold[GlobalKTable[K, V]] { inner.globalTable(topic, materialized) } { implicit ev => inner.globalTable(topic, ev, materialized) }

def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilder = inner.addStateStore(builder)

def addGlobalStore(storeBuilder: StoreBuilder[_ <: StateStore], topic: String, sourceName: String, consumed: Consumed[_, _], processorName: String, stateUpdateSupplier: ProcessorSupplier[_, _]): StreamsBuilder =
inner.addGlobalStore(storeBuilder, topic, sourceName, consumed, processorName, stateUpdateSupplier)

def build(): Topology = inner.build()
}
}
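
Putting the builder and the new `stream`/`to` signatures together, a minimal pass-through topology sketch; the topic names, application id, and bootstrap server are placeholders:

```scala
import java.util.Properties

import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import com.lightbend.kafka.scala.streams.StreamsBuilderS
import com.lightbend.kafka.scala.streams.ImplicitConversions._

object PassThroughDemo extends App {
  implicit val stringSerde: Serde[String] = Serdes.String()

  val builder = new StreamsBuilderS()

  // `Consumed[String, String]` and `Produced[String, String]` are both
  // derived from `stringSerde`, so neither appears at the call sites.
  builder.stream[String, String]("input-topic").to("output-topic")

  val config = new Properties()
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through-demo")
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val streams = new KafkaStreams(builder.build(), config)
  streams.start()
  sys.addShutdownHook(streams.close())
}
```
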
KafkaStreamsTest.scala
@@ -11,8 +11,8 @@ import com.lightbend.kafka.scala.server.{KafkaLocalServer, MessageListener, Mess
import minitest.TestSuite
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.kstream.Produced
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import ImplicitConversions._

object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {

@@ -34,8 +34,8 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {
//
// Step 1: Configure and start the processor topology.
//
val stringSerde = Serdes.String()
val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
implicit val stringSerde = Serdes.String()
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]

val streamsConfiguration = new Properties()
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, s"wordcount-${scala.util.Random.nextInt(100)}")
@@ -57,7 +57,7 @@ object KafkaStreamsTest extends TestSuite[KafkaLocalServer] with WordCountTestData {
.groupBy((k, v) => v)
.count()

wordCounts.toStream.to(outputTopic, Produced.`with`(stringSerde, longSerde))
wordCounts.toStream.to(outputTopic)

val streams = new KafkaStreams(builder.build, streamsConfiguration)
streams.start()
ProbabilisticCountingScalaIntegrationTest.scala
@@ -28,9 +28,10 @@ import com.lightbend.kafka.scala.streams.algebird.{CMSStore, CMSStoreBuilder}
import minitest.TestSuite
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.kstream.{Produced, Transformer}
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import ImplicitConversions._

/**
* End-to-end integration test that demonstrates how to probabilistically count items in an input stream.
@@ -140,12 +141,13 @@ object ProbabilisticCountingScalaIntegrationTest extends TestSuite[KafkaLocalServer]
// Read the input from Kafka.
val textLines: KStreamS[Array[Byte], String] = builder.stream(inputTopic)

val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
implicit val stringSerde = Serdes.String()
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]

textLines
.flatMapValues(value => value.toLowerCase.split("\\W+").toIterable)
.transform(() => new ProbabilisticCounter, cmsStoreName)
.to(outputTopic, Produced.`with`(Serdes.String(), longSerde))
.to(outputTopic)

val streams: KafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration)
streams.start()