A wrapper around Akka's Reactive Kafka that adds resilience and reuses Akka-defined serialization for Kafka messages.

Add the dependency to build.sbt, for example:

    libraryDependencies ++= Seq(
      "nl.tradecloud" %% "akka-kafka" % "0.65"
    )

Configure the library in application.conf, for example:

    tradecloud.kafka {
      serviceName = "test"
      brokers = "localhost:9092"
      topicPrefix = ""
      groupPrefix = ""
    }

As this library is a wrapper around Akka's Reactive Kafka, you can also use the configuration options of Reactive Kafka.
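
The settings above can also be supplied programmatically, which may be convenient in tests. The following is a minimal sketch using Typesafe Config, which ships with Akka. The object name `KafkaConfigSketch` is hypothetical, and it is an assumption that this wrapper honours the standard Reactive Kafka section `akka.kafka.consumer`; check the library source before relying on it.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical helper, not part of this library
object KafkaConfigSketch {
  // Inline HOCON that mirrors the application.conf example
  val overrides: Config = ConfigFactory.parseString(
    """
      |tradecloud.kafka {
      |  serviceName = "test"
      |  brokers = "localhost:9092"
      |  topicPrefix = ""
      |  groupPrefix = ""
      |}
      |# Reactive Kafka consumer setting (assumed to be picked up by the wrapper)
      |akka.kafka.consumer.kafka-clients {
      |  auto.offset.reset = "earliest"
      |}
      |""".stripMargin)

  // Inline values take precedence; everything else falls back to
  // application.conf and reference.conf on the classpath
  val config: Config = overrides.withFallback(ConfigFactory.load())
}
```

The resulting `config` can then be passed to `ActorSystem("some-name", KafkaConfigSketch.config)` so that the subscriber and publisher created from that actor system see these values.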
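
To subscribe to one or more topics with at-least-once delivery:

    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, Materializer}
    import akka.stream.scaladsl.Flow
    // KafkaSubscriber and KafkaMessage are provided by this library

    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: Materializer = ActorMaterializer()

    new KafkaSubscriber(
      group = "some_group_name",
      topics = Set("some_topic")
    ).atLeastOnce(
      Flow[KafkaMessage[String]]
        .map { wrapper: KafkaMessage[String] =>
          // do something with the payload
          println(wrapper.msg + "-world")

          // return the offset of the processed message
          wrapper.offset
        }
    )

To publish a message:

    // the promise is completed once the message has been added to Kafka
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: Materializer = ActorMaterializer()

    val publisher = new KafkaPublisher()

    // msg is the message to publish, for example a String
    publisher.publish("topic", msg)

Serialization is handled using Akka Serialization; see the Akka Serialization documentation.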
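
As an illustration of what Akka Serialization involves, the sketch below defines a custom serializer with Akka's `SerializerWithStringManifest`. The message type `Greeting`, the class `GreetingSerializer`, the manifest string, the identifier value, and the `com.example` package in the comment are all hypothetical. How this library looks up the serializer for a published or consumed message is not shown in this README, so treat the binding step as an assumption.

```scala
import java.io.NotSerializableException
import java.nio.charset.StandardCharsets

import akka.serialization.SerializerWithStringManifest

// Hypothetical message type, used only for illustration
final case class Greeting(text: String)

// Hypothetical serializer that stores a Greeting as UTF-8 bytes.
// It would be registered in application.conf roughly like this
// (the package name is an assumption):
//
//   akka.actor {
//     serializers { greeting = "com.example.GreetingSerializer" }
//     serialization-bindings { "com.example.Greeting" = greeting }
//   }
class GreetingSerializer extends SerializerWithStringManifest {
  private val GreetingManifest = "greeting-v1"

  // Must be unique within the ActorSystem; 0 to 40 are reserved by Akka
  override def identifier: Int = 424242

  // The manifest tells fromBinary which type the bytes represent
  override def manifest(o: AnyRef): String = o match {
    case _: Greeting => GreetingManifest
    case other =>
      throw new IllegalArgumentException(s"Cannot serialize ${other.getClass.getName}")
  }

  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case Greeting(text) => text.getBytes(StandardCharsets.UTF_8)
    case other =>
      throw new IllegalArgumentException(s"Cannot serialize ${other.getClass.getName}")
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
    case GreetingManifest => Greeting(new String(bytes, StandardCharsets.UTF_8))
    case other => throw new NotSerializableException(s"Unknown manifest: $other")
  }
}
```

A string manifest, rather than a class name, keeps the wire format independent of class renames, which matters when messages stay in a Kafka topic longer than a single deployment.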