Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions asyncpg/_testbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def create_pool(dsn=None, *,
min_size=10,
max_size=10,
max_queries=50000,
max_inactive_connection_lifetime=60.0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the default should be 0.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, let's bump this to something like 300.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all done

setup=None,
init=None,
loop=None,
Expand All @@ -166,6 +167,7 @@ def create_pool(dsn=None, *,
dsn,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
**connect_kwargs)


Expand Down
55 changes: 49 additions & 6 deletions asyncpg/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,24 @@ class PoolConnectionHolder:

__slots__ = ('_con', '_pool', '_loop',
'_connect_args', '_connect_kwargs',
'_max_queries', '_setup', '_init')
'_max_queries', '_setup', '_init',
'_max_inactive_time', '_in_use',
'_inactive_callback')

def __init__(self, pool, *, connect_args, connect_kwargs,
max_queries, setup, init):
max_queries, setup, init, max_inactive_time):

self._pool = pool
self._con = None

self._connect_args = connect_args
self._connect_kwargs = connect_kwargs
self._max_queries = max_queries
self._max_inactive_time = max_inactive_time
self._setup = setup
self._init = init
self._inactive_callback = None
self._in_use = False

async def connect(self):
assert self._con is None
Expand Down Expand Up @@ -134,6 +139,8 @@ async def acquire(self) -> PoolConnectionProxy:
if self._con is None:
await self.connect()

self._maybe_cancel_inactive_callback()

proxy = PoolConnectionProxy(self, self._con)

if self._setup is not None:
Expand All @@ -154,9 +161,13 @@ async def acquire(self) -> PoolConnectionProxy:
self._con = None
raise ex

self._in_use = True
return proxy

async def release(self):
assert self._in_use
self._in_use = False

if self._con.is_closed():
self._con = None

Expand All @@ -181,7 +192,13 @@ async def release(self):
self._con = None
raise ex

assert self._inactive_callback is None
if self._max_inactive_time and self._con is not None:
self._inactive_callback = self._pool._loop.call_later(
self._max_inactive_time, self._deactivate_connection)

async def close(self):
self._maybe_cancel_inactive_callback()
if self._con is None:
return
if self._con.is_closed():
Expand All @@ -194,6 +211,7 @@ async def close(self):
self._con = None

def terminate(self):
self._maybe_cancel_inactive_callback()
if self._con is None:
return
if self._con.is_closed():
Expand All @@ -205,6 +223,18 @@ def terminate(self):
finally:
self._con = None

def _maybe_cancel_inactive_callback(self):
if self._inactive_callback is not None:
self._inactive_callback.cancel()
self._inactive_callback = None

def _deactivate_connection(self):
assert not self._in_use
if self._con is None or self._con.is_closed():
return
self._con.terminate()
self._con = None


class Pool:
"""A connection pool.
Expand All @@ -225,6 +255,7 @@ def __init__(self, *connect_args,
min_size,
max_size,
max_queries,
max_inactive_connection_lifetime,
setup,
init,
loop,
Expand All @@ -247,6 +278,11 @@ def __init__(self, *connect_args,
if max_queries <= 0:
raise ValueError('max_queries is expected to be greater than zero')

if max_inactive_connection_lifetime < 0:
raise ValueError(
'max_inactive_connection_lifetime is expected to be greater '
'or equal to zero')

self._minsize = min_size
self._maxsize = max_size

Expand All @@ -265,6 +301,7 @@ def __init__(self, *connect_args,
connect_args=connect_args,
connect_kwargs=connect_kwargs,
max_queries=max_queries,
max_inactive_time=max_inactive_connection_lifetime,
setup=setup,
init=init)

Expand Down Expand Up @@ -511,6 +548,7 @@ def create_pool(dsn=None, *,
min_size=10,
max_size=10,
max_queries=50000,
max_inactive_connection_lifetime=300.0,
setup=None,
init=None,
loop=None,
Expand Down Expand Up @@ -548,6 +586,9 @@ def create_pool(dsn=None, *,
:param int max_size: Max number of connections in the pool.
:param int max_queries: Number of queries after a connection is closed
and replaced with a new connection.
:param float max_inactive_connection_lifetime:
Number of seconds after which inactive connections in the
pool will be closed. Pass ``0`` to disable this mechanism.
:param coroutine setup: A coroutine to prepare a connection right before
it is returned from :meth:`~pool.Pool.acquire`.
An example use case would be to automatically
Expand All @@ -567,7 +608,9 @@ def create_pool(dsn=None, *,
An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
attempted operation on a released connection.
"""
return Pool(dsn,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
**connect_kwargs)
return Pool(
dsn,
min_size=min_size, max_size=max_size,
max_queries=max_queries, loop=loop, setup=setup, init=init,
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
**connect_kwargs)
94 changes: 94 additions & 0 deletions tests/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import os
import platform
import random
import time
import unittest

from asyncpg import _testbase as tb
Expand Down Expand Up @@ -457,6 +458,99 @@ async def worker(pool):
finally:
await pool.execute('DROP TABLE exmany')

async def test_pool_max_inactive_time_01(self):
async with self.create_pool(
database='postgres', min_size=1, max_size=1,
max_inactive_connection_lifetime=0.1) as pool:

# Test that it's OK if a query takes longer time to execute
# than `max_inactive_connection_lifetime`.

con = pool._holders[0]._con

for _ in range(3):
await pool.execute('SELECT pg_sleep(0.5)')
self.assertIs(pool._holders[0]._con, con)

self.assertEqual(
await pool.execute('SELECT 1::int'),
'SELECT 1')
self.assertIs(pool._holders[0]._con, con)

async def test_pool_max_inactive_time_02(self):
async with self.create_pool(
database='postgres', min_size=1, max_size=1,
max_inactive_connection_lifetime=0.5) as pool:

# Test that we have a new connection after pool not
# being used longer than `max_inactive_connection_lifetime`.

con = pool._holders[0]._con

self.assertEqual(
await pool.execute('SELECT 1::int'),
'SELECT 1')
self.assertIs(pool._holders[0]._con, con)

await asyncio.sleep(1, loop=self.loop)
self.assertIs(pool._holders[0]._con, None)

self.assertEqual(
await pool.execute('SELECT 1::int'),
'SELECT 1')
self.assertIsNot(pool._holders[0]._con, con)

async def test_pool_max_inactive_time_03(self):
async with self.create_pool(
database='postgres', min_size=1, max_size=1,
max_inactive_connection_lifetime=1) as pool:

# Test that we start counting inactive time *after*
# the connection is being released back to the pool.

con = pool._holders[0]._con

await pool.execute('SELECT pg_sleep(0.5)')
await asyncio.sleep(0.6, loop=self.loop)

self.assertIs(pool._holders[0]._con, con)

self.assertEqual(
await pool.execute('SELECT 1::int'),
'SELECT 1')
self.assertIs(pool._holders[0]._con, con)

async def test_pool_max_inactive_time_04(self):
# Chaos test for max_inactive_connection_lifetime.
DURATION = 2.0
START = time.monotonic()
N = 0

async def worker(pool):
nonlocal N
await asyncio.sleep(random.random() / 10 + 0.1, loop=self.loop)
async with pool.acquire() as con:
if random.random() > 0.5:
await con.execute('SELECT pg_sleep({:.2f})'.format(
random.random() / 10))
self.assertEqual(
await con.fetchval('SELECT 42::int'),
42)

if time.monotonic() - START < DURATION:
await worker(pool)

N += 1

async with self.create_pool(
database='postgres', min_size=10, max_size=30,
max_inactive_connection_lifetime=0.1) as pool:

workers = [worker(pool) for _ in range(50)]
await asyncio.gather(*workers, loop=self.loop)

self.assertGreater(N, 50)


@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
class TestHostStandby(tb.ConnectedTestCase):
Expand Down