# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

+import logging
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple

from .models import Customer


+class cyclic_sieve:
+    def __init__(self, logger: logging.Logger, fraction: int = 10):
+        self._logger = logger
+        self._cycle_counter = 0
+        self._fraction = fraction
+
+    def __getattr__(self, item):
+        if self._cycle_counter % self._fraction == 0:
+            return getattr(self._logger, item)
+        return self.stub
+
+    def stub(self, *args, **kwargs):
+        pass
+
+    def bump(self):
+        self._cycle_counter += 1
+
+
def parse_dates(stream_slice):
    start_date = pendulum.parse(stream_slice["start_date"])
    end_date = pendulum.parse(stream_slice["end_date"])
@@ -91,6 +110,7 @@ class GoogleAdsStream(Stream, ABC):
    def __init__(self, api: GoogleAds, customers: List[Customer]):
        self.google_ads_client = api
        self.customers = customers
+        self.base_sieve_logger = cyclic_sieve(self.logger, 10)

    def get_query(self, stream_slice: Mapping[str, Any]) -> str:
        query = GoogleAds.convert_schema_into_query(schema=self.get_json_schema(), report_name=self.name)
@@ -105,7 +125,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
            yield {"customer_id": customer.id}

    def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Mapping[str, Any]]:
-        self.logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
+        self.base_sieve_logger.bump()
+        self.base_sieve_logger.info(f"Read records using g-ads client. Stream slice is {stream_slice}")
        if stream_slice is None:
            return []

@@ -119,7 +140,7 @@ def read_records(self, sync_mode, stream_slice: Optional[Mapping[str, Any]] = No
                raise
            for error in exc.failure.errors:
                if error.error_code.authorization_error == AuthorizationErrorEnum.AuthorizationError.CUSTOMER_NOT_ENABLED:
-                    self.logger.error(error.message)
+                    self.base_sieve_logger.error(error.message)
                    continue
                # log and ignore only CUSTOMER_NOT_ENABLED error, otherwise - raise further
                raise
@@ -139,6 +160,7 @@ def __init__(self, start_date: str, conversion_window_days: int, end_date: str =
        self._end_date = end_date
        self._state = {}
        super().__init__(**kwargs)
+        self.incremental_sieve_logger = cyclic_sieve(self.logger, 10)

    @property
    def state(self) -> MutableMapping[str, Any]:
@@ -154,6 +176,7 @@ def current_state(self, customer_id, default=None):

    def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[MutableMapping[str, any]]]:
        for customer in self.customers:
+            logger = cyclic_sieve(self.logger, 10)
            stream_state = stream_state or {}
            if stream_state.get(customer.id):
                start_date = stream_state[customer.id].get(self.cursor_field) or self._start_date
@@ -165,7 +188,7 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
                start_date = self._start_date

            end_date = self._end_date
-            self.logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")
+            logger.info(f"Generating slices for customer {customer.id}. Start date is {start_date}, end date is {end_date}")

            for chunk in chunk_date_range(
                start_date=start_date,
@@ -178,7 +201,8 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
            ):
                if chunk:
                    chunk["customer_id"] = customer.id
-                self.logger.info(f"Next slice is {chunk}")
+                logger.info(f"Next slice is {chunk}")
+                logger.bump()
                yield chunk

    def read_records(
@@ -188,8 +212,9 @@ def read_records(
        This method is overridden to handle GoogleAdsException with EXPIRED_PAGE_TOKEN error code,
        and update `start_date` key in the `stream_slice` with the latest read record's cursor value, then retry the sync.
        """
+        self.incremental_sieve_logger.bump()
        while True:
-            self.logger.info("Starting a while loop iteration")
+            self.incremental_sieve_logger.info("Starting a while loop iteration")
            customer_id = stream_slice and stream_slice["customer_id"]
            try:
                records = super().read_records(sync_mode, stream_slice=stream_slice)
@@ -200,38 +225,40 @@ def read_records(
                        date_in_latest_record = pendulum.parse(record[self.cursor_field])
                        cursor_value = (max(date_in_current_stream, date_in_latest_record)).to_date_string()
                        self.state = {customer_id: {self.cursor_field: cursor_value}}
-                        self.logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
+                        self.incremental_sieve_logger.info(f"Updated state for customer {customer_id}. Full state is {self.state}.")
                        yield record
                        continue
                    self.state = {customer_id: {self.cursor_field: record[self.cursor_field]}}
-                    self.logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
+                    self.incremental_sieve_logger.info(f"Initialized state for customer {customer_id}. Full state is {self.state}.")
                    yield record
                    continue
            except GoogleAdsException as exception:
-                self.logger.info(f"Caught a GoogleAdsException: {str(exception)}")
+                self.incremental_sieve_logger.info(f"Caught a GoogleAdsException: {str(exception)}")
                error = next(iter(exception.failure.errors))
                if error.error_code.request_error == RequestErrorEnum.RequestError.EXPIRED_PAGE_TOKEN:
                    start_date, end_date = parse_dates(stream_slice)
                    current_state = self.current_state(customer_id)
-                    self.logger.info(f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}")
+                    self.incremental_sieve_logger.info(
+                        f"Start date is {start_date}. End date is {end_date}. Current state is {current_state}"
+                    )
                    if (end_date - start_date).days == 1:
                        # If range days is 1, no need in retry, because it's the minimum date range
-                        self.logger.error("Page token has expired.")
+                        self.incremental_sieve_logger.error("Page token has expired.")
                        raise exception
                    elif current_state == stream_slice["start_date"]:
                        # It couldn't read all the records within one day, it will enter an infinite loop,
                        # so raise the error
-                        self.logger.error("Page token has expired.")
+                        self.incremental_sieve_logger.error("Page token has expired.")
                        raise exception
                    # Retry reading records from where it crushed
                    stream_slice["start_date"] = self.current_state(customer_id, default=stream_slice["start_date"])
-                    self.logger.info(f"Retry reading records from where it crushed with a modified slice: {stream_slice}")
+                    self.incremental_sieve_logger.info(f"Retry reading records from where it crushed with a modified slice: {stream_slice}")
                else:
                    # raise caught error for other error statuses
                    raise exception
            else:
                # return the control if no exception is raised
-                self.logger.info("Current slice has been read. Exiting read_records()")
+                self.incremental_sieve_logger.info("Current slice has been read. Exiting read_records()")
                return

    def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
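
Editor's note, not part of the diff: the cyclic_sieve wrapper introduced above forwards attribute lookups such as info and error to the wrapped logger only when its internal counter is a multiple of fraction, and hands back the no-op stub otherwise, so roughly one bumped cycle in every fraction actually emits a log line. A minimal usage sketch follows; the import path source_google_ads.streams is assumed here, since the diff shows only the class definition.

import logging

# Assumed import path for illustration; the diff only shows the class itself.
from source_google_ads.streams import cyclic_sieve

logging.basicConfig(level=logging.INFO)
sieve = cyclic_sieve(logging.getLogger("demo"), fraction=3)

for slice_number in range(9):
    # Only cycles 0, 3 and 6 reach the real logger; the rest hit the no-op stub.
    sieve.info(f"Processing slice {slice_number}")
    sieve.bump()

This mirrors how read_records bumps its sieve once per invocation and then logs through it, so with the default fraction of 10 the per-slice messages appear for about one slice in ten instead of every slice.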