Skip to content

Commit 884b4c9

Browse files
authored
Support of Kafka 0.11.0.0 (#117)
* Initial support for 0.11.0.0 * Added new transaction-related properties to config
1 parent c6f88d9 commit 884b4c9

File tree

17 files changed

+263
-117
lines changed

17 files changed

+263
-117
lines changed

akka/src/it/scala/cakesolutions/kafka/akka/KafkaConsumerActorPerfSpec.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ package cakesolutions.kafka.akka
33
import akka.actor.{ActorRef, ActorSystem}
44
import akka.testkit.TestActor.AutoPilot
55
import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
6-
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
7-
import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe.AutoPartition
6+
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe}
87
import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
98
import com.typesafe.config.ConfigFactory
109
import org.apache.kafka.clients.consumer.ConsumerConfig
@@ -73,7 +72,7 @@ class KafkaConsumerActorPerfSpec(system_ : ActorSystem)
7372
producer.flush()
7473
log.info("Delivered {} messages to topic {}", totalMessages, topic)
7574

76-
consumer.subscribe(AutoPartition(Seq(topic)))
75+
consumer.subscribe(Subscribe.AutoPartition(Seq(topic)))
7776

7877
whenReady(pilot.future) { case (totalTime, messagesPerSec) =>
7978
log.info("Total Time millis : {}", totalTime)

akka/src/it/scala/cakesolutions/kafka/akka/KafkaE2EActorPerfSpec.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ package cakesolutions.kafka.akka
33
import akka.actor.{ActorRef, ActorSystem}
44
import akka.testkit.TestActor.AutoPilot
55
import akka.testkit.{ImplicitSender, TestActor, TestKit, TestProbe}
6-
import cakesolutions.kafka.akka.KafkaConsumerActor.Confirm
7-
import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe.AutoPartition
6+
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe}
87
import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord}
98
import com.typesafe.config.ConfigFactory
109
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
@@ -78,7 +77,7 @@ class KafkaE2EActorPerfSpec(system_ : ActorSystem)
7877
testProducer.flush()
7978
log.info("Delivered {} messages to topic {}", totalMessages, sourceTopic)
8079

81-
consumer.subscribe(AutoPartition(Seq(sourceTopic)))
80+
consumer.subscribe(Subscribe.AutoPartition(Seq(sourceTopic)))
8281

8382
whenReady(pilot.future) { case (totalTime, messagesPerSec) =>
8483
log.info("Total Time millis : {}", totalTime)

akka/src/test/scala/cakesolutions/kafka/akka/KafkaConsumerActorRecoverySpec.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package cakesolutions.kafka.akka
22

33
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
4-
import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe.{ManualOffset, ManualPartition}
54
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe, TriggerConsumerFailure, Unsubscribe}
65
import cakesolutions.kafka.{KafkaConsumer, KafkaProducerRecord, KafkaTopicPartition}
76
import org.apache.kafka.clients.consumer.OffsetResetStrategy
@@ -119,7 +118,7 @@ class KafkaConsumerActorRecoverySpec(_system: ActorSystem) extends KafkaIntSpec(
119118
producer.flush()
120119

121120
val consumer = KafkaConsumerActor(consumerConf, KafkaConsumerActor.Conf(), testActor)
122-
consumer.subscribe(ManualPartition(List(topicPartition)))
121+
consumer.subscribe(Subscribe.ManualPartition(List(topicPartition)))
123122

124123
val rec1 = expectMsgClass(30.seconds, classOf[ConsumerRecords[String, String]])
125124
rec1.offsets.get(topicPartition) shouldBe Some(1)
@@ -141,7 +140,7 @@ class KafkaConsumerActorRecoverySpec(_system: ActorSystem) extends KafkaIntSpec(
141140

142141
// Reset subscription
143142
consumer.unsubscribe()
144-
consumer.subscribe(ManualOffset(offsets))
143+
consumer.subscribe(Subscribe.ManualOffset(offsets))
145144

146145
// New subscription starts from specified offset
147146
val rec3 = expectMsgClass(30.seconds, classOf[ConsumerRecords[String, String]])

akka/src/test/scala/cakesolutions/kafka/akka/KafkaConsumerActorSpec.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package cakesolutions.kafka.akka
22

33
import akka.actor.{ActorSystem, PoisonPill}
44
import akka.testkit.TestProbe
5-
import cakesolutions.kafka.akka.KafkaConsumerActor.Subscribe.AutoPartition
65
import cakesolutions.kafka.akka.KafkaConsumerActor.{Confirm, Subscribe, Unsubscribe}
76
import cakesolutions.kafka.{KafkaConsumer, KafkaProducer, KafkaProducerRecord, KafkaTopicPartition}
87
import com.typesafe.config.{Config, ConfigFactory}
@@ -80,7 +79,7 @@ class KafkaConsumerActorSpec(system_ : ActorSystem) extends KafkaIntSpec(system_
8079
producer.flush()
8180

8281
val consumer = KafkaConsumerActor(consumerConfig, actorConf, testActor)
83-
consumer.subscribe(AutoPartition(Seq(topic)))
82+
consumer.subscribe(Subscribe.AutoPartition(Seq(topic)))
8483

8584
val rs = expectMsgClass(30.seconds, classOf[ConsumerRecords[String, String]])
8685
consumer.confirm(rs.offsets)

client/src/it/scala/cakesolutions/kafka/KafkaConsumerPerfSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ
66
import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
77
import org.slf4j.LoggerFactory
88

9-
import scala.collection.JavaConversions._
9+
import scala.collection.JavaConverters._
1010
import scala.util.Random
1111

1212
/**
@@ -42,7 +42,7 @@ class KafkaConsumerPerfSpec extends FlatSpecLike
4242
producer.flush()
4343
log.info("Delivered 100000 msg to topic {}", topic)
4444

45-
consumer.subscribe(List(topic))
45+
consumer.subscribe(List(topic).asJava)
4646

4747
var start = 0l
4848

client/src/main/scala/cakesolutions/kafka/KafkaConsumer.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import cakesolutions.kafka.TypesafeConfigExtensions._
44
import com.typesafe.config.Config
55
import org.apache.kafka.clients.consumer.{ConsumerConfig, OffsetResetStrategy, KafkaConsumer => JKafkaConsumer}
66
import org.apache.kafka.common.TopicPartition
7+
import org.apache.kafka.common.requests.IsolationLevel
78
import org.apache.kafka.common.serialization.Deserializer
9+
810
import scala.collection.JavaConverters._
911
import scala.language.implicitConversions
1012

@@ -60,7 +62,8 @@ object KafkaConsumer {
6062
maxPollRecords: Int = 500,
6163
maxPollInterval: Int = 300000,
6264
maxMetaDataAge : Long = 300000,
63-
autoOffsetReset: OffsetResetStrategy = OffsetResetStrategy.LATEST
65+
autoOffsetReset: OffsetResetStrategy = OffsetResetStrategy.LATEST,
66+
isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED
6467
): Conf[K, V] = {
6568

6669
val configMap = Map[String, AnyRef](
@@ -73,7 +76,8 @@ object KafkaConsumer {
7376
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPollRecords.toString,
7477
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG -> maxPollInterval.toString,
7578
ConsumerConfig.METADATA_MAX_AGE_CONFIG ->maxMetaDataAge.toString,
76-
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autoOffsetReset.toString.toLowerCase
79+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> autoOffsetReset.toString.toLowerCase,
80+
ConsumerConfig.ISOLATION_LEVEL_CONFIG -> isolationLevel.toString.toLowerCase()
7781
)
7882

7983
apply(configMap, keyDeserializer, valueDeserializer)

0 commit comments

Comments (0)