Skip to content
This repository was archived by the owner on Jun 16, 2023. It is now read-only.

Commit 6db0d9c

Browse files
committed
Merge pull request mauricio#88 from fwbrasil/partitioned-pool
partitioned db pool
2 parents 80f2c6d + 249f65d commit 6db0d9c

File tree

3 files changed

+386
-0
lines changed

3 files changed

+386
-0
lines changed
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.github.mauricio.async.db.pool
2+
3+
import scala.concurrent.Future
4+
import com.github.mauricio.async.db.util.ExecutorServiceUtils
5+
import scala.concurrent.Promise
6+
import java.util.concurrent.ConcurrentHashMap
7+
import scala.util.Success
8+
import scala.util.Failure
9+
10+
class PartitionedAsyncObjectPool[T](
11+
factory: ObjectFactory[T],
12+
configuration: PoolConfiguration,
13+
numberOfPartitions: Int)
14+
extends AsyncObjectPool[T] {
15+
16+
import ExecutorServiceUtils.CachedExecutionContext
17+
18+
private val pools =
19+
(0 until numberOfPartitions)
20+
.map(_ -> new SingleThreadedAsyncObjectPool(factory, partitionConfig))
21+
.toMap
22+
23+
private val checkouts = new ConcurrentHashMap[T, SingleThreadedAsyncObjectPool[T]]
24+
25+
def take: Future[T] = {
26+
val pool = currentPool
27+
pool.take.andThen {
28+
case Success(conn) =>
29+
checkouts.put(conn, pool)
30+
case Failure(_) =>
31+
}
32+
}
33+
34+
def giveBack(item: T) =
35+
checkouts
36+
.remove(item)
37+
.giveBack(item)
38+
.map(_ => this)
39+
40+
def close =
41+
Future.sequence(pools.values.map(_.close)).map {
42+
_ => this
43+
}
44+
45+
def availables: Traversable[T] = pools.values.map(_.availables).flatten
46+
47+
def inUse: Traversable[T] = pools.values.map(_.inUse).flatten
48+
49+
def queued: Traversable[Promise[T]] = pools.values.map(_.queued).flatten
50+
51+
protected def isClosed =
52+
pools.values.forall(_.isClosed)
53+
54+
private def currentPool =
55+
pools(currentThreadAffinity)
56+
57+
private def currentThreadAffinity =
58+
(Thread.currentThread.getId % numberOfPartitions).toInt
59+
60+
private def partitionConfig =
61+
configuration.copy(
62+
maxObjects = configuration.maxObjects / numberOfPartitions,
63+
maxQueueSize = configuration.maxQueueSize / numberOfPartitions
64+
)
65+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.github.mauricio.async.db.pool;
2+
3+
import com.github.mauricio.async.db.util.ExecutorServiceUtils
4+
import com.github.mauricio.async.db.{ QueryResult, Connection }
5+
import scala.concurrent.{ ExecutionContext, Future }
6+
7+
class PartitionedConnectionPool[T <: Connection](
8+
factory: ObjectFactory[T],
9+
configuration: PoolConfiguration,
10+
numberOfPartitions: Int,
11+
executionContext: ExecutionContext = ExecutorServiceUtils.CachedExecutionContext)
12+
extends PartitionedAsyncObjectPool[T](factory, configuration, numberOfPartitions)
13+
with Connection {
14+
15+
def disconnect: Future[Connection] = if (this.isConnected) {
16+
this.close.map(item => this)(executionContext)
17+
} else {
18+
Future.successful(this)
19+
}
20+
21+
def connect: Future[Connection] = Future.successful(this)
22+
23+
def isConnected: Boolean = !this.isClosed
24+
25+
def sendQuery(query: String): Future[QueryResult] =
26+
this.use(_.sendQuery(query))(executionContext)
27+
28+
def sendPreparedStatement(query: String, values: Seq[Any] = List()): Future[QueryResult] =
29+
this.use(_.sendPreparedStatement(query, values))(executionContext)
30+
31+
override def inTransaction[A](f: Connection => Future[A])(implicit context: ExecutionContext = executionContext): Future[A] =
32+
this.use(_.inTransaction[A](f)(context))(executionContext)
33+
}
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
package com.github.mauricio.async.db.pool
2+
3+
import org.specs2.mutable.Specification
4+
import scala.util.Try
5+
import scala.concurrent.Await
6+
import scala.concurrent.duration._
7+
import scala.concurrent.Future
8+
import org.specs2.mutable.SpecificationWithJUnit
9+
import language.reflectiveCalls
10+
import com.github.mauricio.async.db.util.ExecutorServiceUtils
11+
import scala.concurrent.ExecutionContext
12+
import java.util.concurrent.Executors
13+
14+
class PartitionedAsyncObjectPoolSpec extends SpecificationWithJUnit {
15+
isolated
16+
sequential
17+
18+
val config =
19+
PoolConfiguration(100, Long.MaxValue, 100, Int.MaxValue)
20+
21+
val factory = new ObjectFactory[Int] {
22+
var reject = Set[Int]()
23+
var failCreate = false
24+
private var current = 0
25+
def create =
26+
if (failCreate)
27+
throw new IllegalStateException
28+
else {
29+
current += 1
30+
current
31+
}
32+
def destroy(item: Int) = {}
33+
def validate(item: Int) =
34+
Try {
35+
if (reject.contains(item))
36+
throw new IllegalStateException
37+
else item
38+
}
39+
}
40+
41+
val pool = new PartitionedAsyncObjectPool(factory, config, 2)
42+
def maxObjects = config.maxObjects / 2
43+
def maxIdle = config.maxIdle / 2
44+
def maxQueueSize = config.maxQueueSize / 2
45+
46+
"pool contents" >> {
47+
48+
"before exceed maxObjects" >> {
49+
50+
"take one element" in {
51+
takeAndWait(1)
52+
53+
pool.inUse.size mustEqual 1
54+
pool.queued.size mustEqual 0
55+
pool.availables.size mustEqual 0
56+
}
57+
58+
"take one element and return it invalid" in {
59+
takeAndWait(1)
60+
factory.reject += 1
61+
62+
await(pool.giveBack(1)) must throwA[IllegalStateException]
63+
64+
pool.inUse.size mustEqual 0
65+
pool.queued.size mustEqual 0
66+
pool.availables.size mustEqual 0
67+
}
68+
69+
"take one failed element" in {
70+
factory.failCreate = true
71+
takeAndWait(1) must throwA[IllegalStateException]
72+
73+
pool.inUse.size mustEqual 0
74+
pool.queued.size mustEqual 0
75+
pool.availables.size mustEqual 0
76+
}
77+
78+
"take maxObjects" in {
79+
takeAndWait(maxObjects)
80+
81+
pool.inUse.size mustEqual maxObjects
82+
pool.queued.size mustEqual 0
83+
pool.availables.size mustEqual 0
84+
}
85+
86+
"take maxObjects - 1 and take one failed" in {
87+
takeAndWait(maxObjects - 1)
88+
89+
factory.failCreate = true
90+
takeAndWait(1) must throwA[IllegalStateException]
91+
92+
pool.inUse.size mustEqual maxObjects - 1
93+
pool.queued.size mustEqual 0
94+
pool.availables.size mustEqual 0
95+
}
96+
97+
"take maxObjects and receive one back" in {
98+
takeAndWait(maxObjects)
99+
await(pool.giveBack(1))
100+
101+
pool.inUse.size mustEqual maxObjects - 1
102+
pool.queued.size mustEqual 0
103+
pool.availables.size mustEqual 1
104+
}
105+
106+
"take maxObjects and receive one invalid back" in {
107+
takeAndWait(maxObjects)
108+
factory.reject += 1
109+
await(pool.giveBack(1)) must throwA[IllegalStateException]
110+
111+
pool.inUse.size mustEqual maxObjects - 1
112+
pool.queued.size mustEqual 0
113+
pool.availables.size mustEqual 0
114+
}
115+
}
116+
117+
"after exceed maxObjects" >> {
118+
119+
takeAndWait(maxObjects)
120+
121+
"before exceed maxQueueSize" >> {
122+
123+
"one take queued" in {
124+
pool.take
125+
126+
pool.inUse.size mustEqual maxObjects
127+
pool.queued.size mustEqual 1
128+
pool.availables.size mustEqual 0
129+
}
130+
131+
"one take queued and receive one item back" in {
132+
val taking = pool.take
133+
134+
await(pool.giveBack(1))
135+
136+
await(taking) mustEqual 1
137+
pool.inUse.size mustEqual maxObjects
138+
pool.queued.size mustEqual 0
139+
pool.availables.size mustEqual 0
140+
}
141+
142+
"one take queued and receive one invalid item back" in {
143+
val taking = pool.take
144+
factory.reject += 1
145+
await(pool.giveBack(1)) must throwA[IllegalStateException]
146+
147+
pool.inUse.size mustEqual maxObjects - 1
148+
pool.queued.size mustEqual 1
149+
pool.availables.size mustEqual 0
150+
}
151+
152+
"maxQueueSize takes queued" in {
153+
for (_ <- 0 until maxQueueSize)
154+
pool.take
155+
156+
pool.inUse.size mustEqual maxObjects
157+
pool.queued.size mustEqual maxQueueSize
158+
pool.availables.size mustEqual 0
159+
}
160+
161+
"maxQueueSize takes queued and receive one back" in {
162+
val taking = pool.take
163+
for (_ <- 0 until maxQueueSize - 1)
164+
pool.take
165+
166+
await(pool.giveBack(10))
167+
168+
await(taking) mustEqual 10
169+
pool.inUse.size mustEqual maxObjects
170+
pool.queued.size mustEqual maxQueueSize - 1
171+
pool.availables.size mustEqual 0
172+
}
173+
174+
"maxQueueSize takes queued and receive one invalid back" in {
175+
for (_ <- 0 until maxQueueSize)
176+
pool.take
177+
178+
factory.reject += 11
179+
await(pool.giveBack(11)) must throwA[IllegalStateException]
180+
181+
pool.inUse.size mustEqual maxObjects - 1
182+
pool.queued.size mustEqual maxQueueSize
183+
pool.availables.size mustEqual 0
184+
}
185+
}
186+
187+
"after exceed maxQueueSize" >> {
188+
189+
for (_ <- 0 until maxQueueSize)
190+
pool.take
191+
192+
"start to reject takes" in {
193+
await(pool.take) must throwA[PoolExhaustedException]
194+
195+
pool.inUse.size mustEqual maxObjects
196+
pool.queued.size mustEqual maxQueueSize
197+
pool.availables.size mustEqual 0
198+
}
199+
200+
"receive an object back" in {
201+
await(pool.giveBack(1))
202+
203+
pool.inUse.size mustEqual maxObjects
204+
pool.queued.size mustEqual maxQueueSize - 1
205+
pool.availables.size mustEqual 0
206+
}
207+
208+
"receive an invalid object back" in {
209+
factory.reject += 1
210+
await(pool.giveBack(1)) must throwA[IllegalStateException]
211+
212+
pool.inUse.size mustEqual maxObjects - 1
213+
pool.queued.size mustEqual maxQueueSize
214+
pool.availables.size mustEqual 0
215+
}
216+
217+
"receive maxQueueSize objects back" in {
218+
for (i <- 1 to maxQueueSize)
219+
await(pool.giveBack(i))
220+
221+
pool.inUse.size mustEqual maxObjects
222+
pool.queued.size mustEqual 0
223+
pool.availables.size mustEqual 0
224+
}
225+
226+
"receive maxQueueSize invalid objects back" in {
227+
for (i <- 1 to maxQueueSize) {
228+
factory.reject += i
229+
await(pool.giveBack(i)) must throwA[IllegalStateException]
230+
}
231+
232+
pool.inUse.size mustEqual maxObjects - maxQueueSize
233+
pool.queued.size mustEqual maxQueueSize
234+
pool.availables.size mustEqual 0
235+
}
236+
237+
"receive maxQueueSize + 1 object back" in {
238+
for (i <- 1 to maxQueueSize)
239+
await(pool.giveBack(i))
240+
241+
await(pool.giveBack(1))
242+
pool.inUse.size mustEqual maxObjects - 1
243+
pool.queued.size mustEqual 0
244+
pool.availables.size mustEqual 1
245+
}
246+
247+
"receive maxQueueSize + 1 invalid object back" in {
248+
for (i <- 1 to maxQueueSize)
249+
await(pool.giveBack(i))
250+
251+
factory.reject += 1
252+
await(pool.giveBack(1)) must throwA[IllegalStateException]
253+
pool.inUse.size mustEqual maxObjects - 1
254+
pool.queued.size mustEqual 0
255+
pool.availables.size mustEqual 0
256+
}
257+
}
258+
}
259+
}
260+
261+
"gives back the connection to the original pool" in {
262+
val executor = Executors.newFixedThreadPool(20)
263+
implicit val context = ExecutionContext.fromExecutor(executor)
264+
265+
val takes =
266+
for (_ <- 0 until 30) yield {
267+
Future().flatMap(_ => pool.take)
268+
}
269+
val takesAndReturns =
270+
Future.sequence(takes).flatMap { items =>
271+
Future.sequence(items.map(pool.giveBack))
272+
}
273+
274+
await(takesAndReturns)
275+
276+
executor.shutdown
277+
pool.inUse.size mustEqual 0
278+
pool.queued.size mustEqual 0
279+
pool.availables.size mustEqual 30
280+
}
281+
282+
private def takeAndWait(objects: Int) =
283+
for (_ <- 0 until objects)
284+
await(pool.take)
285+
286+
private def await[T](future: Future[T]) =
287+
Await.result(future, Duration.Inf)
288+
}

0 commit comments

Comments
 (0)