Skip to content

Commit 9a2fe49

Browse files
author
A. Jesse Jiryu Davis
committed
Fix PYTHON-509, rare connection leaks in Python <= 2.7.0
1 parent e56b08d commit 9a2fe49

File tree

5 files changed

+113
-20
lines changed

5 files changed

+113
-20
lines changed

pymongo/thread_util.py

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
"""Utilities to abstract the differences between threads and greenlets."""
1616

1717
import threading
18+
import sys
1819
import weakref
1920

2021
have_greenlet = True
@@ -24,6 +25,10 @@
2425
have_greenlet = False
2526

2627

28+
# Do we have to work around http://bugs.python.org/issue1868?
29+
issue1868 = (sys.version_info[:3] <= (2, 7, 1))
30+
31+
2732
class Ident(object):
2833
def __init__(self):
2934
self._refs = {}
@@ -47,23 +52,42 @@ def watch(self, callback):
4752

4853

4954
class ThreadIdent(Ident):
55+
class _DummyLock(object):
56+
def acquire(self):
57+
pass
58+
59+
def release(self):
60+
pass
61+
5062
def __init__(self):
5163
super(ThreadIdent, self).__init__()
5264
self._local = threading.local()
65+
if issue1868:
66+
self._lock = threading.Lock()
67+
else:
68+
self._lock = ThreadIdent._DummyLock()
5369

5470
# We watch for thread-death using a weakref callback to a thread local.
5571
# Weakrefs are permitted on subclasses of object but not object() itself.
5672
class ThreadVigil(object):
5773
pass
5874

75+
def _make_vigil(self):
76+
# Assigning to a threadlocal isn't thread-safe in Python <= 2.7.0
77+
self._lock.acquire()
78+
vigil = getattr(self._local, 'vigil', None)
79+
if not vigil:
80+
self._local.vigil = vigil = ThreadIdent.ThreadVigil()
81+
82+
self._lock.release()
83+
return vigil
84+
5985
def get(self):
60-
if not hasattr(self._local, 'vigil'):
61-
self._local.vigil = ThreadIdent.ThreadVigil()
62-
return id(self._local.vigil)
86+
return id(self._make_vigil())
6387

6488
def watch(self, callback):
65-
tid = self.get()
66-
self._refs[tid] = weakref.ref(self._local.vigil, callback)
89+
vigil = self._make_vigil()
90+
self._refs[id(vigil)] = weakref.ref(vigil, callback)
6791

6892

6993
class GreenletIdent(Ident):

test/slow/__init__.py

Whitespace-only changes.

test/slow/test_high_concurrency.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Copyright 2013 10gen, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test built-in connection pooling with lots of threads."""
16+
17+
import unittest
18+
import sys
19+
20+
sys.path[0:0] = [""]
21+
22+
from test.test_pooling_base import _TestMaxPoolSize
23+
24+
25+
class TestPoolWithLotsOfThreads(_TestMaxPoolSize, unittest.TestCase):
26+
use_greenlets = False
27+
28+
def test_max_pool_size_with_leaked_request_super_massive(self):
29+
# Like test_max_pool_size_with_leaked_request_massive but even more
30+
# threads. Tests that socket reclamation works under high load,
31+
# especially in Python <= 2.7.0. You may need to raise ulimit.
32+
# See http://bugs.python.org/issue1868.
33+
nthreads = 1000
34+
self._test_max_pool_size(
35+
2, 1, max_pool_size=2 * nthreads, nthreads=nthreads)

test/test_pooling_base.py

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ def run_mongo_thread(self):
238238
r.lock.release()
239239
else:
240240
r.lock.release()
241-
r.ready.wait(2) # Wait two seconds
241+
r.ready.wait(timeout=60)
242242
assert r.ready.isSet(), "Rendezvous timed out"
243243

244244
for i in range(self.end_request):
@@ -652,10 +652,10 @@ def leak_request():
652652
# Access the thread local from the main thread to trigger the
653653
# ThreadVigil's delete callback, returning the request socket to
654654
# the pool.
655-
# In Python 2.6 and lesser, a dead thread's locals are deleted
655+
# In Python 2.7.0 and earlier, a dead thread's locals are deleted
656656
# and those locals' weakref callbacks are fired only when another
657-
# thread accesses the locals and finds the thread state is stale.
658-
# This is more or less a bug in Python <= 2.6. Accessing the thread
657+
# thread accesses the locals and finds the thread state is stale;
658+
# see http://bugs.python.org/issue1868. Accessing the thread
659659
# local from the main thread is a necessary part of this test, and
660660
# realistic: in a multithreaded web server a new thread will access
661661
# Pool._ident._local soon after an old thread has died.
@@ -664,20 +664,34 @@ def leak_request():
664664
# Pool reclaimed the socket
665665
self.assertEqual(1, len(cx_pool.sockets))
666666
self.assertEqual(the_sock[0], id(one(cx_pool.sockets).sock))
667+
self.assertEqual(0, len(cx_pool._tid_to_sock))
667668

668669

669670
class _TestMaxPoolSize(_TestPoolingBase):
670671
"""Test that connection pool keeps proper number of idle sockets open,
671672
no matter how start/end_request are called. To be run both with threads and
672673
with greenlets.
673674
"""
674-
def _test_max_pool_size(self, start_request, end_request):
675-
c = self.get_client(max_pool_size=4, auto_start_request=False)
676-
# If you increase nthreads over about 35, note a
677-
# Gevent 0.13.6 bug on Mac, Greenlet.join() hangs if more than
678-
# about 35 Greenlets share a MongoClient. Apparently fixed in
679-
# recent Gevent development.
680-
nthreads = 10
675+
def _test_max_pool_size(
676+
self, start_request, end_request, max_pool_size=4, nthreads=10):
677+
"""Start `nthreads` threads. Each calls start_request `start_request`
678+
times, then find_one and waits at a barrier; once all reach the barrier
679+
each calls end_request `end_request` times. The test asserts that the
680+
pool ends with min(max_pool_size, nthreads) sockets or, if
681+
start_request wasn't called, at least one socket.
682+
683+
This tests both max_pool_size enforcement and that leaked request
684+
sockets are eventually returned to the pool when their threads end.
685+
686+
You may need to increase ulimit -n on Mac.
687+
688+
If you increase nthreads over about 35, note a
689+
Gevent 0.13.6 bug on Mac: Greenlet.join() hangs if more than
690+
about 35 Greenlets share a MongoClient. Apparently fixed in
691+
recent Gevent development.
692+
"""
693+
c = self.get_client(
694+
max_pool_size=max_pool_size, auto_start_request=False)
681695

682696
rendevous = CreateAndReleaseSocket.Rendezvous(
683697
nthreads, self.use_greenlets)
@@ -716,14 +730,27 @@ def _test_max_pool_size(self, start_request, end_request):
716730
the_hub.shutdown()
717731

718732
if start_request:
719-
self.assertEqual(4, len(cx_pool.sockets))
733+
# Trigger final cleanup in Python <= 2.7.0.
734+
cx_pool._ident.get()
735+
736+
expected_idle = min(max_pool_size, nthreads)
737+
message = (
738+
'%d idle sockets (expected %d) and %d request sockets'
739+
' (expected 0)' % (
740+
len(cx_pool.sockets), expected_idle,
741+
len(cx_pool._tid_to_sock)))
742+
743+
self.assertEqual(
744+
expected_idle, len(cx_pool.sockets), message)
720745
else:
721746
# Without calling start_request(), threads can safely share
722747
# sockets; the number running concurrently, and hence the number
723-
# of sockets needed, is between 1 and 10, depending on thread-
724-
# scheduling.
748+
# of sockets needed, is between 1 and
749+
# min(max_pool_size, nthreads), depending on thread-scheduling.
725750
self.assertTrue(len(cx_pool.sockets) >= 1)
726751

752+
self.assertEqual(0, len(cx_pool._tid_to_sock))
753+
727754
def test_max_pool_size(self):
728755
self._test_max_pool_size(0, 0)
729756

@@ -732,13 +759,20 @@ def test_max_pool_size_with_request(self):
732759

733760
def test_max_pool_size_with_redundant_request(self):
734761
self._test_max_pool_size(2, 1)
762+
763+
def test_max_pool_size_with_redundant_request2(self):
735764
self._test_max_pool_size(20, 1)
736765

737766
def test_max_pool_size_with_leaked_request(self):
738767
# Call start_request() but not end_request() -- when threads die, they
739768
# should return their request sockets to the pool.
740769
self._test_max_pool_size(1, 0)
741770

771+
def test_max_pool_size_with_leaked_request_massive(self):
772+
nthreads = 100
773+
self._test_max_pool_size(
774+
2, 1, max_pool_size=2 * nthreads, nthreads=nthreads)
775+
742776
def test_max_pool_size_with_end_request_only(self):
743777
# Call end_request() but not start_request()
744778
self._test_max_pool_size(0, 1)

test/test_thread_util.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def __init__(self, ident, state):
128128
del t_watched
129129
del t_unwatched
130130

131-
# Accessing the thread-local triggers cleanup in Python <= 2.6
131+
# Trigger final cleanup in Python <= 2.7.0.
132132
# http://bugs.python.org/issue1868
133133
ident.get()
134134
self.assertEqual(3, len(ids))

0 commit comments

Comments
 (0)