- Notifications
You must be signed in to change notification settings - Fork 51
Implementation of Implicit (De-)Serializer techniques #47
Changes from 3 commits
9872248 1b4e9d1 c668377 e345b69 File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -5,7 +5,8 @@ | |
| package com.lightbend.kafka.scala.streams | ||
| | ||
| import org.apache.kafka.streams.kstream._ | ||
| import org.apache.kafka.streams.KeyValue | ||
| import org.apache.kafka.streams.{ KeyValue, Consumed } | ||
| import org.apache.kafka.common.serialization.Serde | ||
| | ||
| import scala.language.implicitConversions | ||
| | ||
| | @@ -35,5 +36,28 @@ object ImplicitConversions { | |
| | ||
| implicit def Tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2) | ||
| | ||
| } | ||
| // technique for optional implicits adopted from | ||
| // http://missingfaktor.blogspot.in/2013/12/optional-implicit-trick-in-scala.html | ||
| | ||
| case class Perhaps[E](value: Option[E]) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It probably should be made Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah true, but in the latest release we have got rid of this abstraction altogether. | ||
| def fold[F](ifAbsent: => F)(ifPresent: E => F): F = { | ||
| value.fold(ifAbsent)(ifPresent) | ||
| } | ||
| } | ||
| | ||
| implicit def perhaps[E](implicit ev: E = null): Perhaps[E] = { | ||
| Perhaps(Option(ev)) | ||
| } | ||
| | ||
| // we would also like to allow users implicit serdes | ||
| // and these implicits will convert them to `Serialized`, `Produced` or `Consumed` | ||
| | ||
| implicit def SerializedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K,V] = | ||
| Serialized.`with`(keySerde, valueSerde) | ||
| | ||
| implicit def ConsumedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Consumed[K,V] = | ||
| Consumed.`with`(keySerde, valueSerde) | ||
| | ||
| implicit def ProducedFromSerde[K,V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Produced[K,V] = | ||
| Produced.`with`(keySerde, valueSerde) | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -57,15 +57,11 @@ class KStreamS[K, V](val inner: KStream[K, V]) { | |
| inner.branch(predicates.map(_.asPredicate): _*).map(kstream => wrapKStream(kstream)) | ||
| } | ||
| | ||
| def through(topic: String): KStreamS[K, V] = inner.through(topic) | ||
| def through(topic: String)(implicit produced: Perhaps[Produced[K, V]]): KStreamS[K, V] = | ||
| produced.fold[KStreamS[K, V]] { inner.through(topic) } { ev => inner.through(topic, ev) } | ||
| | ||
| def through(topic: String, | ||
| produced: Produced[K, V]): KStreamS[K, V] = inner.through(topic, produced) | ||
| | ||
| def to(topic: String): Unit = inner.to(topic) | ||
| | ||
| def to(topic: String, | ||
| produced: Produced[K, V]): Unit = inner.to(topic, produced) | ||
| def to(topic: String)(implicit produced: Perhaps[Produced[K, V]]): Unit = | ||
| produced.fold[Unit] { inner.to(topic) } { implicit ev => inner.to(topic, ev) } | ||
| | ||
| def transform[K1, V1](transformerSupplier: () => Transformer[K, V, (K1, V1)], | ||
| stateStoreNames: String*): KStreamS[K1, V1] = { | ||
| | @@ -110,18 +106,31 @@ class KStreamS[K, V](val inner: KStream[K, V]) { | |
| inner.process(processorSupplierJ, stateStoreNames: _*) | ||
| } | ||
| | ||
| def groupByKey(): KGroupedStreamS[K, V] = | ||
| inner.groupByKey() | ||
| | ||
| def groupByKey(serialized: Serialized[K, V]): KGroupedStreamS[K, V] = | ||
| inner.groupByKey(serialized) | ||
| | ||
| def groupBy[KR](selector: (K, V) => KR): KGroupedStreamS[KR, V] = { | ||
| inner.groupBy(selector.asKeyValueMapper) | ||
| } | ||
| | ||
| def groupBy[KR](selector: (K, V) => KR, serialized: Serialized[KR, V]): KGroupedStreamS[KR, V] = { | ||
| inner.groupBy(selector.asKeyValueMapper, serialized) | ||
| /** | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure if I saw this, but do the contests explicitly confirm all three usage patterns. Specifically, I'm not sure I saw that #1 was being tested. Contributor Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are correct! I added that test .. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great! | ||
| * If `Serialized[K, V]` is found in the implicit scope, then use it, else | ||
| * use the API with the default serializers. | ||
| * | ||
| * Usage Pattern 1: No implicits in scope, use default serializers | ||
| * - .groupByKey | ||
| * | ||
| * Usage Pattern 2: Use implicit `Serialized` in scope | ||
| * implicit val serialized = Serialized.`with`(stringSerde, longSerde) | ||
| * - .groupByKey | ||
| * | ||
| * Usage Pattern 3: uses the implicit conversion from the serdes to `Serialized` | ||
| * implicit val stringSerde: Serde[String] = Serdes.String() | ||
| * implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]] | ||
| * - .groupByKey | ||
| */ | ||
| def groupByKey(implicit serialized: Perhaps[Serialized[K, V]]): KGroupedStreamS[K, V] = | ||
| serialized.fold[KGroupedStreamS[K, V]] { inner.groupByKey } { implicit ev => inner.groupByKey(ev) } | ||
| | ||
| def groupBy[KR](selector: (K, V) => KR)(implicit serialized: Perhaps[Serialized[KR, V]]): KGroupedStreamS[KR, V] = { | ||
| serialized.fold[KGroupedStreamS[KR, V]] { | ||
| inner.groupBy(selector.asKeyValueMapper) | ||
| } { implicit ev => | ||
| inner.groupBy(selector.asKeyValueMapper, ev) | ||
| } | ||
| } | ||
| | ||
| def join[VO, VR](otherStream: KStreamS[K, VO], | ||
| | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome find! 👍