Simple Python client for interacting with Google BigQuery.
This client provides an API for retrieving and inserting BigQuery data by wrapping Google's low-level API client library. It also provides facilities that make it convenient to access data that is tied to an App Engine appspot, such as request logs.
```
pip install bigquery-python
```
```python
from bigquery import get_client

# BigQuery project id as listed in the Google Developers Console.
project_id = 'project_id'

# Service account email address as listed in the Google Developers Console.
service_account = 'my_id_123@developer.gserviceaccount.com'

# PKCS12 or PEM key provided by Google.
key = 'key.pem'

client = get_client(project_id, service_account=service_account,
                    private_key_file=key, readonly=True)

# JSON key provided by Google.
json_key = 'key.json'

client = get_client(json_key_file=json_key, readonly=True)

# Submit an async query.
job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000')

# Check if the query has finished running.
complete, row_count = client.check_job(job_id)

# Retrieve the results.
results = client.get_query_rows(job_id)
```
The BigQuery client allows you to execute raw queries against a dataset. The `query` method inserts a query job into BigQuery. By default, `query` runs asynchronously with a `timeout` of 0. When a non-zero timeout value is specified, the call waits for the results and raises a `BigQueryTimeoutException` on timeout.
When you run an async query, you can use the returned `job_id` to poll for job status later with `check_job`.
```python
# Submit an async query.
job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000')

# Do other work...

# Poll for query completion.
complete, row_count = client.check_job(job_id)

# Retrieve the results.
if complete:
    results = client.get_query_rows(job_id)
```
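In practice you will usually poll in a loop rather than checking once. A minimal sketch using only the calls shown above (the 5-second interval is an arbitrary choice, not a library default):

```python
import time

job_id, _results = client.query('SELECT * FROM dataset.my_table LIMIT 1000')

# Poll until the job reports completion, then fetch the rows.
complete = False
while not complete:
    complete, row_count = client.check_job(job_id)
    if not complete:
        time.sleep(5)  # polling interval is an arbitrary choice

results = client.get_query_rows(job_id)
```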
You can also specify a non-zero timeout value if you want your query to be synchronous.
```python
from bigquery.errors import BigQueryTimeoutException  # import path assumed; adjust for your version

# Submit a synchronous query.
try:
    _job_id, results = client.query('SELECT * FROM dataset.my_table LIMIT 1000',
                                    timeout=10)
except BigQueryTimeoutException:
    print("Timeout")
```
The `query_builder` module provides an API for generating query strings that can be run using the BigQuery client.
```python
from bigquery.query_builder import render_query

selects = {
    'start_time': {
        'alias': 'Timestamp',
        'format': 'INTEGER-FORMAT_UTC_USEC'
    }
}

conditions = [
    {
        'field': 'Timestamp',
        'type': 'INTEGER',
        'comparators': [
            {
                'condition': '>=',
                'negate': False,
                'value': 1399478981
            }
        ]
    }
]

grouping = ['Timestamp']

having = [
    {
        'field': 'Timestamp',
        'type': 'INTEGER',
        'comparators': [
            {
                'condition': '==',
                'negate': False,
                'value': 1399478981
            }
        ]
    }
]

order_by = {'fields': ['Timestamp'], 'direction': 'desc'}

query = render_query(
    'dataset',
    ['table'],
    select=selects,
    conditions=conditions,
    groupings=grouping,
    having=having,
    order_by=order_by,
    limit=47
)

job_id, _ = client.query(query)
```
The BigQuery client provides facilities to manage dataset tables, including creating and deleting tables, checking whether a table exists, and fetching table metadata.
```python
# Create a new table.
schema = [
    {'name': 'foo', 'type': 'STRING', 'mode': 'nullable'},
    {'name': 'bar', 'type': 'FLOAT', 'mode': 'nullable'}
]
created = client.create_table('dataset', 'my_table', schema)

# Delete an existing table.
deleted = client.delete_table('dataset', 'my_table')

# Check if a table exists.
exists = client.check_table('dataset', 'my_table')

# Get a table's full metadata, including numRows, numBytes, etc.
# See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables
metadata = client.get_table('dataset', 'my_table')
```
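A common pattern is to create a table only when it does not already exist. A minimal sketch combining the calls above:

```python
schema = [
    {'name': 'foo', 'type': 'STRING', 'mode': 'nullable'},
    {'name': 'bar', 'type': 'FLOAT', 'mode': 'nullable'}
]

# Only create the table if it is not already present.
if not client.check_table('dataset', 'my_table'):
    created = client.create_table('dataset', 'my_table', schema)
```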
There is also functionality for retrieving tables that are associated with a Google App Engine appspot, assuming table names are in the form of `appid_YYYY_MM` or `YYYY_MM_appid`. This allows tables within a date range to be selected and queried.
```python
from datetime import datetime, timedelta

# Get appspot tables falling within a start and end time.
range_end = datetime.utcnow()
range_start = range_end - timedelta(weeks=12)
tables = client.get_tables('dataset', 'appid', range_start, range_end)
```
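The returned tables can then be stitched into a single query. A sketch, assuming `get_tables` returns a list of table-name strings (check the docstring for your version):

```python
# Build a legacy-SQL FROM clause over every table in the range
# (comma-separated tables are a UNION ALL in legacy SQL) and submit
# it as one query. Table-name strings are an assumption here.
from_clause = ', '.join('[dataset.%s]' % name for name in tables)
job_id, _ = client.query('SELECT COUNT(*) FROM %s' % from_clause)
```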
The client provides an API for inserting data into a BigQuery table. The last parameter is an optional insert-id key: rows that share the same value for this key are treated as duplicates, and BigQuery uses it to deduplicate inserts on a best-effort basis.
```python
# Insert data into a table.
rows = [
    {'one': 'ein', 'two': 'zwei'},
    {'id': 'NzAzYmRiY', 'one': 'uno', 'two': 'dos'},
    {'id': 'NzAzYmRiY', 'one': 'ein', 'two': 'zwei'}  # duplicate entry
]

inserted = client.push_rows('dataset', 'table', rows, 'id')
```
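Streaming inserts are subject to per-request size limits, so large batches are usually pushed in chunks. A minimal sketch (the chunk size of 500 is an assumption, not a library constant):

```python
def push_in_chunks(client, dataset, table, rows, chunk_size=500):
    """Push rows in fixed-size chunks; returns True only if every chunk succeeds."""
    ok = True
    for i in range(0, len(rows), chunk_size):
        ok = client.push_rows(dataset, table, rows[i:i + chunk_size], 'id') and ok
    return ok
```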
You can write query results directly to a table. When either the dataset or the table parameter is omitted, the query result is written to a temporary table.
```python
# Write to a permanent table.
job = client.write_to_table('SELECT * FROM dataset.original_table LIMIT 100',
                            'dataset',
                            'table')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")

# Write to a permanent table with a UDF in the query string.
external_udf_uris = ["gs://bigquery-sandbox-udf/url_decode.js"]
query = """SELECT requests, title
           FROM
             urlDecode(
               SELECT
                 title, sum(requests) AS num_requests
               FROM
                 [fh-bigquery:wikipedia.pagecounts_201504]
               WHERE language = 'fr'
               GROUP EACH BY title
             )
           WHERE title LIKE '%ç%'
           ORDER BY requests DESC
           LIMIT 100
        """
job = client.write_to_table(
    query,
    'dataset',
    'table',
    external_udf_uris=external_udf_uris
)
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")

# Write to a temporary table.
job = client.write_to_table('SELECT * FROM dataset.original_table LIMIT 100')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")
```
Data can also be imported from Google Cloud Storage:

```python
from bigquery.client import JOB_SOURCE_FORMAT_JSON  # import path assumed; adjust for your version

schema = [{"name": "username", "type": "string", "mode": "nullable"}]

job = client.import_data_from_uris(['gs://mybucket/mydata.json'],
                                   'dataset',
                                   'table',
                                   schema,
                                   source_format=JOB_SOURCE_FORMAT_JSON)

try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")
```
Similarly, table data can be exported to Google Cloud Storage:

```python
job = client.export_data_to_uris(['gs://mybucket/mydata.json'],
                                 'dataset',
                                 'table')
try:
    job_resource = client.wait_for_job(job, timeout=60)
    print(job_resource)
except BigQueryTimeoutException:
    print("Timeout")
```
The client provides an API for listing, creating, deleting, updating and patching datasets.
```python
# List datasets.
datasets = client.get_datasets()

# Create a dataset.
dataset = client.create_dataset('mydataset',
                                friendly_name="My Dataset",
                                description="A dataset created by me")

# Get a dataset.
client.get_dataset('mydataset')

# Delete a dataset.
client.delete_dataset('mydataset')
client.delete_dataset('mydataset', delete_contents=True)  # delete even if it contains data

# Update a dataset (replaces writable fields; description is deleted here).
client.update_dataset('mydataset', friendly_name="mon Dataset")

# Patch a dataset (friendly_name changed; description is preserved).
client.patch_dataset('mydataset', friendly_name="mon Dataset")

# Check if a dataset exists.
exists = client.check_dataset('mydataset')
```
A table schema can be generated from a sample record with `schema_from_record`:

```python
from bigquery import schema_from_record

schema_from_record({"id": 123,
                    "posts": [{"id": 123, "text": "this is a post"}],
                    "username": "bob"})
```
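The result is a schema in the same list-of-dicts form accepted by `create_table`. For the record above, the generated schema should look roughly like the following (field order and mode casing may differ by version):

```python
# Approximate output of schema_from_record for the record above;
# nested lists of dicts become REPEATED RECORD fields.
[
    {'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    {'name': 'posts', 'type': 'RECORD', 'mode': 'REPEATED', 'fields': [
        {'name': 'id', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        {'name': 'text', 'type': 'STRING', 'mode': 'NULLABLE'}
    ]},
    {'name': 'username', 'type': 'STRING', 'mode': 'NULLABLE'}
]
```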
Requirements for contributing:
- Branch off master, PR back to master.
- Your code should pass Flake8.
- Unit test coverage is required.
- Good docstrings are required.
- Good commit messages are required.