Skip to content

Commit 2821bf7

Browse files
authored
Merge pull request #446 from ged/scheduler-stream
Fix compatibility of res.stream_each_* methods
2 parents ed5dbc8 + 82724e3 commit 2821bf7

File tree

4 files changed

+20
-1
lines changed

4 files changed

+20
-1
lines changed

ext/pg.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ void pg_typemap_compact _(( void * ));
344344

345345
PGconn *pg_get_pgconn _(( VALUE ));
346346
t_pg_connection *pg_get_connection _(( VALUE ));
347+
VALUE pgconn_block _(( int, VALUE *, VALUE ));
347348

348349
VALUE pg_new_result _(( PGresult *, VALUE ));
349350
VALUE pg_new_result_autoclear _(( PGresult *, VALUE ));

ext/pg_connection.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2972,7 +2972,7 @@ get_result_readable(PGconn *conn)
29722972
* If +true+ is returned, +conn.is_busy+ will return +false+
29732973
* and +conn.get_result+ will not block.
29742974
*/
2975-
static VALUE
2975+
VALUE
29762976
pgconn_block( int argc, VALUE *argv, VALUE self ) {
29772977
struct timeval timeout;
29782978
struct timeval *ptimeout = NULL;

ext/pg_result.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,6 +1468,11 @@ pgresult_stream_any(VALUE self, void (*yielder)(VALUE, int, int, void*), void* d
14681468

14691469
yielder( self, ntuples, nfields, data );
14701470

1471+
if( gvl_PQisBusy(pgconn) ){
1472+
/* wait for input (without blocking) before reading each result */
1473+
pgconn_block( 0, NULL, this->connection );
1474+
}
1475+
14711476
pgresult = gvl_PQgetResult(pgconn);
14721477
if( pgresult == NULL )
14731478
rb_raise( rb_eNoResultError, "no result received - possibly an intersection with another result retrieval");

spec/pg/scheduler_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,19 @@ def run_with_scheduler(timeout=10)
158158
end
159159
end
160160

161+
it "can use stream_each_* methods" do
162+
run_with_scheduler do |conn|
163+
conn.send_query( "SELECT generate_series(0,999);" )
164+
conn.set_single_row_mode
165+
166+
start_time = Time.now
167+
res = conn.get_result
168+
rows = res.stream_each_row.to_a
169+
170+
expect( rows ).to eq( (0..999).map{ [_1.to_s] } )
171+
end
172+
end
173+
161174
it "can receive COPY data" do
162175
run_with_scheduler do |conn|
163176
rows = []

0 commit comments

Comments
 (0)