Member

@elprans elprans commented May 7, 2017

This commit adds two new Connection methods, copy_from_table() and
copy_from_query(), that allow copying the contents of a table or
the results of a query using the PostgreSQL COPY protocol.

Issue #21.
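
For readers skimming the thread, a minimal usage sketch of the two methods follows. It assumes the keyword arguments (`output`, `format`) as they appear in released asyncpg documentation; the table name, file names, and the `export_examples` helper are hypothetical, and the exact signatures at the time of this PR may have differed.

```python
import asyncio


async def export_examples(dsn: str) -> None:
    """Hypothetical helper: export a table and a query result to CSV files."""
    # asyncpg is imported lazily so the sketch can be loaded without the driver.
    import asyncpg

    conn = await asyncpg.connect(dsn)
    try:
        # Copy the whole (hypothetical) table "mytable" to a client-side file.
        await conn.copy_from_table(
            "mytable", output="mytable.csv", format="csv")

        # Copy the result of a parameterized query to another file.
        await conn.copy_from_query(
            "SELECT * FROM mytable WHERE id > $1", 10,
            output="filtered.csv", format="csv")
    finally:
        await conn.close()
```

With a reachable server this could be run as `asyncio.run(export_examples(dsn))`, where `dsn` is a connection string for your database.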

@elprans elprans requested a review from 1st1 May 7, 2017 21:45
@elprans elprans self-assigned this May 7, 2017
@elprans elprans force-pushed the copy branch 2 times, most recently from 196937c to 67ed413 Compare May 7, 2017 22:33
"""Safely inline arguments to query text."""
# Introspect the target query for argument types and
# build a list of safely-quoted fully-qualified type names.
ps = await conn.prepare(query)
Member

Let's not rely on statements GC here, close the ps explicitly.

Member Author

Fixed.

async def test_copy_from_query_to_sink(self):
    with tempfile.NamedTemporaryFile() as f:
        async def writer(data):
            await asyncio.sleep(0.05, loop=self.loop)
Member

Add a comment here that sleep is needed to test the back-pressure.

Member Author

Done.
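
To illustrate why a sleeping sink exercises back-pressure, here is a rough standalone sketch. It is not how the protocol in this PR pauses reading; it swaps in a bounded `asyncio.Queue` to show the same effect: when the consumer is slower than the producer, the producer has to wait. The name `run_demo` and its parameters are made up for this example.

```python
import asyncio


async def run_demo(chunks, sink_delay=0.01, buffer_size=2):
    """Feed chunks to a deliberately slow sink through a bounded queue."""
    queue = asyncio.Queue(maxsize=buffer_size)
    received = []
    max_backlog = 0

    async def producer():
        nonlocal max_backlog
        for chunk in chunks:
            # put() suspends while the queue is full: this is the back-pressure.
            await queue.put(chunk)
            max_backlog = max(max_backlog, queue.qsize())
        await queue.put(None)  # sentinel: no more data

    async def slow_sink():
        while True:
            chunk = await queue.get()
            if chunk is None:
                break
            # The delay makes the sink slower than the producer, as in the test.
            await asyncio.sleep(sink_delay)
            received.append(chunk)

    await asyncio.gather(producer(), slow_sink())
    return received, max_backlog
```

Without the delay the sink would usually keep up and the waiting path would rarely be taken, which is presumably why the test needs the sleep.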

done = False
while not done:
    buffer, done, status_msg = await self._new_waiter(timeout)
    if buffer:
Member

Maybe

while True: ... if done: break
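
A small sketch of the suggested loop shape, using a hypothetical synchronous `read_chunk` callable in place of the awaited waiter. The point is only that `while True` with an explicit `break` avoids pre-initializing the `done` flag.

```python
def drain(read_chunk):
    """Collect non-empty chunks until the reader reports completion."""
    chunks = []
    while True:
        buffer, done = read_chunk()
        if buffer:
            chunks.append(buffer)
        if done:
            # The final chunk is handled above before the loop exits.
            break
    return chunks
```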

if self.state == PROTOCOL_IDLE:
    self.state = new_state

elif (self.state == PROTOCOL_SIMPLE_QUERY and
Member

It looks like this branch isn't needed.

if isinstance(path, os.PathLike):
    path = os.fspath(path)
elif not isinstance(path, (str, bytes)):
    path = None
Member

Maybe it's better to raise TypeError and catch it in the code that uses this function?
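
One way the suggestion could look, as a sketch only. `normalize_path` and `describe_output` are hypothetical names, and the assumption that a non-path argument should be treated as a writable sink is mine, not something stated in the diff.

```python
import os


def normalize_path(path):
    """Return path as str or bytes, or raise TypeError for anything else."""
    if isinstance(path, os.PathLike):
        return os.fspath(path)
    if isinstance(path, (str, bytes)):
        return path
    raise TypeError(
        f"expected str, bytes or os.PathLike, got {type(path).__name__}")


def describe_output(output):
    """Hypothetical caller: fall back to treating the object as a sink."""
    try:
        return ("path", normalize_path(output))
    except TypeError:
        # Not path-like: assume it is a file object or a callable sink.
        return ("sink", output)
```

Raising keeps the helper's contract explicit, and the caller decides what a non-path argument means instead of checking for a `None` return.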

    return await self._protocol.copy_out(copy_stmt, writer, timeout)
finally:
    if opened_by_us:
        await run_in_executor(None, f.close)
Member

AFAIK the close() syscall isn't blocking, so you don't need to use run_in_executor here.

self.result = buf.consume_messages(b'd')

self._skip_discard = True
if not buf.has_message():
Member

We'll need a comment here, explaining that either:

  1. we have read all messages from the buffer (it had only d messages) and now need to push the read data back; or

  2. there are unprocessed messages in the buffer, so we want to let _process__copy_out_data process them and push the result.

self._ensure_connected()
self._set_state(PROTOCOL_COPY_OUT)

buf = WriteBuffer.new_message(b'Q')
Member

Add a comment here that Q is SimpleQuery.
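
For context, `Q` is the message type byte of the simple Query message in the PostgreSQL frontend/backend protocol. The sketch below frames such a message by hand with `struct`; it is not the `WriteBuffer` code from this PR, `build_simple_query` is a hypothetical name, and UTF-8 is assumed as the client encoding.

```python
import struct


def build_simple_query(query: str) -> bytes:
    """Frame a simple Query ('Q') message for the PostgreSQL wire protocol."""
    payload = query.encode("utf-8") + b"\x00"  # query text is null-terminated
    # The Int32 length counts itself (4 bytes) plus the payload,
    # but not the leading message type byte.
    return b"Q" + struct.pack("!i", 4 + len(payload)) + payload
```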

paused = False
try:
    while True:
        waiter = self._new_waiter(timeout)
Member

Do we want to properly implement timeouts as part of this PR?
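
One possible reading of "properly" here is a single timeout budget shared by every wait in the loop, rather than a fresh timeout per waiter. The sketch below shows that idea with plain asyncio; `read_all` and `read_chunk` are hypothetical names and this is not the approach taken in the PR.

```python
import asyncio
import time


async def read_all(read_chunk, timeout):
    """Call read_chunk() until it returns None, within one overall timeout."""
    deadline = None if timeout is None else time.monotonic() + timeout
    chunks = []
    while True:
        if deadline is None:
            remaining = None
        else:
            # Each wait only gets what is left of the shared budget.
            remaining = max(deadline - time.monotonic(), 0)
        # wait_for raises asyncio.TimeoutError once the budget runs out.
        chunk = await asyncio.wait_for(read_chunk(), remaining)
        if chunk is None:
            return chunks
        chunks.append(chunk)
```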

        'cannot perform operation: another operation is in progress')

cdef _new_waiter(self, timeout):
    if self.waiter is not None:
Member

This is a piece that I added while debugging this PR. I think we should have it, but it should go in a separate commit.
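
A pure-Python sketch of the guard being discussed: creating a new waiter fails if a previous one is still pending. The class and exception names are hypothetical, and the real code is Cython and may raise a different exception type.

```python
import asyncio


class OperationInProgressError(Exception):
    """Hypothetical error type used only in this sketch."""


class ProtocolSketch:
    def __init__(self):
        self.waiter = None

    def new_waiter(self):
        # Refuse to start a second operation while one is still pending.
        if self.waiter is not None:
            raise OperationInProgressError(
                "cannot perform operation: another operation is in progress")
        self.waiter = asyncio.get_running_loop().create_future()
        return self.waiter

    def finish(self, result):
        # Clear the slot first so the next operation can create its waiter.
        waiter, self.waiter = self.waiter, None
        waiter.set_result(result)
```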

This commit adds two new Connection methods: copy_from_table() and copy_from_query() that allow copying the contents of a table or the results of a query using PostgreSQL COPY protocol. With help from @1st1. Issue #21.
@1st1 1st1 merged commit 5662d9f into master May 10, 2017
@elprans elprans deleted the copy branch July 6, 2017 21:02