33#
44
55import calendar
6+ import functools
7+ import logging
68import re
79import time
810from abc import ABC
911from collections import deque
1012from concurrent .futures import Future , ProcessPoolExecutor
11- from datetime import datetime
13+ from datetime import datetime , timedelta
1214from functools import partial
1315from math import ceil
1416from pickle import PickleError , dumps
3133LAST_END_TIME_KEY : str = "_last_end_time"
3234END_OF_STREAM_KEY : str = "end_of_stream"
3335
36+ logger = logging .getLogger ("airbyte" )
37+
38+ # For some streams, multiple http requests are running at the same time for performance reasons.
39+ # However, it may result in hitting the rate limit, therefore subsequent requests have to be made after a pause.
40+ # The idea is to sustain a pause once and continue making multiple requests at a time.
41+ # A single `retry_at` variable is introduced here, which prevents us from duplicate sleeping in the main thread
42+ # before each request is made as it used to be in prior versions.
43+ # It acts like a global watermark - advanced each time a 429 status is met,
44+ # but only when the new retry moment is later than the current value. On the other hand, no request may be made before this moment.
45+ # Because the requests are made in parallel, time.sleep will be called in parallel as well.
46+ # This is possible because it is a point in time, not timedelta.
47+ retry_at : Optional [datetime ] = None
48+
49+
def sleep_before_executing(sleep_time: float):
    """Decorator factory: delay the wrapped call by ``sleep_time`` seconds.

    Used to postpone HTTP requests until the shared ``retry_at`` moment has
    passed. The wrapped function returns a tuple of
    ``(original result, UTC datetime when the call finished)`` so the caller
    can compute the next retry moment from the completion time.
    """

    def wrapper(function):
        @functools.wraps(function)
        def inner(*args, **kwargs):
            if sleep_time > 0:
                # Keep fractional seconds: rate-limit pauses derived from
                # X-Rate-Limit (60.0 / rate_limit) are usually well under one
                # second, so truncating with int() would skip the pause entirely.
                logging.getLogger("airbyte").info(f"Sleeping {sleep_time} seconds before next request")
                time.sleep(sleep_time)
            result = function(*args, **kwargs)
            return result, datetime.utcnow()

        return inner

    return wrapper
62+
3463
3564def to_int (s ):
3665 "https://github.com/airbytehq/airbyte/issues/13673"
@@ -60,10 +89,16 @@ def send_future(self, request: requests.PreparedRequest, **kwargs) -> Future:
6089 if self .session :
6190 func = self .session .send
6291 else :
92+ sleep_time = 0
93+ now = datetime .utcnow ()
94+ if retry_at and retry_at > now :
95+ sleep_time = (retry_at - datetime .utcnow ()).seconds
6396 # avoid calling super to not break pickled method
6497 func = partial (requests .Session .send , self )
98+ func = sleep_before_executing (sleep_time )(func )
6599
66100 if isinstance (self .executor , ProcessPoolExecutor ):
101+ self .logger .warning ("ProcessPoolExecutor is used to perform IO related tasks for unknown reason!" )
67102 # verify function can be pickled
68103 try :
69104 dumps (func )
@@ -74,11 +109,12 @@ def send_future(self, request: requests.PreparedRequest, **kwargs) -> Future:
74109
75110
76111class BaseSourceZendeskSupportStream (HttpStream , ABC ):
def __init__(self, subdomain: str, start_date: str, ignore_pagination: bool = False, **kwargs):
    """Store the Zendesk subdomain, the sync start date and the pagination switch.

    :param subdomain: Zendesk account subdomain used to build request URLs.
    :param start_date: earliest record timestamp to sync.
    :param ignore_pagination: when True, streams stop after the first page.
    """
    super().__init__(**kwargs)
    self._subdomain = subdomain
    self._start_date = start_date
    self._ignore_pagination = ignore_pagination
82118
83119 def backoff_time (self , response : requests .Response ) -> Union [int , float ]:
84120 """
@@ -93,10 +129,9 @@ def backoff_time(self, response: requests.Response) -> Union[int, float]:
93129 return retry_after
94130
95131 # the header X-Rate-Limit returns the amount of requests per minute
96- # we try to wait twice as long
97132 rate_limit = float (response .headers .get ("X-Rate-Limit" , 0 ))
98133 if rate_limit and rate_limit > 0 :
99- return ( 60.0 / rate_limit ) * 2
134+ return 60.0 / rate_limit
100135 return super ().backoff_time (response )
101136
102137 @staticmethod
@@ -211,7 +246,7 @@ def generate_future_requests(
211246 stream_state : Mapping [str , Any ] = None ,
212247 ):
213248 records_count = self .get_api_records_count (stream_slice = stream_slice , stream_state = stream_state )
214-
249+ self . logger . info ( f"Records count is { records_count } " )
215250 page_count = ceil (records_count / self .page_size )
216251 for page_number in range (1 , page_count + 1 ):
217252 params = self .request_params (stream_state = stream_state , stream_slice = stream_slice )
@@ -228,8 +263,14 @@ def generate_future_requests(
228263
229264 request_kwargs = self .request_kwargs (stream_state = stream_state , stream_slice = stream_slice )
230265 self .future_requests .append (
231- {"future" : self ._send_request (request , request_kwargs ), "request" : request , "request_kwargs" : request_kwargs , "retries" : 0 }
266+ {
267+ "future" : self ._send_request (request , request_kwargs ),
268+ "request" : request ,
269+ "request_kwargs" : request_kwargs ,
270+ "retries" : 0 ,
271+ }
232272 )
273+ self .logger .info (f"Generated { len (self .future_requests )} future requests" )
233274
234275 def _send (self , request : requests .PreparedRequest , request_kwargs : Mapping [str , Any ]) -> Future :
235276 response : Future = self ._session .send_future (request , ** request_kwargs )
@@ -264,15 +305,20 @@ def _retry(
264305 retries : int ,
265306 original_exception : Exception = None ,
266307 response : requests .Response = None ,
308+ finished_at : Optional [datetime ] = None ,
267309 ** request_kwargs ,
268310 ):
269311 if retries == self .max_retries :
270312 if original_exception :
271313 raise original_exception
272314 raise DefaultBackoffException (request = request , response = response )
273- if response is not None :
274- backoff_time = self .backoff_time (response )
275- time .sleep (max (0 , int (backoff_time - response .elapsed .total_seconds ())))
315+ sleep_time = self .backoff_time (response )
316+ if response is not None and finished_at and sleep_time :
317+ current_retry_at = finished_at + timedelta (seconds = sleep_time )
318+ global retry_at
319+ if not retry_at or (retry_at < current_retry_at ):
320+ retry_at = current_retry_at
321+ self .logger .info (f"Adding a request to be retried in { sleep_time } seconds" )
276322 self .future_requests .append (
277323 {
278324 "future" : self ._send_request (request , request_kwargs ),
@@ -292,17 +338,21 @@ def read_records(
292338 self .generate_future_requests (sync_mode = sync_mode , cursor_field = cursor_field , stream_slice = stream_slice , stream_state = stream_state )
293339
294340 while len (self .future_requests ) > 0 :
341+ self .logger .info ("Starting another while loop iteration" )
295342 item = self .future_requests .popleft ()
296343 request , retries , future , kwargs = item ["request" ], item ["retries" ], item ["future" ], item ["request_kwargs" ]
297344
298345 try :
299- response = future .result ()
346+ response , finished_at = future .result ()
300347 except TRANSIENT_EXCEPTIONS as exc :
348+ self .logger .info ("Will retry the request because of a transient exception" )
301349 self ._retry (request = request , retries = retries , original_exception = exc , ** kwargs )
302350 continue
303351 if self .should_retry (response ):
304- self ._retry (request = request , retries = retries , response = response , ** kwargs )
352+ self .logger .info ("Will retry the request for other reason" )
353+ self ._retry (request = request , retries = retries , response = response , finished_at = finished_at , ** kwargs )
305354 continue
355+ self .logger .info ("Request successful, will parse the response now" )
306356 yield from self .parse_response (response , stream_state = stream_state , stream_slice = stream_slice )
307357
308358
@@ -324,6 +374,8 @@ def path(self, **kwargs):
324374 return self .name
325375
326376 def next_page_token (self , response : requests .Response ) -> Optional [Mapping [str , Any ]]:
377+ if self ._ignore_pagination :
378+ return None
327379 next_page = self ._parse_next_page_number (response )
328380 if not next_page :
329381 self ._finished = True
@@ -357,6 +409,8 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
357409 return {self .cursor_field : max (new_value , old_value )}
358410
359411 def next_page_token (self , response : requests .Response ) -> Optional [Mapping [str , Any ]]:
412+ if self ._ignore_pagination :
413+ return None
360414 start_time = dict (parse_qsl (urlparse (response .json ().get (self .next_page_field ), "" ).query )).get ("start_time" )
361415 if start_time != self .prev_start_time :
362416 self .prev_start_time = start_time
@@ -502,6 +556,8 @@ class GroupMemberships(SourceZendeskSupportCursorPaginationStream):
502556 """GroupMemberships stream: https://developer.zendesk.com/api-reference/ticketing/groups/group_memberships/"""
503557
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """Return the next page number, or None when pagination is disabled or exhausted."""
    if self._ignore_pagination:
        return None
    return self._parse_next_page_number(response) or None
507563
@@ -522,6 +578,8 @@ class SatisfactionRatings(SourceZendeskSupportCursorPaginationStream):
522578 """
523579
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """Return the next page number, or None when pagination is disabled or exhausted."""
    if self._ignore_pagination:
        return None
    page = self._parse_next_page_number(response)
    if not page:
        return None
    return page
527585
@@ -548,6 +606,8 @@ class TicketMetrics(SourceZendeskSupportCursorPaginationStream):
548606 """TicketMetric stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metrics/"""
549607
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """Return the next page number, or None when pagination is disabled or exhausted."""
    # Short-circuits: the response is not parsed at all when pagination is off.
    next_page = None if self._ignore_pagination else self._parse_next_page_number(response)
    return next_page or None
553613
@@ -601,6 +661,8 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) ->
601661 return params
602662
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
    """Use the 'before_cursor' value from the response body as the next page token."""
    if self._ignore_pagination:
        return None
    payload = response.json()
    return payload.get("before_cursor")
605667
606668
0 commit comments