1818
1919import static com .google .common .base .Preconditions .checkState ;
2020
21+ import com .google .cloud .spanner .ErrorCode ;
2122import com .google .cloud .spanner .ForwardingStructReader ;
2223import com .google .cloud .spanner .ResultSet ;
2324import com .google .cloud .spanner .SpannerException ;
3031import com .google .spanner .v1 .ResultSetStats ;
3132import java .util .ArrayList ;
3233import java .util .List ;
34+ import java .util .concurrent .CountDownLatch ;
3335import java .util .concurrent .ExecutorService ;
3436import java .util .concurrent .Executors ;
3537import java .util .concurrent .LinkedBlockingDeque ;
@@ -47,15 +49,18 @@ static class PartitionExecutor implements Runnable {
4749 private final Connection connection ;
4850 private final String partitionId ;
4951 private final LinkedBlockingDeque <PartitionExecutorResult > queue ;
52+ private final CountDownLatch metadataAvailableLatch ;
5053 private final AtomicBoolean shouldStop = new AtomicBoolean ();
5154
5255 PartitionExecutor (
5356 Connection connection ,
5457 String partitionId ,
55- LinkedBlockingDeque <PartitionExecutorResult > queue ) {
58+ LinkedBlockingDeque <PartitionExecutorResult > queue ,
59+ CountDownLatch metadataAvailableLatch ) {
5660 this .connection = Preconditions .checkNotNull (connection );
5761 this .partitionId = Preconditions .checkNotNull (partitionId );
5862 this .queue = queue ;
63+ this .metadataAvailableLatch = Preconditions .checkNotNull (metadataAvailableLatch );
5964 }
6065
6166 @ Override
@@ -68,6 +73,7 @@ public void run() {
6873 queue .put (
6974 PartitionExecutorResult .dataAndMetadata (
7075 row , resultSet .getType (), resultSet .getMetadata ()));
76+ metadataAvailableLatch .countDown ();
7177 first = false ;
7278 } else {
7379 queue .put (PartitionExecutorResult .data (row ));
@@ -82,9 +88,11 @@ public void run() {
8288 queue .put (
8389 PartitionExecutorResult .typeAndMetadata (
8490 resultSet .getType (), resultSet .getMetadata ()));
91+ metadataAvailableLatch .countDown ();
8592 }
8693 } catch (Throwable exception ) {
8794 putWithoutInterruptPropagation (PartitionExecutorResult .exception (exception ));
95+ metadataAvailableLatch .countDown ();
8896 } finally {
8997 // Emit a special 'finished' result to ensure that the row producer is not blocked on a
9098 // queue that never receives any more results. This ensures that we can safely block on
@@ -215,6 +223,7 @@ private static class RowProducerImpl implements RowProducer {
215223 private final AtomicInteger finishedCounter ;
216224 private final LinkedBlockingDeque <PartitionExecutorResult > queue ;
217225 private ResultSetMetadata metadata ;
226+ private final CountDownLatch metadataAvailableLatch = new CountDownLatch (1 );
218227 private Type type ;
219228 private Struct currentRow ;
220229 private Throwable exception ;
@@ -243,7 +252,7 @@ private static class RowProducerImpl implements RowProducer {
243252 this .finishedCounter = new AtomicInteger (partitions .size ());
244253 for (String partition : partitions ) {
245254 PartitionExecutor partitionExecutor =
246- new PartitionExecutor (connection , partition , this .queue );
255+ new PartitionExecutor (connection , partition , this .queue , this . metadataAvailableLatch );
247256 this .partitionExecutors .add (partitionExecutor );
248257 this .executor .submit (partitionExecutor );
249258 }
@@ -310,8 +319,27 @@ public Struct get() {
310319 return currentRow ;
311320 }
312321
322+ private PartitionExecutorResult getFirstResult () {
323+ try {
324+ metadataAvailableLatch .await ();
325+ } catch (InterruptedException interruptedException ) {
326+ throw SpannerExceptionFactory .propagateInterrupt (interruptedException );
327+ }
328+ PartitionExecutorResult result = queue .peek ();
329+ if (result == null ) {
330+ throw SpannerExceptionFactory .newSpannerException (
331+ ErrorCode .FAILED_PRECONDITION , "Thread-unsafe access to ResultSet" );
332+ }
333+ if (result .exception != null ) {
334+ throw SpannerExceptionFactory .asSpannerException (result .exception );
335+ }
336+ return result ;
337+ }
338+
313339 public ResultSetMetadata getMetadata () {
314- checkState (metadata != null , "next() call required" );
340+ if (metadata == null ) {
341+ return getFirstResult ().metadata ;
342+ }
315343 return metadata ;
316344 }
317345
@@ -326,7 +354,9 @@ public int getParallelism() {
326354 }
327355
328356 public Type getType () {
329- checkState (type != null , "next() call required" );
357+ if (type == null ) {
358+ return getFirstResult ().type ;
359+ }
330360 return type ;
331361 }
332362 }
0 commit comments