Skip to content

Commit 452ee69

Browse files
Sahebsimonsouter
authored andcommitted
Add optional callbacks in AutoParititon Mode (#124)
* Add optional callback for assigned/ revoked partitions in auto partition mode * Fix test server expected records comparison * Fixing the first time partition assigned case
1 parent 82f5df9 commit 452ee69

File tree

3 files changed

+27
-9
lines changed

3 files changed

+27
-9
lines changed

akka/src/main/scala/cakesolutions/kafka/akka/KafkaConsumerActor.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,18 @@ object KafkaConsumerActor {
9797
* The client should ensure that received records are confirmed with 'commit = true' to ensure kafka tracks the commit point.
9898
*
9999
* @param topics the topics to subscribe to start consuming from
100+
* @param assignedListener Optionally provide a callback when partitions are assigned. Can be used if any initialisation is
101+
* required prior to receiving messages for the partition, such as to populate a cache. Default implementation
102+
* is to do nothing.
103+
* @param revokedListener Optionally provide a callback when partitions are revoked. Can be used if any cleanup is
104+
* required after a partition assignment is revoked. Default implementation
105+
* is to do nothing.
100106
*/
101-
final case class AutoPartition(topics: Iterable[String]) extends Subscribe
107+
final case class AutoPartition(
108+
topics: Iterable[String] = List(),
109+
assignedListener: List[TopicPartition] => Unit = _ => (),
110+
revokedListener: List[TopicPartition] => Unit = _ => ()
111+
) extends Subscribe
102112

103113
/**
104114
* Subscribe to topics in auto assigned partition mode with client managed offset commit positions for each partition.
@@ -658,9 +668,9 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag](
658668
}
659669

660670
private def subscribe(s: Subscribe): Unit = s match {
661-
case Subscribe.AutoPartition(topics) =>
671+
case Subscribe.AutoPartition(topics, assignedListener, revokedListener) =>
662672
log.info(s"Subscribing in auto partition assignment mode to topics [{}].", topics.mkString(","))
663-
trackPartitions = new TrackPartitionsCommitMode(consumer, context.self)
673+
trackPartitions = new TrackPartitionsCommitMode(consumer, context.self, assignedListener, revokedListener)
664674
consumer.subscribe(topics.toList.asJava, trackPartitions)
665675

666676
case Subscribe.AutoPartitionWithManualOffset(topics, assignedListener, revokedListener) =>

akka/src/main/scala/cakesolutions/kafka/akka/TrackPartitions.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ sealed trait TrackPartitions extends ConsumerRebalanceListener {
1313
def isRevoked: Boolean
1414

1515
def reset(): Unit
16+
17+
def offsetsToTopicPartitions(offsets: Map[TopicPartition, Long]): List[TopicPartition] =
18+
offsets.map { case (tp, _) => tp }.toList
1619
}
1720

1821
/**
@@ -25,8 +28,10 @@ sealed trait TrackPartitions extends ConsumerRebalanceListener {
2528
* @param consumer The client driver
2629
* @param consumerActor Tha KafkaConsumerActor to notify of partition change events
2730
*/
28-
private final class TrackPartitionsCommitMode(consumer: KafkaConsumer[_, _], consumerActor: ActorRef)
29-
extends TrackPartitions {
31+
private final class TrackPartitionsCommitMode(
32+
consumer: KafkaConsumer[_, _], consumerActor: ActorRef,
33+
assignedListener: List[TopicPartition] => Unit,
34+
revokedListener: List[TopicPartition] => Unit) extends TrackPartitions {
3035

3136
private val log = LoggerFactory.getLogger(getClass)
3237

@@ -38,6 +43,8 @@ private final class TrackPartitionsCommitMode(consumer: KafkaConsumer[_, _], con
3843

3944
_revoked = true
4045

46+
revokedListener(partitions.asScala.toList)
47+
4148
// If partitions have been revoked, keep a record of our current position within them.
4249
if (!partitions.isEmpty) {
4350
_offsets = partitions.asScala.map(partition => partition -> consumer.position(partition)).toMap
@@ -55,6 +62,7 @@ private final class TrackPartitionsCommitMode(consumer: KafkaConsumer[_, _], con
5562
val allExisting = _offsets.forall { case (partition, _) => partitions.contains(partition) }
5663

5764
if (allExisting) {
65+
assignedListener(partitions.asScala.toList)
5866
for {
5967
partition <- partitions.asScala
6068
offset <- _offsets.get(partition)
@@ -66,6 +74,9 @@ private final class TrackPartitionsCommitMode(consumer: KafkaConsumer[_, _], con
6674

6775
} else {
6876
consumerActor ! KafkaConsumerActor.RevokeReset
77+
78+
// Invoke client callback to notify revocation of all existing partitions.
79+
revokedListener(offsetsToTopicPartitions(_offsets))
6980
}
7081
}
7182

@@ -113,9 +124,6 @@ private final class TrackPartitionsManualOffset(
113124

114125
log.debug("onPartitionsAssigned: " + partitions.toString)
115126

116-
def offsetsToTopicPartitions(offsets: Map[TopicPartition, Long]): List[TopicPartition] =
117-
offsets.map { case (tp, _) => tp }.toList
118-
119127
def assign(partitions: List[TopicPartition]) = {
120128
val offsets = assignedListener(partitions)
121129
for {

testkit/src/main/scala/cakesolutions.kafka/testkit/KafkaServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ final class KafkaServer(
158158
val collected = ArrayBuffer.empty[(Option[Key], Value)]
159159
val start = System.currentTimeMillis()
160160

161-
while (total <= expectedNumOfRecords && System.currentTimeMillis() < start + timeout) {
161+
while (total < expectedNumOfRecords && System.currentTimeMillis() < start + timeout) {
162162
val records = consumer.poll(100)
163163
val kvs = records.asScala.map(r => (Option(r.key()), r.value()))
164164
collected ++= kvs

0 commit comments

Comments
 (0)