| 
 | 1 | +# Copyright 2019 Google LLC  | 
 | 2 | +#  | 
 | 3 | +# Licensed under the Apache License, Version 2.0 (the "License");  | 
 | 4 | +# you may not use this file except in compliance with the License.  | 
 | 5 | +# You may obtain a copy of the License at  | 
 | 6 | +#  | 
 | 7 | +# https://www.apache.org/licenses/LICENSE-2.0  | 
 | 8 | +#  | 
 | 9 | +# Unless required by applicable law or agreed to in writing, software  | 
 | 10 | +# distributed under the License is distributed on an "AS IS" BASIS,  | 
 | 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
 | 12 | +# See the License for the specific language governing permissions and  | 
 | 13 | +# limitations under the License.  | 
 | 14 | + | 
 | 15 | +import argparse  | 
 | 16 | + | 
 | 17 | + | 
 | 18 | + | 
 | 19 | +def main(project_id='your-project-id', snapshot_millis=0):  | 
 | 20 | + # [START bigquerystorage_quickstart]  | 
 | 21 | + from google.cloud import bigquery_storage_v1beta1  | 
 | 22 | + | 
 | 23 | + | 
 | 24 | + # TODO(developer): Set the project_id variable.  | 
 | 25 | + # project_id = 'your-project-id'  | 
 | 26 | + #  | 
 | 27 | + # The read session is created in this project. This project can be  | 
 | 28 | + # different from that which contains the table.  | 
 | 29 | + | 
 | 30 | + client = bigquery_storage_v1beta1.BigQueryStorageClient()  | 
 | 31 | + | 
 | 32 | + # This example reads baby name data from the public datasets.  | 
 | 33 | + table_ref = bigquery_storage_v1beta1.types.TableReference()  | 
 | 34 | + table_ref.project_id = "bigquery-public-data"  | 
 | 35 | + table_ref.dataset_id = "usa_names"  | 
 | 36 | + table_ref.table_id = "usa_1910_current"  | 
 | 37 | + | 
 | 38 | + # We limit the output columns to a subset of those allowed in the table,  | 
 | 39 | + # and set a simple filter to only report names from the state of  | 
 | 40 | + # Washington (WA).  | 
 | 41 | + read_options = bigquery_storage_v1beta1.types.TableReadOptions()  | 
 | 42 | + read_options.selected_fields.append("name")  | 
 | 43 | + read_options.selected_fields.append("number")  | 
 | 44 | + read_options.selected_fields.append("state")  | 
 | 45 | + read_options.row_restriction = 'state = "WA"'  | 
 | 46 | + | 
 | 47 | + # Set a snapshot time if it's been specified.  | 
 | 48 | + modifiers = None  | 
 | 49 | + if snapshot_millis > 0:  | 
 | 50 | + modifiers = bigquery_storage_v1beta1.types.TableModifiers()  | 
 | 51 | + modifiers.snapshot_time.FromMilliseconds(snapshot_millis)  | 
 | 52 | + | 
 | 53 | + parent = "projects/{}".format(project_id)  | 
 | 54 | + session = client.create_read_session(  | 
 | 55 | + table_ref,  | 
 | 56 | + parent,  | 
 | 57 | + table_modifiers=modifiers,  | 
 | 58 | + read_options=read_options) # API request.  | 
 | 59 | + | 
 | 60 | + # We'll use only a single stream for reading data from the table. Because  | 
 | 61 | + # of dynamic sharding, this will yield all the rows in the table. However,  | 
 | 62 | + # if you wanted to fan out multiple readers you could do so by having a  | 
 | 63 | + # reader process each individual stream.  | 
 | 64 | + reader = client.read_rows(  | 
 | 65 | + bigquery_storage_v1beta1.types.StreamPosition(  | 
 | 66 | + stream=session.streams[0],  | 
 | 67 | + )  | 
 | 68 | + )  | 
 | 69 | + | 
 | 70 | + # The read stream contains blocks of Avro-encoded bytes. The rows() method  | 
 | 71 | + # uses the fastavro library to parse these blocks as an interable of Python  | 
 | 72 | + # dictionaries. Install fastavro with the following command:  | 
 | 73 | + #  | 
 | 74 | + # pip install google-cloud-bigquery-storage[fastavro]  | 
 | 75 | + rows = reader.rows(session)  | 
 | 76 | + | 
 | 77 | + # Do any local processing by iterating over the rows. The  | 
 | 78 | + # google-cloud-bigquery-storage client reconnects to the API after any  | 
 | 79 | + # transient network errors or timeouts.  | 
 | 80 | + names = set()  | 
 | 81 | + states = set()  | 
 | 82 | + | 
 | 83 | + for row in rows:  | 
 | 84 | + names.add(row["name"])  | 
 | 85 | + states.add(row["state"])  | 
 | 86 | + | 
 | 87 | + print("Got {} unique names in states: {}".format(len(names), states))  | 
 | 88 | + # [END bigquerystorage_quickstart]  | 
 | 89 | + | 
 | 90 | + | 
 | 91 | +if __name__ == "__main__":  | 
 | 92 | + parser = argparse.ArgumentParser()  | 
 | 93 | + parser.add_argument('project_id')  | 
 | 94 | + parser.add_argument('--snapshot_millis', default=0, type=int)  | 
 | 95 | + args = parser.parse_args()  | 
 | 96 | + main(project_id=args.project_id)  | 
0 commit comments