Commit d3600c2

refactor: move query and wait logic to separate module
This prepares the way for using the `query_and_wait` method built into the client library when it is available.
1 parent 78c58cc commit d3600c2
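
For context, the built-in method the commit message refers to is `Client.query_and_wait` in google-cloud-bigquery. The following is a minimal sketch, not code from this commit, assuming a recent client library version (roughly 3.14+) where the method exists; the query and location are placeholders:

    # Sketch only: the built-in helper this refactor prepares for.
    # Client.query_and_wait starts the query, waits for completion, and
    # returns a RowIterator over the results.
    from google.cloud import bigquery

    client = bigquery.Client()  # uses default credentials and project
    rows = client.query_and_wait(
        "SELECT 17 AS answer",
        location="US",  # mirrors the location pandas-gbq passes through
    )
    for row in rows:
        print(row["answer"])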

6 files changed, +248 -192 lines changed


pandas_gbq/exceptions.py

Lines changed: 9 additions & 0 deletions
@@ -35,3 +35,12 @@ class PerformanceWarning(RuntimeWarning):
     Such warnings can occur when dependencies for the requested feature
     aren't up-to-date.
     """
+
+
+class QueryTimeout(ValueError):
+    """
+    Raised when the query request exceeds the timeoutMs value specified in the
+    BigQuery configuration.
+    """
+
+    pass
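
Since `QueryTimeout` now lives in `pandas_gbq.exceptions` (and, per the gbq.py diff below, is still re-imported there for compatibility), a small illustrative sketch of catching it from the new import path; the query and the deliberately tiny timeout value are assumptions for demonstration, and whether the timeout fires depends on query timing:

    # Illustrative sketch, not part of this commit: trigger and catch the
    # relocated exception. timeoutMs is read from the query configuration,
    # exactly as run_query does in gbq.py below.
    import pandas_gbq
    from pandas_gbq.exceptions import QueryTimeout

    try:
        df = pandas_gbq.read_gbq(
            "SELECT COUNT(*) FROM `bigquery-public-data.samples.wikipedia`",
            configuration={"query": {"timeoutMs": 1}},  # deliberately tiny timeout
        )
    except QueryTimeout as exc:
        print(f"Query cancelled after exceeding timeoutMs: {exc}")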

pandas_gbq/gbq.py

Lines changed: 19 additions & 125 deletions
@@ -3,7 +3,6 @@
 # license that can be found in the LICENSE file.
 
 import copy
-import concurrent.futures
 from datetime import datetime
 import logging
 import re
@@ -20,8 +19,9 @@
 if typing.TYPE_CHECKING:  # pragma: NO COVER
     import pandas
 
-from pandas_gbq.exceptions import AccessDenied, GenericGBQException
+from pandas_gbq.exceptions import GenericGBQException, QueryTimeout
 from pandas_gbq.features import FEATURES
+import pandas_gbq.query
 import pandas_gbq.schema
 import pandas_gbq.timestamp
 
@@ -130,15 +130,6 @@ class NotFoundException(ValueError):
     pass
 
 
-class QueryTimeout(ValueError):
-    """
-    Raised when the query request exceeds the timeoutMs value specified in the
-    BigQuery configuration.
-    """
-
-    pass
-
-
 class TableCreationError(ValueError):
     """
     Raised when the create table method fails
@@ -340,10 +331,6 @@ def __init__(
         self.client = self.get_client()
         self.use_bqstorage_api = use_bqstorage_api
 
-        # BQ Queries costs $5 per TB. First 1 TB per month is free
-        # see here for more: https://cloud.google.com/bigquery/pricing
-        self.query_price_for_TB = 5.0 / 2**40  # USD/TB
-
     def _start_timer(self):
         self.start = time.time()
 
@@ -355,16 +342,6 @@ def log_elapsed_seconds(self, prefix="Elapsed", postfix="s.", overlong=6):
         if sec > overlong:
             logger.info("{} {} {}".format(prefix, sec, postfix))
 
-    # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
-    @staticmethod
-    def sizeof_fmt(num, suffix="B"):
-        fmt = "%3.1f %s%s"
-        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
-            if abs(num) < 1024.0:
-                return fmt % (num, unit, suffix)
-            num /= 1024.0
-        return fmt % (num, "Y", suffix)
-
     def get_client(self):
         import google.api_core.client_info
         from google.cloud import bigquery
@@ -421,46 +398,10 @@ def download_table(
             user_dtypes=dtypes,
         )
 
-    def _wait_for_query_job(self, query_reply, timeout_ms):
-        """Wait for query to complete, pausing occasionally to update progress.
-
-        Args:
-            query_reply (QueryJob):
-                A query job which has started.
-
-            timeout_ms (Optional[int]):
-                How long to wait before cancelling the query.
-        """
-        # Wait at most 10 seconds so we can show progress.
-        # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
-        # Include a tqdm progress bar here instead of a stream of log messages.
-        timeout_sec = 10.0
-        if timeout_ms:
-            timeout_sec = min(timeout_sec, timeout_ms / 1000.0)
-
-        while query_reply.state != "DONE":
-            self.log_elapsed_seconds(" Elapsed", "s. Waiting...")
-
-            if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
-                self.client.cancel_job(
-                    query_reply.job_id, location=query_reply.location
-                )
-                raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))
-
-            try:
-                query_reply.result(timeout=timeout_sec)
-            except concurrent.futures.TimeoutError:
-                # Use our own timeout logic
-                pass
-            except self.http_error as ex:
-                self.process_http_error(ex)
-
     def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
-        from google.auth.exceptions import RefreshError
         from google.cloud import bigquery
-        import pandas
 
-        job_config = {
+        job_config_dict = {
             "query": {
                 "useLegacySql": self.dialect
                 == "legacy"
@@ -470,74 +411,27 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
         }
         config = kwargs.get("configuration")
         if config is not None:
-            job_config.update(config)
+            job_config_dict.update(config)
 
-        self._start_timer()
-
-        try:
-            logger.debug("Requesting query... ")
-            query_reply = self.client.query(
-                query,
-                job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
-                location=self.location,
-                project=self.project_id,
-            )
-            logger.debug("Query running...")
-        except (RefreshError, ValueError) as ex:
-            if self.private_key:
-                raise AccessDenied(
-                    f"The service account credentials are not valid: {ex}"
-                )
-            else:
-                raise AccessDenied(
-                    "The credentials have been revoked or expired, "
-                    f"please re-run the application to re-authorize: {ex}"
-                )
-        except self.http_error as ex:
-            self.process_http_error(ex)
-
-        job_id = query_reply.job_id
-        logger.debug("Job ID: %s" % job_id)
-
-        timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
-            "timeoutMs"
-        )
+        timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict[
+            "query"
+        ].get("timeoutMs")
         timeout_ms = int(timeout_ms) if timeout_ms else None
-        self._wait_for_query_job(query_reply, timeout_ms)
 
-        if query_reply.cache_hit:
-            logger.debug("Query done.\nCache hit.\n")
-        else:
-            bytes_processed = query_reply.total_bytes_processed or 0
-            bytes_billed = query_reply.total_bytes_billed or 0
-            logger.debug(
-                "Query done.\nProcessed: {} Billed: {}".format(
-                    self.sizeof_fmt(bytes_processed),
-                    self.sizeof_fmt(bytes_billed),
-                )
-            )
-            logger.debug(
-                "Standard price: ${:,.2f} USD\n".format(
-                    bytes_billed * self.query_price_for_TB
-                )
-            )
+        self._start_timer()
+        job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
+        rows_iter = pandas_gbq.query.query_and_wait(
+            self,
+            self.client,
+            query,
+            location=self.location,
+            project_id=self.project_id,
+            job_config=job_config,
+            max_results=max_results,
+            timeout_ms=timeout_ms,
+        )
 
         dtypes = kwargs.get("dtypes")
-
-        # Ensure destination is populated.
-        try:
-            query_reply.result()
-        except self.http_error as ex:
-            self.process_http_error(ex)
-
-        # Avoid attempting to download results from DML queries, which have no
-        # destination.
-        if query_reply.destination is None:
-            return pandas.DataFrame()
-
-        rows_iter = self.client.list_rows(
-            query_reply.destination, max_results=max_results
-        )
         return self._download_results(
             rows_iter,
             max_results=max_results,
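
To make the new control flow in `run_query` concrete, here is a standalone sketch (the configuration values are illustrative assumptions) of how a REST-style configuration dict yields both the timeout and the `QueryJobConfig` that gets handed to `pandas_gbq.query.query_and_wait`:

    # Sketch of the REST-style dict run_query now builds before converting it
    # with QueryJobConfig.from_api_repr, mirroring the names in the diff above.
    from google.cloud import bigquery

    job_config_dict = {
        "query": {
            "useLegacySql": False,
            "timeoutMs": 30000,
        }
    }

    # The timeout can come from either the job-level or the query-level field.
    timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict["query"].get("timeoutMs")
    timeout_ms = int(timeout_ms) if timeout_ms else None  # -> 30000

    job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
    print(job_config.use_legacy_sql)  # False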

pandas_gbq/query.py

Lines changed: 135 additions & 0 deletions
@@ -0,0 +1,135 @@
+# Copyright (c) 2017 pandas-gbq Authors All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+from __future__ import annotations
+
+import concurrent.futures
+import logging
+from typing import Optional
+
+from google.cloud import bigquery
+
+import pandas_gbq.exceptions
+
+
+logger = logging.getLogger(__name__)
+
+
+# On-demand BQ Queries costs $6.25 per TB. First 1 TB per month is free
+# see here for more: https://cloud.google.com/bigquery/pricing
+QUERY_PRICE_FOR_TB = 6.25 / 2**40  # USD/TB
+
+
+# http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
+def sizeof_fmt(num, suffix="B"):
+    fmt = "%3.1f %s%s"
+    for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
+        if abs(num) < 1024.0:
+            return fmt % (num, unit, suffix)
+        num /= 1024.0
+    return fmt % (num, "Y", suffix)
+
+
+def _wait_for_query_job(
+    connector,
+    client: bigquery.Client,
+    query_reply: bigquery.QueryJob,
+    timeout_ms: Optional[float],
+):
+    """Wait for query to complete, pausing occasionally to update progress.
+
+    Args:
+        query_reply (QueryJob):
+            A query job which has started.
+
+        timeout_ms (Optional[int]):
+            How long to wait before cancelling the query.
+    """
+    # Wait at most 10 seconds so we can show progress.
+    # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
+    # Include a tqdm progress bar here instead of a stream of log messages.
+    timeout_sec = 10.0
+    if timeout_ms:
+        timeout_sec = min(timeout_sec, timeout_ms / 1000.0)
+
+    while query_reply.state != "DONE":
+        connector.log_elapsed_seconds(" Elapsed", "s. Waiting...")
+
+        if timeout_ms and timeout_ms < connector.get_elapsed_seconds() * 1000:
+            client.cancel_job(query_reply.job_id, location=query_reply.location)
+            raise pandas_gbq.exceptions.QueryTimeout(
+                "Query timeout: {} ms".format(timeout_ms)
+            )
+
+        try:
+            query_reply.result(timeout=timeout_sec)
+        except concurrent.futures.TimeoutError:
+            # Use our own timeout logic
+            pass
+        except connector.http_error as ex:
+            connector.process_http_error(ex)
+
+
+def query_and_wait(
+    connector,
+    client: bigquery.Client,
+    query: str,
+    *,
+    job_config: bigquery.QueryJobConfig,
+    location: Optional[str],
+    project_id: Optional[str],
+    max_results: Optional[int],
+    timeout_ms: Optional[int],
+):
+    from google.auth.exceptions import RefreshError
+
+    try:
+        logger.debug("Requesting query... ")
+        query_reply = client.query(
+            query,
+            job_config=job_config,
+            location=location,
+            project=project_id,
+        )
+        logger.debug("Query running...")
+    except (RefreshError, ValueError) as ex:
+        if connector.private_key:
+            raise pandas_gbq.exceptions.AccessDenied(
+                f"The service account credentials are not valid: {ex}"
+            )
+        else:
+            raise pandas_gbq.exceptions.AccessDenied(
+                "The credentials have been revoked or expired, "
+                f"please re-run the application to re-authorize: {ex}"
+            )
+    except connector.http_error as ex:
+        connector.process_http_error(ex)
+
+    job_id = query_reply.job_id
+    logger.debug("Job ID: %s" % job_id)
+
+    _wait_for_query_job(connector, connector.client, query_reply, timeout_ms)
+
+    if query_reply.cache_hit:
+        logger.debug("Query done.\nCache hit.\n")
+    else:
+        bytes_processed = query_reply.total_bytes_processed or 0
+        bytes_billed = query_reply.total_bytes_billed or 0
+        logger.debug(
+            "Query done.\nProcessed: {} Billed: {}".format(
+                sizeof_fmt(bytes_processed),
+                sizeof_fmt(bytes_billed),
+            )
+        )
+        logger.debug(
+            "Standard price: ${:,.2f} USD\n".format(bytes_billed * QUERY_PRICE_FOR_TB)
+        )
+
+    # As of google-cloud-bigquery 2.3.0, QueryJob.result() uses
+    # getQueryResults() instead of tabledata.list, which returns the correct
+    # response with DML/DDL queries.
+    try:
+        return query_reply.result(max_results=max_results)
+    except connector.http_error as ex:
+        connector.process_http_error(ex)
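
A quick, standalone check of the module-level helpers added here: `sizeof_fmt` produces the human-readable byte counts used in the log messages, and `QUERY_PRICE_FOR_TB` prices billed bytes at the $6.25/TB on-demand rate; the billed-byte figure below is made up for illustration:

    # Standalone sketch using the helpers defined above in pandas_gbq/query.py.
    from pandas_gbq.query import QUERY_PRICE_FOR_TB, sizeof_fmt

    bytes_billed = 500 * 2**30  # pretend the query billed 500 GiB

    print(sizeof_fmt(bytes_billed))  # "500.0 GB"
    print("Standard price: ${:,.2f} USD".format(bytes_billed * QUERY_PRICE_FOR_TB))
    # 500 GiB is 500/1024 TiB, so 500/1024 * $6.25 is roughly $3.05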

setup.py

Lines changed: 0 additions & 5 deletions
@@ -34,11 +34,6 @@
     "google-api-core >= 2.10.2, <3.0.0dev",
     "google-auth >=2.13.0",
     "google-auth-oauthlib >=0.7.0",
-    # Require 1.27.* because it has a fix for out-of-bounds timestamps. See:
-    # https://github.com/googleapis/python-bigquery/pull/209 and
-    # https://github.com/googleapis/python-bigquery-pandas/issues/365
-    # Exclude 2.4.* because it has a bug where waiting for the query can hang
-    # indefinitely. https://github.com/pydata/pandas-gbq/issues/343
     "google-cloud-bigquery >=3.3.5,<4.0.0dev,!=2.4.*",
    "google-cloud-bigquery-storage >=2.16.2,<3.0.0dev",
     "packaging >=20.0.0",
