- Notifications
You must be signed in to change notification settings - Fork 436
Add support for COPY OUT #132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
196937c to 67ed413 Compare | """Safely inline arguments to query text.""" | ||
| # Introspect the target query for argument types and | ||
| # build a list of safely-quoted fully-qualified type names. | ||
| ps = await conn.prepare(query) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not rely on statements GC here, close the ps explicitly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
| async def test_copy_from_query_to_sink(self): | ||
| with tempfile.NamedTemporaryFile() as f: | ||
| async def writer(data): | ||
| await asyncio.sleep(0.05, loop=self.loop) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment here that sleep is needed to test the back-pressure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
asyncpg/protocol/protocol.pyx Outdated
| done = False | ||
| while not done: | ||
| buffer, done, status_msg = await self._new_waiter(timeout) | ||
| if buffer: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe
while True: ... if done: break asyncpg/protocol/coreproto.pyx Outdated
| if self.state == PROTOCOL_IDLE: | ||
| self.state = new_state | ||
| | ||
| elif (self.state == PROTOCOL_SIMPLE_QUERY and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like this branch isn't needed.
asyncpg/compat.py Outdated
| if isinstance(path, os.PathLike): | ||
| path = os.fspath(path) | ||
| elif not isinstance(path, (str, bytes)): | ||
| path = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it's better to raise TypeError and catch it in the code that uses this function?
asyncpg/connection.py Outdated
| return await self._protocol.copy_out(copy_stmt, writer, timeout) | ||
| finally: | ||
| if opened_by_us: | ||
| await run_in_executor(None, f.close) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK close() syscall isn't blocking so you don't need to use run_in_executor here.
| self.result = buf.consume_messages(b'd') | ||
| | ||
| self._skip_discard = True | ||
| if not buf.has_message(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need a comment here, explaining that we either:
-
have read all messages from the buffer (it had only
dmessages) and now we need push the read data back; -
there are unprocessed messages in the buffer, so we want to let
_process__copy_out_datato process them and push the result.
| self._ensure_connected() | ||
| self._set_state(PROTOCOL_COPY_OUT) | ||
| | ||
| buf = WriteBuffer.new_message(b'Q') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a comment here that Q is SimpleQuery.
asyncpg/protocol/protocol.pyx Outdated
| paused = False | ||
| try: | ||
| while True: | ||
| waiter = self._new_waiter(timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to properly implement timeouts as part of this PR?
| 'cannot perform operation: another operation is in progress') | ||
| | ||
| cdef _new_waiter(self, timeout): | ||
| if self.waiter is not None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a piece that I added during debugging this PR. I think we should have it, but it should go in a separate commit.
This commit adds two new Connection methods: copy_from_table() and
copy_from_query() that allow copying the contents of a table or
the results of a query using PostgreSQL COPY protocol.
Issue #21.