Skip to content

Commit 5cf04bf

Browse files
committed
Add max_inactive_connection_lifetime parameter to Pool.
1 parent 12cce92 commit 5cf04bf

File tree

3 files changed

+144
-6
lines changed

3 files changed

+144
-6
lines changed

asyncpg/_testbase.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ def create_pool(dsn=None, *,
157157
min_size=10,
158158
max_size=10,
159159
max_queries=50000,
160+
max_inactive_connection_lifetime=60.0,
160161
setup=None,
161162
init=None,
162163
loop=None,
@@ -166,6 +167,7 @@ def create_pool(dsn=None, *,
166167
dsn,
167168
min_size=min_size, max_size=max_size,
168169
max_queries=max_queries, loop=loop, setup=setup, init=init,
170+
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
169171
**connect_kwargs)
170172

171173

asyncpg/pool.py

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,19 +86,24 @@ class PoolConnectionHolder:
8686

8787
__slots__ = ('_con', '_pool', '_loop',
8888
'_connect_args', '_connect_kwargs',
89-
'_max_queries', '_setup', '_init')
89+
'_max_queries', '_setup', '_init',
90+
'_max_inactive_time', '_in_use',
91+
'_inactive_callback')
9092

9193
def __init__(self, pool, *, connect_args, connect_kwargs,
92-
max_queries, setup, init):
94+
max_queries, setup, init, max_inactive_time):
9395

9496
self._pool = pool
9597
self._con = None
9698

9799
self._connect_args = connect_args
98100
self._connect_kwargs = connect_kwargs
99101
self._max_queries = max_queries
102+
self._max_inactive_time = max_inactive_time
100103
self._setup = setup
101104
self._init = init
105+
self._inactive_callback = None
106+
self._in_use = False
102107

103108
async def connect(self):
104109
assert self._con is None
@@ -134,6 +139,8 @@ async def acquire(self) -> PoolConnectionProxy:
134139
if self._con is None:
135140
await self.connect()
136141

142+
self._maybe_cancel_inactive_callback()
143+
137144
proxy = PoolConnectionProxy(self, self._con)
138145

139146
if self._setup is not None:
@@ -154,9 +161,12 @@ async def acquire(self) -> PoolConnectionProxy:
154161
self._con = None
155162
raise ex
156163

164+
self._in_use = True
157165
return proxy
158166

159167
async def release(self):
168+
self._in_use = False
169+
160170
if self._con.is_closed():
161171
self._con = None
162172

@@ -181,7 +191,13 @@ async def release(self):
181191
self._con = None
182192
raise ex
183193

194+
assert self._inactive_callback is None
195+
if self._max_inactive_time:
196+
self._inactive_callback = self._pool._loop.call_later(
197+
self._max_inactive_time, self._deactivate_connection)
198+
184199
async def close(self):
200+
self._maybe_cancel_inactive_callback()
185201
if self._con is None:
186202
return
187203
if self._con.is_closed():
@@ -194,6 +210,7 @@ async def close(self):
194210
self._con = None
195211

196212
def terminate(self):
213+
self._maybe_cancel_inactive_callback()
197214
if self._con is None:
198215
return
199216
if self._con.is_closed():
@@ -205,6 +222,18 @@ def terminate(self):
205222
finally:
206223
self._con = None
207224

225+
def _maybe_cancel_inactive_callback(self):
226+
if self._inactive_callback is not None:
227+
self._inactive_callback.cancel()
228+
self._inactive_callback = None
229+
230+
def _deactivate_connection(self):
231+
assert not self._in_use
232+
if self._con is None or self._con.is_closed():
233+
return
234+
self._con.terminate()
235+
self._con = None
236+
208237

209238
class Pool:
210239
"""A connection pool.
@@ -225,6 +254,7 @@ def __init__(self, *connect_args,
225254
min_size,
226255
max_size,
227256
max_queries,
257+
max_inactive_connection_lifetime,
228258
setup,
229259
init,
230260
loop,
@@ -247,6 +277,11 @@ def __init__(self, *connect_args,
247277
if max_queries <= 0:
248278
raise ValueError('max_queries is expected to be greater than zero')
249279

280+
if max_inactive_connection_lifetime < 0:
281+
raise ValueError(
282+
'max_inactive_connection_lifetime is expected to be greater '
283+
'or equal to zero')
284+
250285
self._minsize = min_size
251286
self._maxsize = max_size
252287

@@ -265,6 +300,7 @@ def __init__(self, *connect_args,
265300
connect_args=connect_args,
266301
connect_kwargs=connect_kwargs,
267302
max_queries=max_queries,
303+
max_inactive_time=max_inactive_connection_lifetime,
268304
setup=setup,
269305
init=init)
270306

@@ -511,6 +547,7 @@ def create_pool(dsn=None, *,
511547
min_size=10,
512548
max_size=10,
513549
max_queries=50000,
550+
max_inactive_connection_lifetime=60.0,
514551
setup=None,
515552
init=None,
516553
loop=None,
@@ -548,6 +585,9 @@ def create_pool(dsn=None, *,
548585
:param int max_size: Max number of connections in the pool.
549586
:param int max_queries: Number of queries after a connection is closed
550587
and replaced with a new connection.
588+
:param float max_inactive_connection_lifetime:
589+
Number of seconds after which inactive connections in the
590+
pool will be closed. Pass ``0`` to disable this mechanism.
551591
:param coroutine setup: A coroutine to prepare a connection right before
552592
it is returned from :meth:`~pool.Pool.acquire`.
553593
An example use case would be to automatically
@@ -567,7 +607,9 @@ def create_pool(dsn=None, *,
567607
An :exc:`~asyncpg.exceptions.InterfaceError` will be raised on any
568608
attempted operation on a released connection.
569609
"""
570-
return Pool(dsn,
571-
min_size=min_size, max_size=max_size,
572-
max_queries=max_queries, loop=loop, setup=setup, init=init,
573-
**connect_kwargs)
610+
return Pool(
611+
dsn,
612+
min_size=min_size, max_size=max_size,
613+
max_queries=max_queries, loop=loop, setup=setup, init=init,
614+
max_inactive_connection_lifetime=max_inactive_connection_lifetime,
615+
**connect_kwargs)

tests/test_pool.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import os
1212
import platform
1313
import random
14+
import time
1415
import unittest
1516

1617
from asyncpg import _testbase as tb
@@ -457,6 +458,99 @@ async def worker(pool):
457458
finally:
458459
await pool.execute('DROP TABLE exmany')
459460

461+
async def test_pool_max_inactive_time_01(self):
462+
async with self.create_pool(
463+
database='postgres', min_size=1, max_size=1,
464+
max_inactive_connection_lifetime=0.1) as pool:
465+
466+
# Test that it's OK if a query takes longer time to execute
467+
# than `max_inactive_connection_lifetime`.
468+
469+
con = pool._holders[0]._con
470+
471+
for _ in range(3):
472+
await pool.execute('SELECT pg_sleep(0.5)')
473+
self.assertIs(pool._holders[0]._con, con)
474+
475+
self.assertEqual(
476+
await pool.execute('SELECT 1::int'),
477+
'SELECT 1')
478+
self.assertIs(pool._holders[0]._con, con)
479+
480+
async def test_pool_max_inactive_time_02(self):
481+
async with self.create_pool(
482+
database='postgres', min_size=1, max_size=1,
483+
max_inactive_connection_lifetime=0.5) as pool:
484+
485+
# Test that we have a new connection after pool not
486+
# being used longer than `max_inactive_connection_lifetime`.
487+
488+
con = pool._holders[0]._con
489+
490+
self.assertEqual(
491+
await pool.execute('SELECT 1::int'),
492+
'SELECT 1')
493+
self.assertIs(pool._holders[0]._con, con)
494+
495+
await asyncio.sleep(1, loop=self.loop)
496+
self.assertIs(pool._holders[0]._con, None)
497+
498+
self.assertEqual(
499+
await pool.execute('SELECT 1::int'),
500+
'SELECT 1')
501+
self.assertIsNot(pool._holders[0]._con, con)
502+
503+
async def test_pool_max_inactive_time_03(self):
504+
async with self.create_pool(
505+
database='postgres', min_size=1, max_size=1,
506+
max_inactive_connection_lifetime=1) as pool:
507+
508+
# Test that we start counting inactive time *after*
509+
# the connection is being released back to the pool.
510+
511+
con = pool._holders[0]._con
512+
513+
await pool.execute('SELECT pg_sleep(0.5)')
514+
await asyncio.sleep(0.6, loop=self.loop)
515+
516+
self.assertIs(pool._holders[0]._con, con)
517+
518+
self.assertEqual(
519+
await pool.execute('SELECT 1::int'),
520+
'SELECT 1')
521+
self.assertIs(pool._holders[0]._con, con)
522+
523+
async def test_pool_max_inactive_time_04(self):
524+
# Chaos test for max_inactive_connection_lifetime.
525+
DURATION = 2.0
526+
START = time.monotonic()
527+
N = 0
528+
529+
async def worker(pool):
530+
nonlocal N
531+
await asyncio.sleep(random.random() / 10 + 0.1, loop=self.loop)
532+
async with pool.acquire() as con:
533+
if random.random() > 0.5:
534+
await con.execute('SELECT pg_sleep({:.2f})'.format(
535+
random.random() / 10))
536+
self.assertEqual(
537+
await con.fetchval('SELECT 42::int'),
538+
42)
539+
540+
if time.monotonic() - START < DURATION:
541+
await worker(pool)
542+
543+
N += 1
544+
545+
async with self.create_pool(
546+
database='postgres', min_size=10, max_size=30,
547+
max_inactive_connection_lifetime=0.1) as pool:
548+
549+
workers = [worker(pool) for _ in range(50)]
550+
await asyncio.gather(*workers, loop=self.loop)
551+
552+
self.assertGreater(N, 50)
553+
460554

461555
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
462556
class TestHostStandby(tb.ConnectedTestCase):

0 commit comments

Comments
 (0)