Kafka CloudStack Events Reader and Evaluator Framework
The aim of the library is the convenient handling of Kafka messages. It provides the mechanisms to:
- Buffer the messages to vary a count of processing messages without changing a consumer properties
- Implement the logic of storing consumer offsets to any place
Add the following to your build.sbt
libraryDependencies += "com.bwsw" %% "kafka-reader" % "0.10.1"The diagram below is a simple illustration of how the library's classes should be used.
Implement your own EventHandler and EventManager in the way as it is displayed on the diagram.
where:
* K - type of ConsumerRecord key
* V - type of ConsumerRecord value
* T - type of data after handle a ConsumerRecord by the instance of EventHandler implementation
The example below shows how to print messages from Kafka to the console. The call to the method is performed in Future:
class SimpleEventHandler(messageQueue: MessageQueue[String,String], messageCount: Int) extends EventHandler[String,String,Future[Unit]](messageQueue, messageCount) { override def handle(flag: AtomicBoolean): List[OutputEnvelope[Future[Unit]]] = { val inputEnvelopes = messageQueue.take(messageCount) inputEnvelopes.map { x => OutputEnvelope[Future[Unit]](x.topic, x.partition, x.offset, Future(println(x.data))) } } }According to the diagram above the main class looks like this:
object EventManager { def main(args: Array[String]): Unit = { val dummyFlag = new AtomicBoolean(true) val consumer = new Consumer[String,String](Consumer.Settings("localhost:9092", "group01", 3000)) val checkpointInfoProcessor = new CheckpointInfoProcessor[String,String,Future[Unit]]( TopicInfoList(List(TopicInfo(topic = "topic1"))), consumer ) val messageQueue = new MessageQueue[String,String](consumer) val eventHandler = new SimpleEventHandler(messageQueue, countOfMessages = 1) checkpointInfoProcessor.load() val outputEnvelopes = eventHandler.handle(dummyFlag) outputEnvelopes.data.foreach { case Success(x) => case Failure(e) => prinln(s"something went wrong, exception was thrown: $e") throw e } checkpointInfoProcessor.save(outputEnvelopes) consumer.close() } }Run tests: sbt test
Run tests: sbt it:test
Library has the same version as Apache Kafka library
This project is licensed under the Apache License - see the LICENSE file for details