Description
I know BigQuery jobs are asynchronous by default. However, I am struggling to make my data pipeline async end-to-end.

Looking at this JS example, I thought the most Pythonic approach would be to make a BigQuery job awaitable. However, I can't get that to work in Python: `await client.query(query)` raises an error. Looking at the source code, I don't see a method that returns an awaitable object.

I have little experience writing async Python code, and found this example that wraps the job in an `async def` coroutine:
```python
import asyncio

from google.cloud import bigquery


class BQApi(object):
    def __init__(self):
        self.api = bigquery.Client.from_service_account_json(BQ_CONFIG["credentials"])

    async def exec_query(self, query, **kwargs) -> bigquery.table.RowIterator:
        job = self.api.query(query, **kwargs)
        task = asyncio.create_task(self.coroutine_job(job))
        return await task

    @staticmethod
    async def coroutine_job(job):
        return job.result()  # result() is a blocking call
The `google.api_core.operation.Operation` docs show how to use `add_done_callback` to asynchronously wait for long-running operations. I have tried that, but the following raises `AttributeError: 'QueryJob' object has no attribute '_condition'`:
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

query1 = 'SELECT 1'
query2 = 'SELECT 2'

def my_callback(future):
    result = future.result()

operations = [bq.query(query1), bq.query(query2)]
[operation.add_done_callback(my_callback) for operation in operations]

results2 = []
for future in as_completed(operations):  # raises the AttributeError
    results2.append(list(future.result()))
```
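One way to connect a callback-style job to `async`/`await` is to bridge it into an `asyncio.Future`: register a done-callback that resolves the future via `call_soon_threadsafe` (callbacks may fire on a background thread). The sketch below uses a hypothetical `FakeJob` in place of a real `QueryJob`, since the real one needs credentials; only `add_done_callback` and `result` are assumed:

```python
import asyncio


class FakeJob:
    """Hypothetical stand-in for a BigQuery job: fires callbacks on finish()."""

    def __init__(self, value):
        self._value = value
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def result(self):
        return self._value

    def finish(self):
        for fn in self._callbacks:
            fn(self)


def job_to_future(loop, job):
    """Wrap a callback-style job in an awaitable asyncio.Future."""
    fut = loop.create_future()

    def on_done(completed_job):
        # call_soon_threadsafe: done-callbacks may run on another thread.
        loop.call_soon_threadsafe(fut.set_result, completed_job.result())

    job.add_done_callback(on_done)
    return fut


async def main():
    loop = asyncio.get_running_loop()
    job = FakeJob(42)
    fut = job_to_future(loop, job)
    loop.call_soon(job.finish)  # simulate the job completing later
    return await fut


value = asyncio.run(main())
```

This avoids `concurrent.futures.as_completed` entirely, which expects `concurrent.futures.Future` objects rather than job objects.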
Given that jobs are already asynchronous, would it make sense to add a method that returns an awaitable? Or am I missing something, and is there a Pythonic way to use the BigQuery client with the async/await pattern?