Commit 6290517

perf: use the first page of results when `query(api_method="QUERY")` (#1723)
* perf: use the first page of results when `query(api_method="QUERY")`
* add tests
* respect max_results with cached page
* respect page_size, also avoid bqstorage if almost fully downloaded
* skip true test if bqstorage not installed
* coverage
1 parent 494f275 commit 6290517
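For context, a minimal sketch of the call pattern this commit optimizes; the query text and variable names are illustrative, not part of the change:

from google.cloud import bigquery

client = bigquery.Client()

# With api_method="QUERY", the client issues jobs.query, whose response can
# already include the first page of rows when the job finishes quickly.
job = client.query("SELECT 1 AS x", api_method="QUERY")

# After this commit, result() reuses that cached first page instead of
# re-fetching it via jobs.getQueryResults; later pages are fetched as needed.
for row in job.result():
    print(row.x)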

10 files changed, +468 -33 lines changed

google/cloud/bigquery/_job_helpers.py

Lines changed: 4 additions & 8 deletions

@@ -22,6 +22,7 @@
 from google.api_core import retry as retries

 from google.cloud.bigquery import job
+import google.cloud.bigquery.query

 # Avoid circular imports
 if TYPE_CHECKING:  # pragma: NO COVER
@@ -197,14 +198,9 @@ def _to_query_job(
     job_complete = query_response.get("jobComplete")
     if job_complete:
         query_job._properties["status"]["state"] = "DONE"
-        # TODO: https://github.com/googleapis/python-bigquery/issues/589
-        # Set the first page of results if job is "complete" and there is
-        # only 1 page of results. Otherwise, use the existing logic that
-        # refreshes the job stats.
-        #
-        # This also requires updates to `to_dataframe` and the DB API connector
-        # so that they don't try to read from a destination table if all the
-        # results are present.
+        query_job._query_results = google.cloud.bigquery.query._QueryResults(
+            query_response
+        )
     else:
         query_job._properties["status"]["state"] = "PENDING"
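In short, a complete jobs.query response is now cached on the job so result() can skip an extra jobs.getQueryResults round trip. A minimal standalone sketch of that control flow, using stand-in classes rather than the real QueryJob and _QueryResults types (illustrative only, not this library's API):

class FakeQueryResults:
    """Stand-in for google.cloud.bigquery.query._QueryResults."""

    def __init__(self, properties: dict):
        self._properties = properties

class FakeQueryJob:
    """Stand-in for google.cloud.bigquery.job.QueryJob."""

    def __init__(self):
        self._properties = {"status": {}}
        self._query_results = None

def set_state_from_response(query_job: FakeQueryJob, query_response: dict) -> None:
    # Mirrors the diff above: when jobs.query reports jobComplete, wrap the
    # raw response so result() can reuse it; otherwise mark the job pending.
    if query_response.get("jobComplete"):
        query_job._properties["status"]["state"] = "DONE"
        query_job._query_results = FakeQueryResults(query_response)
    else:
        query_job._properties["status"]["state"] = "PENDING"

job = FakeQueryJob()
set_state_from_response(job, {"jobComplete": True, "rows": [{"f": [{"v": "abc"}]}]})
assert job._properties["status"]["state"] == "DONE"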

google/cloud/bigquery/client.py

Lines changed: 9 additions & 0 deletions

@@ -3862,6 +3862,7 @@ def _list_rows_from_query_results(
         retry: retries.Retry = DEFAULT_RETRY,
         timeout: TimeoutType = DEFAULT_TIMEOUT,
         query_id: Optional[str] = None,
+        first_page_response: Optional[Dict[str, Any]] = None,
     ) -> RowIterator:
         """List the rows of a completed query.
         See
@@ -3904,6 +3905,8 @@ def _list_rows_from_query_results(
             query_id (Optional[str]):
                 [Preview] ID of a completed query. This ID is auto-generated
                 and not guaranteed to be populated.
+            first_page_response (Optional[dict]):
+                API response for the first page of results (if available).
         Returns:
             google.cloud.bigquery.table.RowIterator:
                 Iterator of row data
@@ -3923,6 +3926,11 @@ def _list_rows_from_query_results(
         if start_index is not None:
             params["startIndex"] = start_index

+        # We don't call jobs.query with a page size, so if the user explicitly
+        # requests a certain size, invalidate the cache.
+        if page_size is not None:
+            first_page_response = None
+
         params["formatOptions.useInt64Timestamp"] = True
         row_iterator = RowIterator(
             client=self,
@@ -3938,6 +3946,7 @@ def _list_rows_from_query_results(
             location=location,
             job_id=job_id,
             query_id=query_id,
+            first_page_response=first_page_response,
         )
         return row_iterator
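The page-size guard above is the subtle part: jobs.query was issued without a page size, so a cached first page may not match an explicitly requested page shape. A hedged sketch of that rule in isolation (the helper name is invented for illustration):

from typing import Any, Dict, Optional

def usable_first_page(
    first_page_response: Optional[Dict[str, Any]],
    page_size: Optional[int],
) -> Optional[Dict[str, Any]]:
    # An explicit page_size means the cached page may be the wrong shape:
    # drop it so the RowIterator fetches page one via jobs.getQueryResults
    # with maxResults set instead.
    if page_size is not None:
        return None
    return first_page_response

assert usable_first_page({"rows": [{}]}, page_size=3) is None
assert usable_first_page({"rows": [{}]}, page_size=None) == {"rows": [{}]}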

google/cloud/bigquery/job/query.py

Lines changed: 12 additions & 1 deletion

@@ -1586,7 +1586,8 @@ def do_get_result():
             # Since the job could already be "done" (e.g. got a finished job
             # via client.get_job), the superclass call to done() might not
             # set the self._query_results cache.
-            self._reload_query_results(retry=retry, timeout=timeout)
+            if self._query_results is None or not self._query_results.complete:
+                self._reload_query_results(retry=retry, timeout=timeout)

         if retry_do_query is not None and job_retry is not None:
             do_get_result = job_retry(do_get_result)
@@ -1615,6 +1616,15 @@ def do_get_result():
                 query_id=self.query_id,
             )

+        # We know that there's at least 1 row, so only treat the response from
+        # jobs.getQueryResults / jobs.query as the first page of the
+        # RowIterator response if there are any rows in it. This prevents us
+        # from stopping the iteration early because we're missing rows and
+        # there's no next page token.
+        first_page_response = self._query_results._properties
+        if "rows" not in first_page_response:
+            first_page_response = None
+
         rows = self._client._list_rows_from_query_results(
             self.job_id,
             self.location,
@@ -1628,6 +1638,7 @@ def do_get_result():
             retry=retry,
             timeout=timeout,
             query_id=self.query_id,
+            first_page_response=first_page_response,
         )
         rows._preserve_order = _contains_order_by(self.query)
         return rows
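The rows guard deserves a close read: the cached payload can omit "rows" entirely, and treating such a payload as page one would end iteration early because there is no next-page token, exactly the failure the comment in the diff warns about. A standalone sketch of the guard (helper name invented):

from typing import Any, Dict, Optional

def first_page_or_none(query_results: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    # Only reuse the jobs.query / jobs.getQueryResults payload as the first
    # RowIterator page when it actually carries rows.
    return query_results if "rows" in query_results else None

assert first_page_or_none({"jobComplete": True}) is None
assert first_page_or_none({"rows": [{"f": [{"v": "abc"}]}]}) is not None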

google/cloud/bigquery/query.py

Lines changed: 0 additions & 8 deletions

@@ -1005,14 +1005,6 @@ def _set_properties(self, api_response):
         Args:
             api_response (Dict): Response returned from an API call
         """
-        job_id_present = (
-            "jobReference" in api_response
-            and "jobId" in api_response["jobReference"]
-            and "projectId" in api_response["jobReference"]
-        )
-        if not job_id_present:
-            raise ValueError("QueryResult requires a job reference")
-
         self._properties.clear()
         self._properties.update(copy.deepcopy(api_response))
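With the validation removed, _QueryResults accepts responses that lack a job reference and simply reports job_id as None, which the updated tests in tests/unit/test_query.py below exercise. A quick sketch against the private class (private API, so treat this as illustrative only):

from google.cloud.bigquery.query import _QueryResults

# No jobReference in the payload: previously a ValueError, now accepted.
results = _QueryResults({"jobComplete": True})
assert results.job_id is None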

google/cloud/bigquery/table.py

Lines changed: 39 additions & 5 deletions

@@ -100,6 +100,10 @@
     "because the necessary `__from_arrow__` attribute is missing."
 )

+# How many of the total rows need to be downloaded already for us to skip
+# calling the BQ Storage API?
+ALMOST_COMPLETELY_CACHED_RATIO = 0.333
+

 def _reference_getter(table):
     """A :class:`~google.cloud.bigquery.table.TableReference` pointing to
@@ -1625,16 +1629,31 @@ def query_id(self) -> Optional[str]:
         """
         return self._query_id

-    def _is_completely_cached(self):
+    def _is_almost_completely_cached(self):
         """Check if all results are completely cached.

         This is useful to know, because we can avoid alternative download
         mechanisms.
         """
-        if self._first_page_response is None or self.next_page_token:
+        if self._first_page_response is None:
             return False

-        return self._first_page_response.get(self._next_token) is None
+        total_cached_rows = len(self._first_page_response.get(self._items_key, []))
+        if self.max_results is not None and total_cached_rows >= self.max_results:
+            return True
+
+        if (
+            self.next_page_token is None
+            and self._first_page_response.get(self._next_token) is None
+        ):
+            return True
+
+        if self._total_rows is not None:
+            almost_completely = self._total_rows * ALMOST_COMPLETELY_CACHED_RATIO
+            if total_cached_rows >= almost_completely:
+                return True
+
+        return False

     def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
         """Returns True if the BigQuery Storage API can be used.
@@ -1647,7 +1666,14 @@ def _validate_bqstorage(self, bqstorage_client, create_bqstorage_client):
         if not using_bqstorage_api:
             return False

-        if self._is_completely_cached():
+        if self._table is None:
+            return False
+
+        # The developer is manually paging through results if this is set.
+        if self.next_page_token is not None:
+            return False
+
+        if self._is_almost_completely_cached():
             return False

         if self.max_results is not None:
@@ -1671,7 +1697,15 @@ def _get_next_page_response(self):
             The parsed JSON response of the next page's contents.
         """
         if self._first_page_response:
-            response = self._first_page_response
+            rows = self._first_page_response.get(self._items_key, [])[
+                : self.max_results
+            ]
+            response = {
+                self._items_key: rows,
+            }
+            if self._next_token in self._first_page_response:
+                response[self._next_token] = self._first_page_response[self._next_token]
+
             self._first_page_response = None
             return response
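The 0.333 constant is the heart of the new heuristic: if roughly a third or more of the total rows already arrived in the cached first page, finishing the download over the REST API is assumed to be cheaper than spinning up a BQ Storage read session. A standalone sketch of that arithmetic (the helper name is invented; the real logic lives in _is_almost_completely_cached above):

ALMOST_COMPLETELY_CACHED_RATIO = 0.333

def should_skip_bqstorage(cached_rows: int, total_rows: int) -> bool:
    # Skip the BQ Storage API when enough rows are already cached locally.
    return cached_rows >= total_rows * ALMOST_COMPLETELY_CACHED_RATIO

assert should_skip_bqstorage(34, 100)      # 34 >= 33.3, mostly downloaded
assert not should_skip_bqstorage(10, 100)  # 10 < 33.3, stream the rest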

tests/unit/job/test_query.py

Lines changed: 109 additions & 0 deletions

@@ -25,6 +25,7 @@
 import requests

 from google.cloud.bigquery.client import _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS
+import google.cloud.bigquery._job_helpers
 import google.cloud.bigquery.query
 from google.cloud.bigquery.table import _EmptyRowIterator

@@ -1081,6 +1082,114 @@ def test_result_with_done_job_calls_get_query_results(self):
             timeout=None,
         )
         conn.api_request.assert_has_calls([query_results_call, query_results_page_call])
+        assert conn.api_request.call_count == 2
+
+    def test_result_with_done_jobs_query_response_doesnt_call_get_query_results(self):
+        """With a done result from jobs.query, we don't need to call
+        jobs.getQueryResults to wait for the query to finish.
+
+        jobs.get is still called because there is an assumption that after
+        QueryJob.result(), all job metadata is available locally.
+        """
+        job_resource = self._make_resource(started=True, ended=True, location="EU")
+        conn = make_connection(job_resource)
+        client = _make_client(self.PROJECT, connection=conn)
+        query_resource_done = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
+            "rows": [{"f": [{"v": "abc"}]}],
+            "totalRows": "1",
+        }
+        job = google.cloud.bigquery._job_helpers._to_query_job(
+            client,
+            "SELECT 'abc' AS col1",
+            request_config=None,
+            query_response=query_resource_done,
+        )
+        assert job.state == "DONE"
+
+        result = job.result()
+
+        rows = list(result)
+        self.assertEqual(len(rows), 1)
+        self.assertEqual(rows[0].col1, "abc")
+        job_path = f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}"
+        conn.api_request.assert_called_once_with(
+            method="GET",
+            path=job_path,
+            query_params={},
+            timeout=None,
+        )
+
+    def test_result_with_done_jobs_query_response_and_page_size_invalidates_cache(self):
+        """We don't call jobs.query with a page size, so if the user explicitly
+        requests a certain size, invalidate the cache.
+        """
+        # Arrange
+        job_resource = self._make_resource(
+            started=True, ended=True, location="asia-northeast1"
+        )
+        query_resource_done = {
+            "jobComplete": True,
+            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
+            "schema": {"fields": [{"name": "col1", "type": "STRING"}]},
+            "rows": [{"f": [{"v": "abc"}]}],
+            "pageToken": "initial-page-token-shouldnt-be-used",
+            "totalRows": "4",
+        }
+        query_page_resource = {
+            "totalRows": 4,
+            "pageToken": "some-page-token",
+            "rows": [
+                {"f": [{"v": "row1"}]},
+                {"f": [{"v": "row2"}]},
+                {"f": [{"v": "row3"}]},
+            ],
+        }
+        query_page_resource_2 = {"totalRows": 4, "rows": [{"f": [{"v": "row4"}]}]}
+        conn = make_connection(job_resource, query_page_resource, query_page_resource_2)
+        client = _make_client(self.PROJECT, connection=conn)
+        job = google.cloud.bigquery._job_helpers._to_query_job(
+            client,
+            "SELECT col1 FROM table",
+            request_config=None,
+            query_response=query_resource_done,
+        )
+        assert job.state == "DONE"
+
+        # Act
+        result = job.result(page_size=3)
+
+        # Assert
+        actual_rows = list(result)
+        self.assertEqual(len(actual_rows), 4)
+
+        query_results_path = f"/projects/{self.PROJECT}/queries/{self.JOB_ID}"
+        query_page_1_call = mock.call(
+            method="GET",
+            path=query_results_path,
+            query_params={
+                "maxResults": 3,
+                "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
+                "location": "asia-northeast1",
+                "formatOptions.useInt64Timestamp": True,
+            },
+            timeout=None,
+        )
+        query_page_2_call = mock.call(
+            method="GET",
+            path=query_results_path,
+            query_params={
+                "pageToken": "some-page-token",
+                "maxResults": 3,
+                "fields": _LIST_ROWS_FROM_QUERY_RESULTS_FIELDS,
+                "location": "asia-northeast1",
+                "formatOptions.useInt64Timestamp": True,
+            },
+            timeout=None,
+        )
+        conn.api_request.assert_has_calls([query_page_1_call, query_page_2_call])

     def test_result_with_max_results(self):
         from google.cloud.bigquery.table import RowIterator

tests/unit/test_query.py

Lines changed: 4 additions & 4 deletions

@@ -1362,13 +1362,13 @@ def test_errors_present(self):
         self.assertEqual(query.errors, ERRORS)

     def test_job_id_missing(self):
-        with self.assertRaises(ValueError):
-            self._make_one({})
+        query = self._make_one({})
+        self.assertIsNone(query.job_id)

     def test_job_id_broken_job_reference(self):
         resource = {"jobReference": {"bogus": "BOGUS"}}
-        with self.assertRaises(ValueError):
-            self._make_one(resource)
+        query = self._make_one(resource)
+        self.assertIsNone(query.job_id)

     def test_job_id_present(self):
         resource = self._make_resource()
