Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions elasticapm/instrumentation/packages/asyncio/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,33 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.traces import execution_context
from elasticapm.utils.wrapt import function_wrapper


class AsyncAbstractInstrumentedModule(AbstractInstrumentedModule):
async def call(self, module, method, wrapped, instance, args, kwargs):
raise NotImplementedError()


class RunInExecutorInstrumentation(AbstractInstrumentedModule):
name = "executor"
instrument_list = [("asyncio.base_events", "BaseEventLoop.run_in_executor")]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might we worth checking if this would also work with uvloop (https://github.com/MagicStack/uvloop/blob/8beacd265651429a558d1b22e200d6d81016f1db/uvloop/loop.pyx#L2646), but I have my doubts, as wrapt can't patch objects defined in C AFAIK


def call(self, module, method, wrapped, instance, args, kwargs):
func = args[1]
transaction = execution_context.get_transaction()
span = execution_context.get_span()

@function_wrapper
def wrapper(wrapped, instance, args, kwargs):
execution_context.set_transaction(transaction)
execution_context.set_span(span)
try:
return wrapped(*args, **kwargs)
finally:
execution_context.get_transaction(clear=True)

func = wrapper(func)
args = (args[0], func) + args[2:]
return wrapped(*args, **kwargs)
1 change: 1 addition & 0 deletions elasticapm/instrumentation/register.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"elasticapm.instrumentation.packages.tornado.TornadoRequestExecuteInstrumentation",
"elasticapm.instrumentation.packages.tornado.TornadoHandleRequestExceptionInstrumentation",
"elasticapm.instrumentation.packages.tornado.TornadoRenderInstrumentation",
"elasticapm.instrumentation.packages.asyncio.base.RunInExecutorInstrumentation",
"elasticapm.instrumentation.packages.asyncio.httpcore.HTTPCoreAsyncInstrumentation",
"elasticapm.instrumentation.packages.asyncio.aioredis.RedisConnectionPoolInstrumentation",
"elasticapm.instrumentation.packages.asyncio.aioredis.RedisPipelineInstrumentation",
Expand Down
19 changes: 19 additions & 0 deletions tests/instrumentation/asyncio_tests/base_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import threading
from asyncio import tasks
from concurrent import futures

import pytest

Expand Down Expand Up @@ -64,3 +66,20 @@ async def do_some_work():
assert d["subtype"] == "custom"
assert not d["sync"]
assert d["transaction_id"] == transaction["id"]


async def test_threadpool_executor(instrument, event_loop, elasticapm_client):
executor = futures.ThreadPoolExecutor(max_workers=2)
elasticapm_client.begin_transaction("foo")

def myfunc(a, b):
with elasticapm.capture_span("nothing", extra={"thread_id": threading.get_ident()}):
pass

await event_loop.run_in_executor(executor, myfunc, 1, 2)
elasticapm_client.end_transaction("bar", "OK")
transaction = elasticapm_client.events[constants.TRANSACTION][0]
spans = elasticapm_client.spans_for_transaction(transaction)
assert len(spans) == 1
assert spans[0]["name"] == "nothing"
assert spans[0]["context"]["thread_id"] != threading.get_ident()