|
13 | 13 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | 14 | # See the License for the specific language governing permissions and |
15 | 15 | # limitations under the License. |
16 | | -"""System tests for reading rows from tables.""" |
17 | | - |
18 | | -import os |
| 16 | +"""System tests for reading rows with pandas connector.""" |
19 | 17 |
|
20 | 18 | import numpy |
21 | 19 | import pyarrow.types |
|
24 | 22 | from google.cloud import bigquery_storage_v1beta1 |
25 | 23 |
|
26 | 24 |
|
27 | | -@pytest.fixture() |
28 | | -def project_id(): |
29 | | - return os.environ["PROJECT_ID"] |
30 | | - |
31 | | - |
32 | | -@pytest.fixture() |
33 | | -def client(): |
34 | | - return bigquery_storage_v1beta1.BigQueryStorageClient() |
35 | | - |
36 | | - |
37 | | -@pytest.fixture() |
38 | | -def table_reference(): |
39 | | - table_ref = bigquery_storage_v1beta1.types.TableReference() |
40 | | - table_ref.project_id = "bigquery-public-data" |
41 | | - table_ref.dataset_id = "usa_names" |
42 | | - table_ref.table_id = "usa_1910_2013" |
43 | | - return table_ref |
44 | | - |
45 | | - |
46 | | -@pytest.fixture() |
47 | | -def small_table_reference(): |
48 | | - table_ref = bigquery_storage_v1beta1.types.TableReference() |
49 | | - table_ref.project_id = "bigquery-public-data" |
50 | | - table_ref.dataset_id = "utility_us" |
51 | | - table_ref.table_id = "country_code_iso" |
52 | | - return table_ref |
53 | | - |
54 | | - |
55 | | -def test_read_rows_full_table(client, project_id, small_table_reference): |
56 | | - session = client.create_read_session( |
57 | | - small_table_reference, "projects/{}".format(project_id), requested_streams=1 |
58 | | - ) |
59 | | - |
60 | | - stream_pos = bigquery_storage_v1beta1.types.StreamPosition( |
61 | | - stream=session.streams[0] |
62 | | - ) |
63 | | - blocks = list(client.read_rows(stream_pos)) |
64 | | - |
65 | | - assert len(blocks) > 0 |
66 | | - block = blocks[0] |
67 | | - assert block.status.estimated_row_count > 0 |
68 | | - assert len(block.avro_rows.serialized_binary_rows) > 0 |
69 | | - |
70 | | - |
71 | 25 | def test_read_rows_to_arrow(client, project_id): |
72 | 26 | table_ref = bigquery_storage_v1beta1.types.TableReference() |
73 | 27 | table_ref.project_id = "bigquery-public-data" |
@@ -102,47 +56,26 @@ def test_read_rows_to_arrow(client, project_id): |
102 | 56 | assert pyarrow.types.is_string(schema.field_by_name("name").type) |
103 | 57 |
|
104 | 58 |
|
105 | | -def test_read_rows_to_dataframe_w_avro(client, project_id): |
106 | | - table_ref = bigquery_storage_v1beta1.types.TableReference() |
107 | | - table_ref.project_id = "bigquery-public-data" |
108 | | - table_ref.dataset_id = "new_york_citibike" |
109 | | - table_ref.table_id = "citibike_stations" |
110 | | - session = client.create_read_session( |
111 | | - table_ref, "projects/{}".format(project_id), requested_streams=1 |
112 | | - ) |
113 | | - schema_type = session.WhichOneof("schema") |
114 | | - assert schema_type == "avro_schema" |
115 | | - |
116 | | - stream_pos = bigquery_storage_v1beta1.types.StreamPosition( |
117 | | - stream=session.streams[0] |
118 | | - ) |
119 | | - |
120 | | - frame = client.read_rows(stream_pos).to_dataframe( |
121 | | - session, dtypes={"latitude": numpy.float16} |
122 | | - ) |
123 | | - |
124 | | - # Station ID is a required field (no nulls), so the datatype should always |
125 | | - # be integer. |
126 | | - assert frame.station_id.dtype.name == "int64" |
127 | | - assert frame.latitude.dtype.name == "float16" |
128 | | - assert frame.longitude.dtype.name == "float64" |
129 | | - assert frame["name"].str.startswith("Central Park").any() |
130 | | - |
131 | | - |
132 | | -def test_read_rows_to_dataframe_w_arrow(client, project_id): |
| 59 | +@pytest.mark.parametrize( |
| 60 | + "data_format,expected_schema_type", |
| 61 | + ( |
| 62 | + (bigquery_storage_v1beta1.enums.DataFormat.AVRO, "avro_schema"), |
| 63 | + (bigquery_storage_v1beta1.enums.DataFormat.ARROW, "arrow_schema"), |
| 64 | + ), |
| 65 | +) |
| 66 | +def test_read_rows_to_dataframe(client, project_id, data_format, expected_schema_type): |
133 | 67 | table_ref = bigquery_storage_v1beta1.types.TableReference() |
134 | 68 | table_ref.project_id = "bigquery-public-data" |
135 | 69 | table_ref.dataset_id = "new_york_citibike" |
136 | 70 | table_ref.table_id = "citibike_stations" |
137 | | - |
138 | 71 | session = client.create_read_session( |
139 | 72 | table_ref, |
140 | 73 | "projects/{}".format(project_id), |
141 | | - format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW, |
| 74 | + format_=data_format, |
142 | 75 | requested_streams=1, |
143 | 76 | ) |
144 | 77 | schema_type = session.WhichOneof("schema") |
145 | | - assert schema_type == "arrow_schema" |
| 78 | + assert schema_type == expected_schema_type |
146 | 79 |
|
147 | 80 | stream_pos = bigquery_storage_v1beta1.types.StreamPosition( |
148 | 81 | stream=session.streams[0] |
|
0 commit comments