1- from typing import Any , Dict , List
1+ from typing import Any , Dict , List , Optional
22
33from confluent_kafka import Consumer , Message , TopicPartition # type: ignore
4- from confluent_kafka .admin import AdminClient , TopicMetadata # type: ignore
4+ from confluent_kafka .admin import TopicMetadata # type: ignore
55
66from dlt import config , secrets
77from dlt .common import pendulum
88from dlt .common .configuration import configspec
99from dlt .common .configuration .specs import CredentialsConfiguration
1010from dlt .common .time import ensure_pendulum_datetime
11- from dlt .common .typing import DictStrAny , TSecretValue , TAnyDateTime
11+ from dlt .common .typing import DictStrAny , TSecretValue
1212from dlt .common .utils import digest128
1313
1414
@@ -54,23 +54,26 @@ def default_msg_processor(msg: Message) -> Dict[str, Any]:
5454class OffsetTracker (dict ): # type: ignore
5555 """Object to control offsets of the given topics.
5656
57- Tracks all the partitions of the given topics with two params:
58- current offset and maximum offset (partition length).
57+ Tracks all the partitions of the given topics with three params:
58+ current offset, maximum offset (partition length), and an end time .
5959
6060 Args:
6161 consumer (confluent_kafka.Consumer): Kafka consumer.
6262 topic_names (List): Names of topics to track.
6363 pl_state (DictStrAny): Pipeline current state.
6464 start_from (Optional[pendulum.DateTime]): A timestamp, after which messages
6565 are read. Older messages are ignored.
66+ end_time (Optional[pendulum.DateTime]): A timestamp, before which messages
67+ are read. Newer messages are ignored.
6668 """
6769
6870 def __init__ (
6971 self ,
7072 consumer : Consumer ,
7173 topic_names : List [str ],
7274 pl_state : DictStrAny ,
73- start_from : pendulum .DateTime = None ,
75+ start_from : Optional [pendulum .DateTime ] = None ,
76+ end_time : Optional [pendulum .DateTime ] = None ,
7477 ):
7578 super ().__init__ ()
7679
@@ -82,7 +85,7 @@ def __init__(
8285 "offsets" , {t_name : {} for t_name in topic_names }
8386 )
8487
85- self ._init_partition_offsets (start_from )
88+ self ._init_partition_offsets (start_from , end_time )
8689
8790 def _read_topics (self , topic_names : List [str ]) -> Dict [str , TopicMetadata ]:
8891 """Read the given topics metadata from Kafka.
@@ -104,7 +107,11 @@ def _read_topics(self, topic_names: List[str]) -> Dict[str, TopicMetadata]:
104107
105108 return tracked_topics
106109
107- def _init_partition_offsets (self , start_from : pendulum .DateTime ) -> None :
110+ def _init_partition_offsets (
111+ self ,
112+ start_from : Optional [pendulum .DateTime ] = None ,
113+ end_time : Optional [pendulum .DateTime ] = None ,
114+ ) -> None :
108115 """Designate current and maximum offsets for every partition.
109116
110117 Current offsets are read from the state, if present. Set equal
@@ -113,6 +120,8 @@ def _init_partition_offsets(self, start_from: pendulum.DateTime) -> None:
113120 Args:
114121 start_from (pendulum.DateTime): A timestamp, at which to start
115122 reading. Older messages are ignored.
123+ end_time (pendulum.DateTime): A timestamp, before which messages
124+ are read. Newer messages are ignored.
116125 """
117126 all_parts = []
118127 for t_name , topic in self ._topics .items ():
@@ -128,27 +137,49 @@ def _init_partition_offsets(self, start_from: pendulum.DateTime) -> None:
128137 for part in topic .partitions
129138 ]
130139
131- # get offsets for the timestamp, if given
132- if start_from is not None :
140+ # get offsets for the timestamp ranges, if given
141+ if start_from is not None and end_time is not None :
142+ start_ts_offsets = self ._consumer .offsets_for_times (parts )
143+ end_ts_offsets = self ._consumer .offsets_for_times (
144+ [
145+ TopicPartition (t_name , part , end_time .int_timestamp * 1000 )
146+ for part in topic .partitions
147+ ]
148+ )
149+ elif start_from is not None :
133150 ts_offsets = self ._consumer .offsets_for_times (parts )
134151
135152 # designate current and maximum offsets for every partition
136153 for i , part in enumerate (parts ):
137154 max_offset = self ._consumer .get_watermark_offsets (part )[1 ]
138155
139- if start_from is not None :
156+ if start_from is not None and end_time is not None :
157+ if start_ts_offsets [i ].offset != - 1 :
158+ cur_offset = start_ts_offsets [i ].offset
159+ else :
160+ cur_offset = max_offset - 1
161+ if end_ts_offsets [i ].offset != - 1 :
162+ end_offset = end_ts_offsets [i ].offset
163+ else :
164+ end_offset = max_offset
165+
166+ elif start_from is not None :
140167 if ts_offsets [i ].offset != - 1 :
141168 cur_offset = ts_offsets [i ].offset
142169 else :
143170 cur_offset = max_offset - 1
171+
172+ end_offset = max_offset
173+
144174 else :
145175 cur_offset = (
146176 self ._cur_offsets [t_name ].get (str (part .partition ), - 1 ) + 1
147177 )
178+ end_offset = max_offset
148179
149180 self [t_name ][str (part .partition )] = {
150181 "cur" : cur_offset ,
151- "max" : max_offset ,
182+ "max" : end_offset ,
152183 }
153184
154185 parts [i ].offset = cur_offset
@@ -200,9 +231,11 @@ class KafkaCredentials(CredentialsConfiguration):
200231 bootstrap_servers : str = config .value
201232 group_id : str = config .value
202233 security_protocol : str = config .value
203- sasl_mechanisms : str = config .value
204- sasl_username : str = config .value
205- sasl_password : TSecretValue = secrets .value
234+
235+ # Optional SASL credentials
236+ sasl_mechanisms : Optional [str ] = config .value
237+ sasl_username : Optional [str ] = config .value
238+ sasl_password : Optional [TSecretValue ] = secrets .value
206239
207240 def init_consumer (self ) -> Consumer :
208241 """Init a Kafka consumer from this credentials.
@@ -214,9 +247,16 @@ def init_consumer(self) -> Consumer:
214247 "bootstrap.servers" : self .bootstrap_servers ,
215248 "group.id" : self .group_id ,
216249 "security.protocol" : self .security_protocol ,
217- "sasl.mechanisms" : self .sasl_mechanisms ,
218- "sasl.username" : self .sasl_username ,
219- "sasl.password" : self .sasl_password ,
220250 "auto.offset.reset" : "earliest" ,
221251 }
252+
253+ if self .sasl_mechanisms and self .sasl_username and self .sasl_password :
254+ config .update (
255+ {
256+ "sasl.mechanisms" : self .sasl_mechanisms ,
257+ "sasl.username" : self .sasl_username ,
258+ "sasl.password" : self .sasl_password ,
259+ }
260+ )
261+
222262 return Consumer (config )
0 commit comments