|
21 | 21 | import static org.junit.Assert.assertEquals; |
22 | 22 | import static org.junit.Assert.assertNotNull; |
23 | 23 | import static org.junit.Assert.assertNull; |
| 24 | +import static org.junit.Assert.assertTrue; |
24 | 25 |
|
| 26 | +import com.google.api.gax.core.InstantiatingExecutorProvider; |
25 | 27 | import com.google.api.gax.rpc.ServerStream; |
26 | 28 | import com.google.cloud.RetryOption; |
27 | 29 | import com.google.cloud.ServiceOptions; |
|
44 | 46 | import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; |
45 | 47 | import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; |
46 | 48 | import com.google.cloud.bigquery.storage.v1.ReadSession; |
| 49 | +import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings; |
47 | 50 | import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers; |
48 | 51 | import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions; |
49 | 52 | import com.google.cloud.bigquery.storage.v1.ReadStream; |
@@ -806,6 +809,54 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio |
806 | 809 | assertEquals(rowAssertMessage, new Utf8("abc"), structRecord.get("str_field")); |
807 | 810 | } |
808 | 811 |
|
| 812 | + @Test |
| 813 | + public void testSimpleReadWithBackgroundExecutorProvider() throws IOException { |
| 814 | + BigQueryReadSettings bigQueryReadSettings = BigQueryReadSettings |
| 815 | + .newBuilder() |
| 816 | + .setBackgroundExecutorProvider( |
| 817 | + InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build()) |
| 818 | + .build(); |
| 819 | + // Overriding the default client |
| 820 | + client = BigQueryReadClient.create(bigQueryReadSettings); |
| 821 | + assertTrue( |
| 822 | + client.getStub().getStubSettings().getBackgroundExecutorProvider() instanceof InstantiatingExecutorProvider); |
| 823 | + assertEquals( |
| 824 | + 14, |
| 825 | + ((InstantiatingExecutorProvider) client.getStub().getStubSettings().getBackgroundExecutorProvider()) |
| 826 | + .getExecutorThreadCount()); |
| 827 | + String table = |
| 828 | + BigQueryResource.FormatTableResource( |
| 829 | + /* projectId = */ "bigquery-public-data", |
| 830 | + /* datasetId = */ "samples", |
| 831 | + /* tableId = */ "shakespeare"); |
| 832 | + |
| 833 | + ReadSession session = |
| 834 | + client.createReadSession( |
| 835 | + /* parent = */ parentProjectId, |
| 836 | + /* readSession = */ ReadSession.newBuilder() |
| 837 | + .setTable(table) |
| 838 | + .setDataFormat(DataFormat.AVRO) |
| 839 | + .build(), |
| 840 | + /* maxStreamCount = */ 1); |
| 841 | + assertEquals( |
| 842 | + String.format( |
| 843 | + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", |
| 844 | + table, session.toString()), |
| 845 | + 1, |
| 846 | + session.getStreamsCount()); |
| 847 | + |
| 848 | + ReadRowsRequest readRowsRequest = |
| 849 | + ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build(); |
| 850 | + |
| 851 | + long rowCount = 0; |
| 852 | + ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest); |
| 853 | + for (ReadRowsResponse response : stream) { |
| 854 | + rowCount += response.getRowCount(); |
| 855 | + } |
| 856 | + |
| 857 | + assertEquals(164_656, rowCount); |
| 858 | + } |
| 859 | + |
809 | 860 | /** |
810 | 861 | * Reads to the specified row offset within the stream. If the stream does not have the desired |
811 | 862 | * rows to read, it will read all of them. |
|
0 commit comments