Closed
Description
Feature or enhancement
Proposal:
Consider the following use case:
An asyncio event loop where the majority of the handles being run have a very small run time because they are decoding Bluetooth or Zigbee packets.
In this case `_run_once` becomes a bit of a bottleneck. In production, the `min` and `max` calls used to compute the select timeout represent a significant portion of the run time. We can increase the number of packets that can be processed per second by replacing the `min` and `max` calls with simple `>` and `<` checks.
# Benchmark: BaseEventLoop._run_once versus a variant that clamps the select
# timeout with plain ``>`` / ``<`` comparisons instead of min()/max() calls.
import heapq
import timeit
from asyncio import events
from asyncio.base_events import (
    _MIN_CANCELLED_TIMER_HANDLES_FRACTION,
    _MIN_SCHEDULED_TIMER_HANDLES,
    MAXIMUM_SELECT_TIMEOUT,
    BaseEventLoop,
    _format_handle,
    logger,
)
from time import monotonic


class FakeSelector:
    """Selector stub that never blocks and never reports I/O events."""

    def select(self, timeout):
        """Wait for I/O events.

        The timeout is ignored; an empty event list is returned immediately
        so the benchmark measures only the loop's own bookkeeping.
        """
        return []


class OptimizedBaseEventLoop(BaseEventLoop):
    """BaseEventLoop whose ``_run_once`` avoids min()/max() for the timeout."""

    def _run_once(self):
        """Run one full iteration of the event loop.

        This calls all currently ready callbacks, polls for I/O,
        schedules the resulting callbacks, and finally schedules
        'call_later' callbacks.
        """
        sched_count = len(self._scheduled)
        if (
            sched_count > _MIN_SCHEDULED_TIMER_HANDLES
            and self._timer_cancelled_count / sched_count
            > _MIN_CANCELLED_TIMER_HANDLES_FRACTION
        ):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)

            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False

        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.  Plain comparisons are used to
            # clamp it into [0, MAXIMUM_SELECT_TIMEOUT]; this is the change
            # being benchmarked.
            timeout = self._scheduled[0]._when - self.time()
            if timeout > MAXIMUM_SELECT_TIMEOUT:
                timeout = MAXIMUM_SELECT_TIMEOUT
            elif timeout < 0:
                timeout = 0

        event_list = self._selector.select(timeout)
        self._process_events(event_list)
        # Needed to break cycles when an exception occurs.
        event_list = None

        # Handle 'later' callbacks that are ready.
        end_time = self.time() + self._clock_resolution
        while self._scheduled:
            handle = self._scheduled[0]
            if handle._when >= end_time:
                break
            handle = heapq.heappop(self._scheduled)
            handle._scheduled = False
            self._ready.append(handle)

        # This is the only place where callbacks are actually *called*.
        # All other places just add them to ready.
        # Note: We run all currently scheduled callbacks, but not any
        # callbacks scheduled by callbacks run this time around --
        # they will be run the next time (after another I/O poll).
        # Use an idiom that is thread-safe without using locks.
        ntodo = len(self._ready)
        for _ in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning(
                            "Executing %s took %.3f seconds",
                            _format_handle(handle),
                            dt,
                        )
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.


def make_benchmark_loop(loop_cls):
    """Return a *loop_cls* instance prepared for timing ``_run_once``.

    The loop gets a FakeSelector, a no-op ``_process_events`` and a single
    far-future timer, so every ``_run_once`` call takes the "compute the
    desired timeout" branch without ever running the timer.
    """
    loop = loop_cls()
    loop._selector = FakeSelector()
    loop._process_events = lambda events: None
    timer = events.TimerHandle(
        monotonic() + 100000, lambda: None, ("any",), loop, None
    )
    timer._scheduled = True
    heapq.heappush(loop._scheduled, timer)
    return loop


def main(number=1_000_000):
    """Time *number* ``_run_once`` calls on the optimized and stock loops."""
    new_loop = make_benchmark_loop(OptimizedBaseEventLoop)
    original_loop = make_benchmark_loop(BaseEventLoop)
    try:
        new_time = timeit.timeit(
            "loop._run_once()",
            number=number,
            globals={"loop": new_loop},
        )
        print("new: %s" % new_time)
        original_time = timeit.timeit(
            "loop._run_once()",
            number=number,
            globals={"loop": original_loop},
        )
        print("original: %s" % original_time)
    finally:
        # Close explicitly so the loops do not emit ResourceWarning on GC.
        new_loop.close()
        original_loop.close()


if __name__ == "__main__":
    main()
new: 0.36033158400096
original: 0.4667800000170246
Has this already been discussed elsewhere?
This is a minor feature, which does not need previous discussion elsewhere
Links to previous discussion of this feature:
No response
Linked PRs
Metadata
Metadata
Assignees
Projects
Status
Done