|
| 1 | +ㅠ# 63. 응답성을 최대로 높이려면 asyncio 이벤트 루프를 블록하지 말라 |
| 2 | + |
| 3 | +## 1. [Better way 62](https://github.com/damho1104/Effective-Python/blob/master/summary/BetterWay62.md) 문제 |
| 4 | + |
| 5 | +```python |
| 6 | +import asyncio |
| 7 | +async def run_tasks(handles, interval, output_path): |
| 8 | + with open(output_path, 'wb') as output: |
| 9 | + async def write_async(data): |
| 10 | + output.write(data) |
| 11 | + |
| 12 | + tasks = [] |
| 13 | + for handle in handles: |
| 14 | + coro = tail_async(handle, interval, write_async) |
| 15 | + task = asyncio.create_task(coro) |
| 16 | + tasks.append(task) |
| 17 | + |
| 18 | + await asyncio.gather(*tasks) |
| 19 | +``` |
| 20 | + |
| 21 | +- 문제 |
| 22 | + - 출력 파일 핸들에 대한 `open`, `close`, `write` 호출이 주 이벤트 루프에서 발생 |
| 23 | + - 시스템 콜 이므로 블록될 수 있음 |
| 24 | + - 다른 코루틴이 진행되지 못함 |
| 25 | + - 응답성 저하, 동시성이 강한 서버에서는 응답 시간 증대 |
| 26 | +- 해당 문제 발생인지 확인하고자 하는 경우 |
| 27 | + - `asyncio.run` 함수에 `debug=True` 전달 |
| 28 | + |
| 29 | + ```python |
| 30 | + import time |
| 31 | + |
| 32 | + async def slow_coroutine(): |
| 33 | + time.sleep(0.5) # 느린 I/O를 시뮬레이션함 |
| 34 | + asyncio.run(slow_coroutine(), debug=True) |
| 35 | + |
| 36 | + >>> |
| 37 | + Executing <Task finished name='Task-1' coro=<slow_coroutine() done, defined at example.py:29> result=None created at .../asyncio/base_events.py:487> took 0.503 seconds |
| 38 | + ... |
| 39 | + ``` |
| 40 | + |
| 41 | + |
| 42 | +## 2. 해결 방안 |
| 43 | + |
| 44 | +- 이벤트 루프에서 시스템 콜이 이뤄질 잠재적 가능성을 최소화 |
| 45 | + |
| 46 | +```python |
| 47 | +from threading import Thread |
| 48 | + |
| 49 | +class WriteThread(Thread): |
| 50 | + def __init__(self, output_path): |
| 51 | + super().__init__() |
| 52 | + self.output_path = output_path |
| 53 | + self.output = None |
| 54 | + self.loop = asyncio.new_event_loop() |
| 55 | + |
| 56 | + def run(self): |
| 57 | + asyncio.set_event_loop(self.loop) |
| 58 | + with open(self.output_path, 'wb') as self.output: |
| 59 | + self.loop.run_forever() |
| 60 | + |
| 61 | + # 맨 마지막에 한 번 더 이벤트 루프를 실행해서 |
| 62 | + # 다른 이벤트 루프가# stop()에# await하는 경우를 해결한다 |
| 63 | + self.loop.run_until_complete(asyncio.sleep(0)) |
| 64 | + |
| 65 | + async def real_write(self, data): |
| 66 | + self.output.write(data) |
| 67 | + |
| 68 | + async def write(self, data): |
| 69 | + coro = self.real_write(data) |
| 70 | + future = asyncio.run_coroutine_threadsafe(coro, self.loop) |
| 71 | + await asyncio.wrap_future(future) |
| 72 | + |
| 73 | + async def real_stop(self): |
| 74 | + self.loop.stop() |
| 75 | + |
| 76 | + async def stop(self): |
| 77 | + coro = self.real_stop() |
| 78 | + future = asyncio.run_coroutine_threadsafe(coro, self.loop) |
| 79 | + await asyncio.wrap_future(future) |
| 80 | + |
| 81 | + async def __aenter__(self): |
| 82 | + loop = asyncio.get_event_loop() |
| 83 | + await loop.run_in_executor(None, self.start) |
| 84 | + return self |
| 85 | + |
| 86 | + async def __aexit__(self, *_): |
| 87 | + await self.stop() |
| 88 | +``` |
| 89 | + |
| 90 | +- `write` 메소드는 실제 I/O 발생시키는 `real_write` 메소드가 thread-safety 하게 래핑 |
| 91 | +- 다른 스레드에서 실행되는 코루틴은 해당 클래스의 `write` 메소드를 실행하면서 `await` |
| 92 | + - `Lock` 필요 없음 |
| 93 | +- 다른 코루틴은 `stop` 메소드를 사용해 작업자 스레드에게 실행을 중단하라고 전달 가능 |
| 94 | + - `write` 와 비슷하게 thread-safety 하게 작성 |
| 95 | +- `__acenter__`, `__aexit__` 를 사용하여 `with` 문과 함께 사용가능하도록 작성 |
| 96 | + - 작업자 스레드가 주 이벤트 루프 스레드를 느리게 만들지 않으면서 제시간이 시작/종료 가능 |
| 97 | + |
| 98 | +```python |
| 99 | +def readline(handle): |
| 100 | + ... |
| 101 | + |
| 102 | +async def tail_async(handle, interval, write_func): |
| 103 | + ... |
| 104 | + |
| 105 | +async def run_fully_async(handles, interval, output_path): |
| 106 | + async with WriteThread(output_path) as output: |
| 107 | + tasks = [] |
| 108 | + for handle in handles: |
| 109 | + coro = tail_async(handle, interval, output.write) |
| 110 | + task = asyncio.create_task(coro) |
| 111 | + tasks.append(task) |
| 112 | + |
| 113 | + await asyncio.gather(*tasks) |
| 114 | + |
| 115 | +def confirm_merge(input_paths, output_path): |
| 116 | + ... |
| 117 | + |
| 118 | +input_paths = ... |
| 119 | +handles = ... |
| 120 | +output_path = ... |
| 121 | + |
| 122 | +asyncio.run(run_fully_async(handles, 0.1, output_path)) |
| 123 | +confirm_merge(input_paths, output_path) |
| 124 | +``` |
| 125 | + |
| 126 | +- `WriteThread` 클래스를 사용하여 `run_tasks` 를 완벽히 비동기적으로 변경 |
| 127 | +- 주어진 입력 핸들과 출력 파일 경로에 대해 작업자가 예상대로 작동하는지 검증 가능 |
0 commit comments