Skip to content

Commit 7c68407

Browse files
authored
bpo-32622: Implement loop.sendfile() (#5271)
1 parent f13f12d commit 7c68407

File tree

12 files changed

+560
-8
lines changed

12 files changed

+560
-8
lines changed

Doc/library/asyncio-eventloop.rst

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,37 @@ Creating listening connections
543543
.. versionadded:: 3.5.3
544544

545545

546+
File Transferring
547+
-----------------
548+
549+
.. coroutinemethod:: AbstractEventLoop.sendfile(sock, transport, \
550+
offset=0, count=None, \
551+
*, fallback=True)
552+
553+
Send a *file* to *transport*, return the total number of bytes
554+
which were sent.
555+
556+
The method uses high-performance :meth:`os.sendfile` if available.
557+
558+
*file* must be a regular file object opened in binary mode.
559+
560+
*offset* tells from where to start reading the file. If specified,
561+
*count* is the total number of bytes to transmit as opposed to
562+
sending the file until EOF is reached. File position is updated on
563+
return or also in case of error in which case :meth:`file.tell()
564+
<io.IOBase.tell>` can be used to figure out the number of bytes
565+
which were sent.
566+
567+
*fallback* set to ``True`` makes asyncio to manually read and send
568+
the file when the platform does not support the sendfile syscall
569+
(e.g. Windows or SSL socket on Unix).
570+
571+
Raise :exc:`SendfileNotAvailableError` if the system does not support
572+
*sendfile* syscall and *fallback* is ``False``.
573+
574+
.. versionadded:: 3.7
575+
576+
546577
TLS Upgrade
547578
-----------
548579

Lib/asyncio/base_events.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,10 @@
3838
from . import coroutines
3939
from . import events
4040
from . import futures
41+
from . import protocols
4142
from . import sslproto
4243
from . import tasks
44+
from . import transports
4345
from .log import logger
4446

4547

@@ -155,6 +157,75 @@ def _run_until_complete_cb(fut):
155157
futures._get_loop(fut).stop()
156158

157159

160+
161+
class _SendfileFallbackProtocol(protocols.Protocol):
162+
def __init__(self, transp):
163+
if not isinstance(transp, transports._FlowControlMixin):
164+
raise TypeError("transport should be _FlowControlMixin instance")
165+
self._transport = transp
166+
self._proto = transp.get_protocol()
167+
self._should_resume_reading = transp.is_reading()
168+
self._should_resume_writing = transp._protocol_paused
169+
transp.pause_reading()
170+
transp.set_protocol(self)
171+
if self._should_resume_writing:
172+
self._write_ready_fut = self._transport._loop.create_future()
173+
else:
174+
self._write_ready_fut = None
175+
176+
async def drain(self):
177+
if self._transport.is_closing():
178+
raise ConnectionError("Connection closed by peer")
179+
fut = self._write_ready_fut
180+
if fut is None:
181+
return
182+
await fut
183+
184+
def connection_made(self, transport):
185+
raise RuntimeError("Invalid state: "
186+
"connection should have been established already.")
187+
188+
def connection_lost(self, exc):
189+
if self._write_ready_fut is not None:
190+
# Never happens if peer disconnects after sending the whole content
191+
# Thus disconnection is always an exception from user perspective
192+
if exc is None:
193+
self._write_ready_fut.set_exception(
194+
ConnectionError("Connection is closed by peer"))
195+
else:
196+
self._write_ready_fut.set_exception(exc)
197+
self._proto.connection_lost(exc)
198+
199+
def pause_writing(self):
200+
if self._write_ready_fut is not None:
201+
return
202+
self._write_ready_fut = self._transport._loop.create_future()
203+
204+
def resume_writing(self):
205+
if self._write_ready_fut is None:
206+
return
207+
self._write_ready_fut.set_result(False)
208+
self._write_ready_fut = None
209+
210+
def data_received(self, data):
211+
raise RuntimeError("Invalid state: reading should be paused")
212+
213+
def eof_received(self):
214+
raise RuntimeError("Invalid state: reading should be paused")
215+
216+
async def restore(self):
217+
self._transport.set_protocol(self._proto)
218+
if self._should_resume_reading:
219+
self._transport.resume_reading()
220+
if self._write_ready_fut is not None:
221+
# Cancel the future.
222+
# Basically it has no effect because protocol is switched back,
223+
# no code should wait for it anymore.
224+
self._write_ready_fut.cancel()
225+
if self._should_resume_writing:
226+
self._proto.resume_writing()
227+
228+
158229
class Server(events.AbstractServer):
159230

160231
def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
@@ -926,6 +997,77 @@ async def _create_connection_transport(
926997

927998
return transport, protocol
928999

1000+
async def sendfile(self, transport, file, offset=0, count=None,
1001+
*, fallback=True):
1002+
"""Send a file to transport.
1003+
1004+
Return the total number of bytes which were sent.
1005+
1006+
The method uses high-performance os.sendfile if available.
1007+
1008+
file must be a regular file object opened in binary mode.
1009+
1010+
offset tells from where to start reading the file. If specified,
1011+
count is the total number of bytes to transmit as opposed to
1012+
sending the file until EOF is reached. File position is updated on
1013+
return or also in case of error in which case file.tell()
1014+
can be used to figure out the number of bytes
1015+
which were sent.
1016+
1017+
fallback set to True makes asyncio to manually read and send
1018+
the file when the platform does not support the sendfile syscall
1019+
(e.g. Windows or SSL socket on Unix).
1020+
1021+
Raise SendfileNotAvailableError if the system does not support
1022+
sendfile syscall and fallback is False.
1023+
"""
1024+
if transport.is_closing():
1025+
raise RuntimeError("Transport is closing")
1026+
mode = getattr(transport, '_sendfile_compatible',
1027+
constants._SendfileMode.UNSUPPORTED)
1028+
if mode is constants._SendfileMode.UNSUPPORTED:
1029+
raise RuntimeError(
1030+
f"sendfile is not supported for transport {transport!r}")
1031+
if mode is constants._SendfileMode.TRY_NATIVE:
1032+
try:
1033+
return await self._sendfile_native(transport, file,
1034+
offset, count)
1035+
except events.SendfileNotAvailableError as exc:
1036+
if not fallback:
1037+
raise
1038+
# the mode is FALLBACK or fallback is True
1039+
return await self._sendfile_fallback(transport, file,
1040+
offset, count)
1041+
1042+
async def _sendfile_native(self, transp, file, offset, count):
1043+
raise events.SendfileNotAvailableError(
1044+
"sendfile syscall is not supported")
1045+
1046+
async def _sendfile_fallback(self, transp, file, offset, count):
1047+
if offset:
1048+
file.seek(offset)
1049+
blocksize = min(count, 16384) if count else 16384
1050+
buf = bytearray(blocksize)
1051+
total_sent = 0
1052+
proto = _SendfileFallbackProtocol(transp)
1053+
try:
1054+
while True:
1055+
if count:
1056+
blocksize = min(count - total_sent, blocksize)
1057+
if blocksize <= 0:
1058+
return total_sent
1059+
view = memoryview(buf)[:blocksize]
1060+
read = file.readinto(view)
1061+
if not read:
1062+
return total_sent # EOF
1063+
await proto.drain()
1064+
transp.write(view)
1065+
total_sent += read
1066+
finally:
1067+
if total_sent > 0 and hasattr(file, 'seek'):
1068+
file.seek(offset + total_sent)
1069+
await proto.restore()
1070+
9291071
async def start_tls(self, transport, protocol, sslcontext, *,
9301072
server_side=False,
9311073
server_hostname=None,

Lib/asyncio/constants.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import enum
2+
13
# After the connection is lost, log warnings after this many write()s.
24
LOG_THRESHOLD_FOR_CONNLOST_WRITES = 5
35

@@ -11,3 +13,10 @@
1113

1214
# Number of seconds to wait for SSL handshake to complete
1315
SSL_HANDSHAKE_TIMEOUT = 10.0
16+
17+
# The enum should be here to break circular dependencies between
18+
# base_events and sslproto
19+
class _SendfileMode(enum.Enum):
20+
UNSUPPORTED = enum.auto()
21+
TRY_NATIVE = enum.auto()
22+
FALLBACK = enum.auto()

Lib/asyncio/events.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,14 @@ async def create_server(
354354
"""
355355
raise NotImplementedError
356356

357+
async def sendfile(self, transport, file, offset=0, count=None,
358+
*, fallback=True):
359+
"""Send a file through a transport.
360+
361+
Return an amount of sent bytes.
362+
"""
363+
raise NotImplementedError
364+
357365
async def start_tls(self, transport, protocol, sslcontext, *,
358366
server_side=False,
359367
server_hostname=None,

Lib/asyncio/proactor_events.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,12 @@ def _loop_reading(self, fut=None):
180180
assert self._read_fut is fut or (self._read_fut is None and
181181
self._closing)
182182
self._read_fut = None
183-
data = fut.result() # deliver data later in "finally" clause
183+
if fut.done():
184+
# deliver data later in "finally" clause
185+
data = fut.result()
186+
else:
187+
# the future will be replaced by next proactor.recv call
188+
fut.cancel()
184189

185190
if self._closing:
186191
# since close() has been called we ignore any read data
@@ -345,6 +350,8 @@ class _ProactorSocketTransport(_ProactorReadPipeTransport,
345350
transports.Transport):
346351
"""Transport for connected sockets."""
347352

353+
_sendfile_compatible = constants._SendfileMode.FALLBACK
354+
348355
def _set_extra(self, sock):
349356
self._extra['socket'] = sock
350357

Lib/asyncio/selector_events.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,20 @@ def _sock_accept(self, fut, registered, sock):
540540
else:
541541
fut.set_result((conn, address))
542542

543+
async def _sendfile_native(self, transp, file, offset, count):
544+
del self._transports[transp._sock_fd]
545+
resume_reading = transp.is_reading()
546+
transp.pause_reading()
547+
await transp._make_empty_waiter()
548+
try:
549+
return await self.sock_sendfile(transp._sock, file, offset, count,
550+
fallback=False)
551+
finally:
552+
transp._reset_empty_waiter()
553+
if resume_reading:
554+
transp.resume_reading()
555+
self._transports[transp._sock_fd] = transp
556+
543557
def _process_events(self, event_list):
544558
for key, mask in event_list:
545559
fileobj, (reader, writer) = key.fileobj, key.data
@@ -695,12 +709,14 @@ def get_write_buffer_size(self):
695709
class _SelectorSocketTransport(_SelectorTransport):
696710

697711
_start_tls_compatible = True
712+
_sendfile_compatible = constants._SendfileMode.TRY_NATIVE
698713

699714
def __init__(self, loop, sock, protocol, waiter=None,
700715
extra=None, server=None):
701716
super().__init__(loop, sock, protocol, extra, server)
702717
self._eof = False
703718
self._paused = False
719+
self._empty_waiter = None
704720

705721
# Disable the Nagle algorithm -- small writes will be
706722
# sent without waiting for the TCP ACK. This generally
@@ -765,6 +781,8 @@ def write(self, data):
765781
f'not {type(data).__name__!r}')
766782
if self._eof:
767783
raise RuntimeError('Cannot call write() after write_eof()')
784+
if self._empty_waiter is not None:
785+
raise RuntimeError('unable to write; sendfile is in progress')
768786
if not data:
769787
return
770788

@@ -807,12 +825,16 @@ def _write_ready(self):
807825
self._loop._remove_writer(self._sock_fd)
808826
self._buffer.clear()
809827
self._fatal_error(exc, 'Fatal write error on socket transport')
828+
if self._empty_waiter is not None:
829+
self._empty_waiter.set_exception(exc)
810830
else:
811831
if n:
812832
del self._buffer[:n]
813833
self._maybe_resume_protocol() # May append to buffer.
814834
if not self._buffer:
815835
self._loop._remove_writer(self._sock_fd)
836+
if self._empty_waiter is not None:
837+
self._empty_waiter.set_result(None)
816838
if self._closing:
817839
self._call_connection_lost(None)
818840
elif self._eof:
@@ -828,6 +850,23 @@ def write_eof(self):
828850
def can_write_eof(self):
829851
return True
830852

853+
def _call_connection_lost(self, exc):
854+
super()._call_connection_lost(exc)
855+
if self._empty_waiter is not None:
856+
self._empty_waiter.set_exception(
857+
ConnectionError("Connection is closed by peer"))
858+
859+
def _make_empty_waiter(self):
860+
if self._empty_waiter is not None:
861+
raise RuntimeError("Empty waiter is already set")
862+
self._empty_waiter = self._loop.create_future()
863+
if not self._buffer:
864+
self._empty_waiter.set_result(None)
865+
return self._empty_waiter
866+
867+
def _reset_empty_waiter(self):
868+
self._empty_waiter = None
869+
831870

832871
class _SelectorDatagramTransport(_SelectorTransport):
833872

Lib/asyncio/sslproto.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,8 @@ def feed_appdata(self, data, offset=0):
282282
class _SSLProtocolTransport(transports._FlowControlMixin,
283283
transports.Transport):
284284

285+
_sendfile_compatible = constants._SendfileMode.FALLBACK
286+
285287
def __init__(self, loop, ssl_protocol):
286288
self._loop = loop
287289
# SSLProtocol instance
@@ -365,6 +367,11 @@ def get_write_buffer_size(self):
365367
"""Return the current size of the write buffer."""
366368
return self._ssl_protocol._transport.get_write_buffer_size()
367369

370+
@property
371+
def _protocol_paused(self):
372+
# Required for sendfile fallback pause_writing/resume_writing logic
373+
return self._ssl_protocol._transport._protocol_paused
374+
368375
def write(self, data):
369376
"""Write some data bytes to the transport.
370377

Lib/asyncio/windows_events.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,8 @@ def finish_recv(trans, key, ov):
425425
try:
426426
return ov.getresult()
427427
except OSError as exc:
428-
if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
428+
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
429+
_overlapped.ERROR_OPERATION_ABORTED):
429430
raise ConnectionResetError(*exc.args)
430431
else:
431432
raise
@@ -447,7 +448,8 @@ def finish_recv(trans, key, ov):
447448
try:
448449
return ov.getresult()
449450
except OSError as exc:
450-
if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
451+
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
452+
_overlapped.ERROR_OPERATION_ABORTED):
451453
raise ConnectionResetError(*exc.args)
452454
else:
453455
raise
@@ -466,7 +468,8 @@ def finish_send(trans, key, ov):
466468
try:
467469
return ov.getresult()
468470
except OSError as exc:
469-
if exc.winerror == _overlapped.ERROR_NETNAME_DELETED:
471+
if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED,
472+
_overlapped.ERROR_OPERATION_ABORTED):
470473
raise ConnectionResetError(*exc.args)
471474
else:
472475
raise

0 commit comments

Comments
 (0)