27 changes: 25 additions & 2 deletions google/cloud/bigquery/_job_helpers.py
@@ -47,6 +47,7 @@
from google.cloud.bigquery import job
import google.cloud.bigquery.query
from google.cloud.bigquery import table
import google.cloud.bigquery.retry
from google.cloud.bigquery.retry import POLLING_DEFAULT_VALUE

# Avoid circular imports
@@ -142,12 +143,28 @@ def do_query():
raise create_exc

try:
# Sometimes we get a 404 after a Conflict. In this case, we
# have pretty high confidence that by retrying the 404, we'll
# (hopefully) eventually recover the job.
# https://github.com/googleapis/python-bigquery/issues/2134
#
# Allow users who want to completely disable retries to
# continue to do so by setting retry to None.
get_job_retry = retry
if retry is not None:
# TODO(tswast): Amend the user's retry object to also allow retrying
# 404s once there's a public way to do so.
# https://github.com/googleapis/python-api-core/issues/796
get_job_retry = (
google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY
)

query_job = client.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
retry=get_job_retry,
timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise
@@ -156,7 +173,13 @@
else:
return query_job

# Allow users who want to completely disable retries to
# continue to do so by setting job_retry to None.
if job_retry is not None:
do_query = google.cloud.bigquery.retry._DEFAULT_QUERY_JOB_INSERT_RETRY(do_query)

future = do_query()

# The future might be in a failed state now, but if it's
# unrecoverable, we'll find out when we ask for its result, at which
# point we may retry.
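For context on the `get_job` change above, here is a minimal, hedged sketch of how a `google.api_core.retry.Retry` object wraps a callable so that a transient 404 following a Conflict keeps being retried until the deadline expires. It is standalone and the names (`fetch_job_after_conflict`, the deadline value) are hypothetical, not the library's own:

```python
from google.api_core import exceptions, retry
from google.cloud import bigquery


def _retry_not_found_or_transient(exc):
    # After a Conflict on jobs.insert we know the job exists server-side,
    # so treat NotFound as transient in addition to the usual transient codes.
    return isinstance(
        exc,
        (exceptions.NotFound, exceptions.TooManyRequests, exceptions.ServiceUnavailable),
    )


# Illustrative deadline; the real value is derived from _DEFAULT_RETRY_DEADLINE.
_get_job_conflict_retry = retry.Retry(
    predicate=_retry_not_found_or_transient,
    deadline=120.0,
)


def fetch_job_after_conflict(client: bigquery.Client, job_id: str, location: str):
    # Retry.__call__ wraps the function; each NotFound or transient error
    # triggers another attempt with exponential backoff until the deadline.
    wrapped = _get_job_conflict_retry(
        lambda: client.get_job(job_id, location=location)
    )
    return wrapped()
```

As the comments in the diff note, passing `retry=None` to `client.query()` skips this recovery path entirely, so callers who disable retries keep that behavior.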
54 changes: 54 additions & 0 deletions google/cloud/bigquery/retry.py
@@ -82,6 +82,32 @@ def _should_retry(exc):
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""


def _should_retry_get_job_conflict(exc):
"""Predicate for determining when to retry a jobs.get call after a conflict error.

Sometimes we get a 404 after a Conflict. In this case, we
have pretty high confidence that by retrying the 404, we'll
(hopefully) eventually recover the job.
https://github.com/googleapis/python-bigquery/issues/2134

Note: we may be able to extend this to user-specified predicates
after https://github.com/googleapis/python-api-core/issues/796
to tweak existing Retry object predicates.
"""
return isinstance(exc, exceptions.NotFound) or _should_retry(exc)


# Pick a deadline smaller than our other deadlines since we want to time out
# before those expire.
_DEFAULT_GET_JOB_CONFLICT_DEADLINE = _DEFAULT_RETRY_DEADLINE / 3.0
_DEFAULT_GET_JOB_CONFLICT_RETRY = retry.Retry(
predicate=_should_retry_get_job_conflict,
deadline=_DEFAULT_GET_JOB_CONFLICT_DEADLINE,
)
"""Private, may be removed in future."""


# Note: Take care when updating DEFAULT_TIMEOUT to anything but None. We
# briefly had a default timeout, but even setting it at more than twice the
# theoretical server-side default timeout of 2 minutes was not enough for
@@ -142,6 +168,34 @@ def _job_should_retry(exc):
The default job retry object.
"""


def _query_job_insert_should_retry(exc):
# Per https://github.com/googleapis/python-bigquery/issues/2134, sometimes
# we get a 404 error. In this case, if we get this far, assume that the job
# doesn't actually exist and try again. We can't add 404 to the default
# job_retry because that happens for errors like "this table does not
# exist", which probably won't resolve with a retry.
if isinstance(exc, exceptions.RetryError):
exc = exc.cause

if isinstance(exc, exceptions.NotFound):
message = exc.message
# Don't retry table/dataset not found, just job not found. The message
# includes the request URL, which contains "jobs", so match " job" with
# leading whitespace to disambiguate.
return message is not None and " job" in message.lower()

return _job_should_retry(exc)


_DEFAULT_QUERY_JOB_INSERT_RETRY = retry.Retry(
predicate=_query_job_insert_should_retry,
# jobs.insert doesn't wait for the job to complete, so we don't need the
# long _DEFAULT_JOB_DEADLINE for this part.
deadline=_DEFAULT_RETRY_DEADLINE,
)
"""Private, may be removed in future."""


DEFAULT_GET_JOB_TIMEOUT = 128
"""
Default timeout for Client.get_job().
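To make the message-based predicate above concrete, here is a small self-contained sketch of how "job not found" is distinguished from "table/dataset not found". The helper name and error strings are illustrative only, not actual API responses:

```python
from google.api_core import exceptions


def _looks_like_missing_job(exc):
    # Mirrors the check in _query_job_insert_should_retry: unwrap a RetryError,
    # then only treat a NotFound as retriable when its message mentions a job.
    if isinstance(exc, exceptions.RetryError):
        exc = exc.cause
    if isinstance(exc, exceptions.NotFound):
        message = exc.message
        return message is not None and " job" in message.lower()
    return False


# Illustrative messages; real API error text may differ.
print(_looks_like_missing_job(exceptions.NotFound("Not found: Job my-project:US.abc123")))  # True
print(_looks_like_missing_job(exceptions.NotFound("Not found: Table my-project.ds.tbl")))   # False
```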
174 changes: 173 additions & 1 deletion tests/unit/test_client.py
@@ -28,9 +28,12 @@
from unittest import mock
import warnings

import requests
import freezegun
import packaging
import pytest
import requests

import google.api


try:
Expand All @@ -55,6 +58,8 @@
import google.cloud._helpers
from google.cloud import bigquery

from google.cloud.bigquery import job as bqjob
import google.cloud.bigquery._job_helpers
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery import exceptions
from google.cloud.bigquery import ParquetOptions
@@ -5308,6 +5313,173 @@ def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails(self):
with pytest.raises(DataLoss, match="we lost your job, sorry"):
client.query("SELECT 1;", job_id=None)

def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_fails_no_retries(self):
Reviewer comment (Contributor): Could you help me understand what this test is for? It feels like we are just verifying that QueryJob._begin() and client.get_job() are called.

from google.api_core.exceptions import Conflict
from google.api_core.exceptions import DataLoss
from google.cloud.bigquery.job import QueryJob

creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

job_create_error = Conflict("Job already exists.")
job_begin_patcher = mock.patch.object(
QueryJob, "_begin", side_effect=job_create_error
)
get_job_patcher = mock.patch.object(
client, "get_job", side_effect=DataLoss("we lost your job, sorry")
)

with job_begin_patcher, get_job_patcher:
# If the get job request fails but a job with this ID supposedly
# already exists, raise the exception explaining why we couldn't
# recover the job.
with pytest.raises(DataLoss, match="we lost your job, sorry"):
client.query(
"SELECT 1;",
job_id=None,
# Explicitly test with no retries to make sure those branches are covered.
retry=None,
job_retry=None,
)

def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_retries_404(self):
"""Regression test for https://github.com/googleapis/python-bigquery/issues/2134

Sometimes after a Conflict, the fetch fails with a 404, but we know
because of the conflict that the job really does exist. Retry until we
get the job status (or time out).
"""
job_id = "abc123"
creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
conn = client._connection = make_connection(
# We're mocking QueryJob._begin, so this is only going to be
# jobs.get requests and responses.
google.api_core.exceptions.TooManyRequests("this is retriable by default"),
google.api_core.exceptions.NotFound("we lost your job"),
google.api_core.exceptions.NotFound("we lost your job again, sorry"),
{
"jobReference": {
"projectId": self.PROJECT,
"location": "TESTLOC",
"jobId": job_id,
}
},
)

job_create_error = google.api_core.exceptions.Conflict("Job already exists.")
job_begin_patcher = mock.patch.object(
bqjob.QueryJob, "_begin", side_effect=job_create_error
)
job_id_patcher = mock.patch.object(
google.cloud.bigquery._job_helpers,
"make_job_id",
return_value=job_id,
)

with job_begin_patcher, job_id_patcher:
# If the get job request fails but a job with this ID already
# exists, retry the 404 until we get it (or it fails for a
# non-retriable reason; see other tests).
result = client.query("SELECT 1;", job_id=None)

jobs_get_path = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{job_id}",
query_params={
"projection": "full",
},
timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
)
conn.api_request.assert_has_calls(
# Double-check that it was jobs.get that was called for each of our
# mocked responses.
[jobs_get_path] * 4,
)
assert result.job_id == job_id

def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_retries_404_and_query_job_insert(
self,
):
"""Regression test for https://github.com/googleapis/python-bigquery/issues/2134

Sometimes after a Conflict, the fetch fails with a 404. If it keeps
failing with a 404, assume that the job actually doesn't exist.
"""
job_id_1 = "abc123"
job_id_2 = "xyz789"
creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

# We're mocking QueryJob._begin, so that the connection should only get
# jobs.get requests.
job_create_error = google.api_core.exceptions.Conflict("Job already exists.")
job_begin_patcher = mock.patch.object(
bqjob.QueryJob, "_begin", side_effect=job_create_error
)
conn = client._connection = make_connection(
google.api_core.exceptions.NotFound("we lost your job again, sorry"),
{
"jobReference": {
"projectId": self.PROJECT,
"location": "TESTLOC",
"jobId": job_id_2,
}
},
)

# Choose a small deadline so the 404 retries give up.
retry = (
google.cloud.bigquery.retry._DEFAULT_GET_JOB_CONFLICT_RETRY.with_deadline(1)
)
job_id_patcher = mock.patch.object(
google.cloud.bigquery._job_helpers,
"make_job_id",
side_effect=[job_id_1, job_id_2],
)
retry_patcher = mock.patch.object(
google.cloud.bigquery.retry,
"_DEFAULT_GET_JOB_CONFLICT_RETRY",
retry,
)

with freezegun.freeze_time(
"2025-01-01 00:00:00",
# 10x the retry deadline to guarantee a timeout.
auto_tick_seconds=10,
), job_begin_patcher, job_id_patcher, retry_patcher:
# If the get job request fails but a job with this ID already
# exists, retry the 404 until we get it (or it fails for a
# non-retriable reason; see other tests).
result = client.query("SELECT 1;", job_id=None)

jobs_get_path_1 = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{job_id_1}",
query_params={
"projection": "full",
},
timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
)
jobs_get_path_2 = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{job_id_2}",
query_params={
"projection": "full",
},
timeout=google.cloud.bigquery.retry.DEFAULT_GET_JOB_TIMEOUT,
)
conn.api_request.assert_has_calls(
# Double-check that it was jobs.get that was called for each of our
# mocked responses.
[jobs_get_path_1, jobs_get_path_2],
)
assert result.job_id == job_id_2

def test_query_job_rpc_fail_w_conflict_random_id_job_fetch_succeeds(self):
from google.api_core.exceptions import Conflict
from google.cloud.bigquery.job import QueryJob
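Finally, the deadline-expiry trick in the second regression test can be reproduced in isolation. A rough sketch, assuming freezegun patches the clock that `google.api_core.retry` reads (as it does in the test above); the callable and names are made up for illustration:

```python
import freezegun
from google.api_core import exceptions, retry

attempts = {"count": 0}


def always_missing():
    attempts["count"] += 1
    raise exceptions.NotFound("still missing")


wrapped = retry.Retry(
    predicate=lambda exc: isinstance(exc, exceptions.NotFound),
    deadline=1,
)(always_missing)

# Every clock read advances the frozen time by 10 seconds, so the 1-second
# deadline is exceeded almost immediately and the retry gives up instead of
# sleeping through real backoff.
with freezegun.freeze_time("2025-01-01 00:00:00", auto_tick_seconds=10):
    try:
        wrapped()
    except exceptions.RetryError as exc:
        print("gave up after", attempts["count"], "attempt(s):", exc.cause)
```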