@@ -13,9 +13,11 @@ import org.apache.kafka.common.serialization.Deserializer
1313
1414import scala .collection .JavaConverters ._
1515import scala .concurrent .duration ._
16+ import scala .language .implicitConversions
1617import scala .reflect .runtime .universe .TypeTag
1718import scala .util .{Failure , Success , Try }
1819
20+
1921/**
2022 * An actor that wraps [[KafkaConsumer ]].
2123 *
@@ -161,6 +163,21 @@ object KafkaConsumerActor {
161163 * @param offsets the topics with partitions and offsets to start consuming from
162164 */
163165 final case class ManualOffset (offsets : Offsets ) extends Subscribe
166+
167+ /**
168+ * Subscribe to topics by providing a timestamp per partition denoting the point in time with the first offset that must be retrieved.
169+ *
170+ * In manually assigned partition mode, the consumer will specify the partitions directly,
171+ * This means that Kafka will not be automatically rebalance the partitions when new consumers appear in the consumer group.
172+ *
173+ * In addition to manually assigning the partitions, the partition offsets will be set to start from the given offsets.
174+ *
175+ * The client should ensure that received records are confirmed with 'commit = false' to ensure consumed records are
176+ * not committed back to kafka.
177+ *
178+ * @param offsets the topics with partitions and timestamps to start consuming from
179+ */
180+ final case class ManualOffsetForTimes (offsets : Offsets ) extends Subscribe
164181 }
165182
166183 /**
@@ -375,6 +392,14 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag](
375392 import PollScheduling .Poll
376393 import context .become
377394
395+ /**
396+ * Implicit conversion to support calling the org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes method with a Map[TopicPartition, scala.Long].
397+ */
398+ implicit def toJavaOffsetQuery (offsetQuery : Map [TopicPartition , scala.Long ]): java.util.Map [TopicPartition , java.lang.Long ] =
399+ offsetQuery
400+ .map { case (tp, time) => tp -> new java.lang.Long (time) }
401+ .asJava
402+
378403 type Records = ConsumerRecords [K , V ]
379404
380405 private val consumer = KafkaConsumer [K , V ](consumerConf)
@@ -401,11 +426,21 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag](
401426 Subscribe .AutoPartitionWithManualOffset (s.topics, s.assignedListener, s.revokedListener)
402427 case _ : Subscribe .ManualPartition => Subscribe .ManualOffset (offsets)
403428 case _ : Subscribe .ManualOffset => Subscribe .ManualOffset (offsets)
429+ case _ : Subscribe .ManualOffsetForTimes =>
430+ val timeOffsets = timeOffsets2regularOffsets(offsets)
431+ Subscribe .ManualOffset (timeOffsets)
404432 }
405433 lastConfirmedOffsets.map(advance).getOrElse(subscription)
406434 }
407435 }
408436
437+ private def timeOffsets2regularOffsets (timeOffsets : Offsets ) : Offsets = {
438+ import scala .collection .JavaConverters ._
439+ val javaOffsetsAndTimestamps = consumer.offsetsForTimes(timeOffsets.offsetsMap).asScala.toMap
440+ val offsets = javaOffsetsAndTimestamps.mapValues(_.offset())
441+ Offsets (offsets)
442+ }
443+
409444 private case class Subscribed (
410445 subscription : Subscribe ,
411446 lastConfirmedOffsets : Option [Offsets ]
@@ -686,6 +721,12 @@ private final class KafkaConsumerActorImpl[K: TypeTag, V: TypeTag](
686721 log.info(" Subscribing in manual partition assignment mode to partitions with offsets [{}]" , offsets)
687722 consumer.assign(offsets.topicPartitions.toList.asJava)
688723 seekOffsets(offsets)
724+
725+ case Subscribe .ManualOffsetForTimes (offsets) =>
726+ log.info(" Subscribing in manual partition assignment mode with timestamps to partitions with offsets [{}]" , offsets)
727+ consumer.assign(offsets.topicPartitions.toList.asJava)
728+ val regularOffsets = timeOffsets2regularOffsets(offsets)
729+ seekOffsets(regularOffsets)
689730 }
690731
691732 // The client is usually misusing the Consumer if incorrect Confirm offsets are provided
0 commit comments