1717import collections
1818import io
1919import json
20+ import time
2021
2122try :
2223 import fastavro
2324except ImportError : # pragma: NO COVER
2425 fastavro = None
2526import google .api_core .exceptions
27+ import google .rpc .error_details_pb2
2628
2729try :
2830 import pandas
@@ -79,16 +81,17 @@ class ReadRowsStream(object):
7981 If the pandas and fastavro libraries are installed, use the
8082 :func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.to_dataframe()`
8183 method to parse all messages into a :class:`pandas.DataFrame`.
84+
85+ This object should not be created directly, but is returned by
86+ other methods in this library.
8287 """
8388
84- def __init__ (self , wrapped , client , name , offset , read_rows_kwargs ):
89+ def __init__ (
90+ self , client , name , offset , read_rows_kwargs , retry_delay_callback = None
91+ ):
8592 """Construct a ReadRowsStream.
8693
8794 Args:
88- wrapped (Iterable[ \
89- ~google.cloud.bigquery_storage.types.ReadRowsResponse \
90- ]):
91- The ReadRows stream to read.
9295 client ( \
9396 ~google.cloud.bigquery_storage_v1.services. \
9497 big_query_read.BigQueryReadClient \
@@ -106,6 +109,12 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
106109 read_rows_kwargs (dict):
107110 Keyword arguments to use when reconnecting to a ReadRows
108111 stream.
112+ retry_delay_callback (Optional[Callable[[float], None]]):
113+ If the client receives a retryable error that asks the client to
114+ delay its next attempt and retry_delay_callback is not None,
115+ ReadRowsStream will call retry_delay_callback with the delay
116+ duration (in seconds) before it starts sleeping until the next
117+ attempt.
109118
110119 Returns:
111120 Iterable[ \
@@ -116,11 +125,12 @@ def __init__(self, wrapped, client, name, offset, read_rows_kwargs):
116125
117126 # Make a copy of the read position so that we can update it without
118127 # mutating the original input.
119- self ._wrapped = wrapped
120128 self ._client = client
121129 self ._name = name
122130 self ._offset = offset
123131 self ._read_rows_kwargs = read_rows_kwargs
132+ self ._retry_delay_callback = retry_delay_callback
133+ self ._wrapped = None
124134
125135 def __iter__ (self ):
126136 """An iterable of messages.
@@ -131,9 +141,12 @@ def __iter__(self):
131141 ]:
132142 A sequence of row messages.
133143 """
134-
135144 # Infinite loop to reconnect on reconnectable errors while processing
136145 # the row stream.
146+
147+ if self ._wrapped is None :
148+ self ._reconnect ()
149+
137150 while True :
138151 try :
139152 for message in self ._wrapped :
@@ -152,14 +165,53 @@ def __iter__(self):
152165 except _STREAM_RESUMPTION_EXCEPTIONS :
153166 # Transient error, so reconnect to the stream.
154167 pass
168+ except Exception as exc :
169+ if not self ._resource_exhausted_exception_is_retryable (exc ):
170+ raise
155171
156172 self ._reconnect ()
157173
def _reconnect(self):
    """Open a fresh ReadRows stream starting at the most recent offset.

    Calls ``self._client.read_rows`` with the stored stream name, the
    current offset and the saved keyword arguments, and stores the
    result in ``self._wrapped``. If the call raises, the error is handed
    to ``_resource_exhausted_exception_is_retryable``; when that returns
    a falsy value the exception is re-raised, otherwise the call is
    attempted again. There is no cap on the number of attempts.
    """
    connected = False
    while not connected:
        try:
            self._wrapped = self._client.read_rows(
                read_stream=self._name,
                offset=self._offset,
                **self._read_rows_kwargs
            )
        except Exception as exc:
            # Only errors the helper classifies as retryable are
            # swallowed; everything else propagates to the caller.
            if not self._resource_exhausted_exception_is_retryable(exc):
                raise
        else:
            connected = True
187+
def _resource_exhausted_exception_is_retryable(self, exc):
    """Decide whether ``exc`` is a ResourceExhausted error worth retrying.

    A ResourceExhausted error is retried only when its details contain a
    ``RetryInfo`` entry. In that case this method also performs the
    wait: it computes the delay in seconds, passes it to
    ``self._retry_delay_callback`` (when one is set), sleeps for that
    long and then returns ``True``.

    Args:
        exc (Exception): The exception raised while reading or
            reconnecting.

    Returns:
        bool: ``True`` after sleeping for the requested delay when a
        ``RetryInfo`` detail was found, ``False`` in every other case.
    """
    if not isinstance(exc, google.api_core.exceptions.ResourceExhausted):
        return False

    # TODO: Remove hasattr logic when we require google-api-core >= 2.2.0.
    # ResourceExhausted added details/_details in google-api-core 2.2.0.
    if hasattr(exc, "details"):
        error_details = exc.details
    else:
        error_details = getattr(exc, "_details", None)
    if error_details is None:
        return False

    for entry in error_details:
        if not isinstance(entry, google.rpc.error_details_pb2.RetryInfo):
            continue
        requested = entry.retry_delay
        # NOTE(review): a protobuf sub-message field may never be None,
        # in which case this guard never skips -- confirm.
        if requested is None:
            continue
        # Clamp at zero so a negative duration never reaches time.sleep.
        wait_seconds = max(
            0,
            float(requested.seconds) + (float(requested.nanos) / 1e9),
        )
        if self._retry_delay_callback:
            self._retry_delay_callback(wait_seconds)
        time.sleep(wait_seconds)
        return True
    return False
163215
164216 def rows (self , read_session = None ):
165217 """Iterate over all rows in the stream.
0 commit comments