@@ -95,8 +95,10 @@ struct ReadStream
9595int16 ios_in_progress ;
9696int16 queue_size ;
9797int16 max_pinned_buffers ;
98+ int16 forwarded_buffers ;
9899int16 pinned_buffers ;
99100int16 distance ;
101+ int16 initialized_buffers ;
100102bool advice_enabled ;
101103bool temporary ;
102104
@@ -224,8 +226,10 @@ static bool
224226read_stream_start_pending_read (ReadStream * stream )
225227{
226228bool need_wait ;
229+ int requested_nblocks ;
227230int nblocks ;
228231int flags ;
232+ int forwarded ;
229233int16 io_index ;
230234int16 overflow ;
231235int16 buffer_index ;
@@ -272,11 +276,21 @@ read_stream_start_pending_read(ReadStream *stream)
272276}
273277}
274278
275- /* How many more buffers is this backend allowed? */
279+ /*
280+ * How many more buffers is this backend allowed?
281+ *
282+ * Forwarded buffers are already pinned and map to the leading blocks of
283+ * the pending read (the remaining portion of an earlier short read that
284+ * we're about to continue). They are not counted in pinned_buffers, but
285+ * they are counted as pins already held by this backend according to the
286+ * buffer manager, so they must be added to the limit it grants us.
287+ */
276288if (stream -> temporary )
277289buffer_limit = Min (GetAdditionalLocalPinLimit (), PG_INT16_MAX );
278290else
279291buffer_limit = Min (GetAdditionalPinLimit (), PG_INT16_MAX );
292+ Assert (stream -> forwarded_buffers <= stream -> pending_read_nblocks );
293+ buffer_limit += stream -> forwarded_buffers ;
280294if (buffer_limit == 0 && stream -> pinned_buffers == 0 )
281295buffer_limit = 1 ;/* guarantee progress */
282296
@@ -301,10 +315,16 @@ read_stream_start_pending_read(ReadStream *stream)
301315
302316/*
303317 * We say how many blocks we want to read, but it may be smaller on return
304- * if the buffer manager decides to shorten the read.
318+ * if the buffer manager decides to shorten the read. Initialize buffers
319+ * to InvalidBuffer (= not a forwarded buffer) as input on first use only,
320+ * and keep the original nblocks number so we can check for forwarded
321+ * buffers as output, below.
305322 */
306323buffer_index = stream -> next_buffer_index ;
307324io_index = stream -> next_io_index ;
325+ while (stream -> initialized_buffers < buffer_index + nblocks )
326+ stream -> buffers [stream -> initialized_buffers ++ ] = InvalidBuffer ;
327+ requested_nblocks = nblocks ;
308328need_wait = StartReadBuffers (& stream -> ios [io_index ].op ,
309329 & stream -> buffers [buffer_index ],
310330 stream -> pending_read_blocknum ,
@@ -333,16 +353,35 @@ read_stream_start_pending_read(ReadStream *stream)
333353stream -> seq_blocknum = stream -> pending_read_blocknum + nblocks ;
334354}
335355
356+ /*
357+ * How many pins were acquired but forwarded to the next call? These need
358+ * to be passed to the next StartReadBuffers() call by leaving them
359+ * exactly where they are in the queue, or released if the stream ends
360+ * early. We need the number for accounting purposes, since they are not
361+ * counted in stream->pinned_buffers but we already hold them.
362+ */
363+ forwarded = 0 ;
364+ while (nblocks + forwarded < requested_nblocks &&
365+ stream -> buffers [buffer_index + nblocks + forwarded ] != InvalidBuffer )
366+ forwarded ++ ;
367+ stream -> forwarded_buffers = forwarded ;
368+
336369/*
337370 * We gave a contiguous range of buffer space to StartReadBuffers(), but
338- * we want it to wrap around at queue_size. Slide overflowing buffers to
339- * the front of the array.
371+ * we want it to wrap around at queue_size. Copy overflowing buffers to
372+ * the front of the array where they'll be consumed, but also leave a copy
373+ * in the overflow zone which the I/O operation has a pointer to (it needs
374+ * a contiguous array). Both copies will be cleared when the buffers are
375+ * handed to the consumer.
340376 */
341- overflow = (buffer_index + nblocks ) - stream -> queue_size ;
377+ overflow = (buffer_index + nblocks + forwarded ) - stream -> queue_size ;
342378if (overflow > 0 )
343- memmove (& stream -> buffers [0 ],
344- & stream -> buffers [stream -> queue_size ],
345- sizeof (stream -> buffers [0 ]) * overflow );
379+ {
380+ Assert (overflow < stream -> queue_size );/* can't overlap */
381+ memcpy (& stream -> buffers [0 ],
382+ & stream -> buffers [stream -> queue_size ],
383+ sizeof (stream -> buffers [0 ]) * overflow );
384+ }
346385
347386/* Compute location of start of next read, without using % operator. */
348387buffer_index += nblocks ;
@@ -719,10 +758,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
719758
720759/* Fast path assumptions. */
721760Assert (stream -> ios_in_progress == 0 );
761+ Assert (stream -> forwarded_buffers == 0 );
722762Assert (stream -> pinned_buffers == 1 );
723763Assert (stream -> distance == 1 );
724764Assert (stream -> pending_read_nblocks == 0 );
725765Assert (stream -> per_buffer_data_size == 0 );
766+ Assert (stream -> initialized_buffers > stream -> oldest_buffer_index );
726767
727768/* We're going to return the buffer we pinned last time. */
728769oldest_buffer_index = stream -> oldest_buffer_index ;
@@ -771,6 +812,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
771812stream -> distance = 0 ;
772813stream -> oldest_buffer_index = stream -> next_buffer_index ;
773814stream -> pinned_buffers = 0 ;
815+ stream -> buffers [oldest_buffer_index ] = InvalidBuffer ;
774816}
775817
776818stream -> fast_path = false;
@@ -846,10 +888,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
846888stream -> seq_until_processed = InvalidBlockNumber ;
847889}
848890
849- #ifdef CLOBBER_FREED_MEMORY
850- /* Clobber old buffer for debugging purposes. */
891+ /*
892+ * We must zap this queue entry, or else it would appear as a forwarded
893+ * buffer. If it's potentially in the overflow zone (ie from a
894+ * multi-block I/O that wrapped around the queue), also zap the copy.
895+ */
851896stream -> buffers [oldest_buffer_index ] = InvalidBuffer ;
852- #endif
897+ if (oldest_buffer_index < stream -> io_combine_limit - 1 )
898+ stream -> buffers [stream -> queue_size + oldest_buffer_index ] =
899+ InvalidBuffer ;
853900
854901#if defined(CLOBBER_FREED_MEMORY ) || defined(USE_VALGRIND )
855902
@@ -894,6 +941,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
894941#ifndef READ_STREAM_DISABLE_FAST_PATH
895942/* See if we can take the fast path for all-cached scans next time. */
896943if (stream -> ios_in_progress == 0 &&
944+ stream -> forwarded_buffers == 0 &&
897945stream -> pinned_buffers == 1 &&
898946stream -> distance == 1 &&
899947stream -> pending_read_nblocks == 0 &&
@@ -929,6 +977,7 @@ read_stream_next_block(ReadStream *stream, BufferAccessStrategy *strategy)
929977void
930978read_stream_reset (ReadStream * stream )
931979{
980+ int16 index ;
932981Buffer buffer ;
933982
934983/* Stop looking ahead. */
@@ -942,6 +991,24 @@ read_stream_reset(ReadStream *stream)
942991while ((buffer = read_stream_next_buffer (stream , NULL )) != InvalidBuffer )
943992ReleaseBuffer (buffer );
944993
994+ /* Unpin any unused forwarded buffers. */
995+ index = stream -> next_buffer_index ;
996+ while (index < stream -> initialized_buffers &&
997+ (buffer = stream -> buffers [index ]) != InvalidBuffer )
998+ {
999+ Assert (stream -> forwarded_buffers > 0 );
1000+ stream -> forwarded_buffers -- ;
1001+ ReleaseBuffer (buffer );
1002+
1003+ stream -> buffers [index ] = InvalidBuffer ;
1004+ if (index < stream -> io_combine_limit - 1 )
1005+ stream -> buffers [stream -> queue_size + index ] = InvalidBuffer ;
1006+
1007+ if (++ index == stream -> queue_size )
1008+ index = 0 ;
1009+ }
1010+
1011+ Assert (stream -> forwarded_buffers == 0 );
9451012Assert (stream -> pinned_buffers == 0 );
9461013Assert (stream -> ios_in_progress == 0 );
9471014
0 commit comments