1+ from threading import *
2+ from collections import deque
3+ from time import sleep
4+
5+ def _test ():
6+
7+ class BoundedQueue ():
8+
9+ def __init__ (self , limit ):
10+ self .mon = RLock ()
11+ self .rc = Condition (self .mon )
12+ self .wc = Condition (self .mon )
13+ self .limit = limit
14+ self .queue = deque ()
15+
16+ def put (self , item ):
17+ self .mon .acquire ()
18+ while len (self .queue ) >= self .limit :
19+ self ._note ("put(%s): queue full" , item )
20+ self .wc .wait ()
21+ self .queue .append (item )
22+ self ._note ("put(%s): appended, length now %d" ,
23+ item , len (self .queue ))
24+ self .rc .notify ()
25+ self .mon .release ()
26+
27+ def get (self ):
28+ self .mon .acquire ()
29+ while not self .queue :
30+ self ._note ("get(): queue empty" )
31+ self .rc .wait ()
32+ item = self .queue .popleft ()
33+ self ._note ("get(): got %s, %d left" , item , len (self .queue ))
34+ self .wc .notify ()
35+ self .mon .release ()
36+ return item
37+
38+ def _note (self , format , * args ):
39+ format = format % args
40+ ident = get_ident ()
41+ try :
42+ name = current_thread ().name
43+ except KeyError :
44+ name = "<OS thread %d>" % ident
45+ format = "%s: %s" % (name , format )
46+ print (format )
47+
48+ class ProducerThread (Thread ):
49+
50+ def __init__ (self , queue , quota ):
51+ Thread .__init__ (self , name = "Producer" )
52+ self .queue = queue
53+ self .quota = quota
54+
55+ def run (self ):
56+ from random import random
57+ counter = 0
58+ while counter < self .quota :
59+ counter = counter + 1
60+ self .queue .put ("%s.%d" % (self .name , counter ))
61+ sleep (random () * 0.00001 )
62+
63+
64+ class ConsumerThread (Thread ):
65+
66+ def __init__ (self , queue , count ):
67+ Thread .__init__ (self , name = "Consumer" )
68+ self .queue = queue
69+ self .count = count
70+
71+ def run (self ):
72+ while self .count > 0 :
73+ item = self .queue .get ()
74+ print (item )
75+ self .count = self .count - 1
76+
77+ NP = 3
78+ QL = 4
79+ NI = 5
80+
81+ Q = BoundedQueue (QL )
82+ P = []
83+ for i in range (NP ):
84+ t = ProducerThread (Q , NI )
85+ t .name = ("Producer-%d" % (i + 1 ))
86+ P .append (t )
87+ C = ConsumerThread (Q , NI * NP )
88+ for t in P :
89+ t .start ()
90+ sleep (0.000001 )
91+ C .start ()
92+ for t in P :
93+ t .join ()
94+ C .join ()
95+
96+ _test ()
0 commit comments