Skip to content

Commit 2c18a42

Browse files
committed
Using option for the timeout, letting connections declare their event loops and execution contexts
1 parent 73d9ade commit 2c18a42

File tree

8 files changed

+63
-35
lines changed

8 files changed

+63
-35
lines changed

db-async-common/src/main/scala/com/github/mauricio/async/db/Configuration.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ package com.github.mauricio.async.db
1818

1919
import java.nio.charset.Charset
2020

21-
import io.netty.buffer.{AbstractByteBufAllocator, PooledByteBufAllocator}
21+
import io.netty.buffer.{ByteBufAllocator, PooledByteBufAllocator}
2222
import io.netty.util.CharsetUtil
2323

2424
import scala.concurrent.duration._
@@ -43,6 +43,11 @@ object Configuration {
4343
* OOM or eternal loop attacks the client could have, defaults to 16 MB. You can set this
4444
* to any value you would like but again, make sure you know what you are doing if you do
4545
* change it.
46+
* @param allocator the netty buffer allocator to be used
47+
* @param connectTimeout the timeout for connecting to servers
48+
* @param testTimeout the timeout for connection tests performed by pools
49+
* @param queryTimeout the optional query timeout
50+
*
4651
*/
4752

4853
case class Configuration(username: String,
@@ -52,7 +57,7 @@ case class Configuration(username: String,
5257
database: Option[String] = None,
5358
charset: Charset = Configuration.DefaultCharset,
5459
maximumMessageSize: Int = 16777216,
55-
allocator: AbstractByteBufAllocator = PooledByteBufAllocator.DEFAULT,
60+
allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT,
5661
connectTimeout: Duration = 5.seconds,
5762
testTimeout: Duration = 5.seconds,
58-
queryTimeout: Duration = Duration.Inf)
63+
queryTimeout: Option[Duration] = None)

db-async-common/src/main/scala/com/github/mauricio/async/db/pool/TimeoutScheduler.scala

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,62 @@ package com.github.mauricio.async.db.pool
22

33
import java.util.concurrent.atomic.AtomicBoolean
44
import java.util.concurrent.{TimeUnit, TimeoutException, ScheduledFuture}
5-
import com.github.mauricio.async.db.util.NettyUtils
5+
import io.netty.channel.EventLoopGroup
66
import scala.concurrent.{ExecutionContext, Promise}
77
import scala.concurrent.duration.Duration
88

99
trait TimeoutScheduler {
10-
implicit val internalPool: ExecutionContext
10+
11+
private var isTimeoutedBool = new AtomicBoolean(false)
12+
13+
/**
14+
*
15+
* The event loop group to be used for scheduling.
16+
*
17+
* @return
18+
*/
19+
20+
def eventLoopGroup : EventLoopGroup
21+
22+
/**
23+
* Implementors should decide here what they want to do when a timeout occur
24+
*/
25+
1126
def onTimeout // implementors should decide here what they want to do when a timeout occur
12-
private var isTimeoutedBool = new AtomicBoolean(false);
13-
def isTimeouted = isTimeoutedBool.get // We need this property as isClosed takes time to complete and
14-
// we don't want the connection to be used again.
1527

16-
def addTimeout[A](promise: Promise[A], duration: Duration) : Option[ScheduledFuture[_]] = {
17-
if (duration != Duration.Inf) {
18-
val scheduledFuture = schedule(
28+
/**
29+
*
30+
* We need this property as isClosed takes time to complete and
31+
* we don't want the connection to be used again.
32+
*
33+
* @return
34+
*/
35+
36+
def isTimeouted : Boolean =
37+
isTimeoutedBool.get
38+
39+
def addTimeout[A](
40+
promise: Promise[A],
41+
durationOption: Option[Duration])
42+
(implicit executionContext : ExecutionContext) : Option[ScheduledFuture[_]] = {
43+
durationOption.map {
44+
duration =>
45+
val scheduledFuture = schedule(
1946
{
2047
if (promise.tryFailure(new TimeoutException(s"Operation is timeouted after it took too long to return (${duration})"))) {
2148
isTimeoutedBool.set(true)
2249
onTimeout
2350
}
2451
},
2552
duration)
26-
promise.future.onComplete(x => scheduledFuture.cancel(false))
53+
promise.future.onComplete(x => scheduledFuture.cancel(false))
2754

28-
return Some(scheduledFuture)
55+
scheduledFuture
2956
}
30-
return None
3157
}
3258

3359
def schedule(block: => Unit, duration: Duration) : ScheduledFuture[_] =
34-
NettyUtils.DefaultEventLoopGroup.schedule(new Runnable {
60+
eventLoopGroup.schedule(new Runnable {
3561
override def run(): Unit = block
3662
}, duration.toMillis, TimeUnit.MILLISECONDS)
3763
}

db-async-common/src/test/scala/com/github/mauricio/async/db/pool/DummyTimeoutScheduler.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package com.github.mauricio.async.db.pool
1818

1919
import java.util.concurrent.atomic.AtomicInteger
20-
import com.github.mauricio.async.db.util.ExecutorServiceUtils
20+
import com.github.mauricio.async.db.util.{NettyUtils, ExecutorServiceUtils}
21+
import io.netty.channel.EventLoopGroup
22+
2123
/**
2224
* Implementation of TimeoutScheduler used for testing
2325
*/
@@ -26,4 +28,5 @@ class DummyTimeoutScheduler extends TimeoutScheduler {
2628
private val timeOuts = new AtomicInteger
2729
override def onTimeout = timeOuts.incrementAndGet
2830
def timeoutCount = timeOuts.get()
31+
def eventLoopGroup : EventLoopGroup = NettyUtils.DefaultEventLoopGroup
2932
}

db-async-common/src/test/scala/com/github/mauricio/async/db/pool/TimeoutSchedulerSpec.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@ class TimeoutSchedulerSpec extends SpecificationWithJUnit {
2828

2929
val TIMEOUT_DID_NOT_PASS = "timeout did not pass"
3030

31-
3231
"test timeout did not pass" in {
3332
val timeoutScheduler = new DummyTimeoutScheduler()
3433
val promise = Promise[String]()
35-
val scheduledFuture = timeoutScheduler.addTimeout(promise,Duration(1000, MILLISECONDS))
34+
val scheduledFuture = timeoutScheduler.addTimeout(promise,Some(Duration(1000, MILLISECONDS)))
3635
Thread.sleep(100);
3736
promise.isCompleted === false
3837
promise.success(TIMEOUT_DID_NOT_PASS)
@@ -46,7 +45,7 @@ class TimeoutSchedulerSpec extends SpecificationWithJUnit {
4645
val timeoutMillis = 100
4746
val promise = Promise[String]()
4847
val timeoutScheduler = new DummyTimeoutScheduler()
49-
val scheduledFuture = timeoutScheduler.addTimeout(promise,Duration(timeoutMillis, MILLISECONDS))
48+
val scheduledFuture = timeoutScheduler.addTimeout(promise,Some(Duration(timeoutMillis, MILLISECONDS)))
5049
Thread.sleep(1000)
5150
promise.isCompleted === true
5251
scheduledFuture.get.isCancelled === false
@@ -55,11 +54,10 @@ class TimeoutSchedulerSpec extends SpecificationWithJUnit {
5554
promise.future.value.get.get must throwA[TimeoutException](message = s"Operation is timeouted after it took too long to return \\(${timeoutMillis} milliseconds\\)")
5655
}
5756

58-
5957
"test no timeout" in {
6058
val timeoutScheduler = new DummyTimeoutScheduler()
6159
val promise = Promise[String]()
62-
val scheduledFuture = timeoutScheduler.addTimeout(promise,Duration.Inf)
60+
val scheduledFuture = timeoutScheduler.addTimeout(promise,None)
6361
Thread.sleep(1000)
6462
scheduledFuture === None
6563
promise.isCompleted === false

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class MySQLConnection(
4343
configuration: Configuration,
4444
charsetMapper: CharsetMapper = CharsetMapper.Instance,
4545
group : EventLoopGroup = NettyUtils.DefaultEventLoopGroup,
46-
executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
46+
implicit val executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
4747
)
4848
extends MySQLHandlerDelegate
4949
with Connection
@@ -55,10 +55,8 @@ class MySQLConnection(
5555
// validate that this charset is supported
5656
charsetMapper.toInt(configuration.charset)
5757

58-
5958
private final val connectionCount = MySQLConnection.Counter.incrementAndGet()
6059
private final val connectionId = s"[mysql-connection-$connectionCount]"
61-
override implicit val internalPool = executionContext
6260

6361
private final val connectionHandler = new MySQLConnectionHandler(
6462
configuration,
@@ -80,6 +78,8 @@ class MySQLConnection(
8078
def lastException : Throwable = this._lastException
8179
def count : Long = this.connectionCount
8280

81+
override def eventLoopGroup : EventLoopGroup = group
82+
8383
def connect: Future[Connection] = {
8484
this.connectionHandler.connect.onFailure {
8585
case e => this.connectionPromise.tryFailure(e)

mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/QueryTimeoutSpec.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class QueryTimeoutSpec extends Specification with ConnectionHelper {
6666
port = 3306,
6767
password = Some("root"),
6868
database = Some("mysql_async_tests"),
69-
queryTimeout = Duration(1,NANOSECONDS)
69+
queryTimeout = Some(Duration(1,NANOSECONDS))
7070
)
7171

7272
def longTimeoutConfiguration = new Configuration(
@@ -75,6 +75,6 @@ class QueryTimeoutSpec extends Specification with ConnectionHelper {
7575
port = 3306,
7676
password = Some("root"),
7777
database = Some("mysql_async_tests"),
78-
queryTimeout = Duration(5,SECONDS)
78+
queryTimeout = Some(Duration(5,SECONDS))
7979
)
8080
}

postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class PostgreSQLConnection
4646
encoderRegistry: ColumnEncoderRegistry = PostgreSQLColumnEncoderRegistry.Instance,
4747
decoderRegistry: ColumnDecoderRegistry = PostgreSQLColumnDecoderRegistry.Instance,
4848
group : EventLoopGroup = NettyUtils.DefaultEventLoopGroup,
49-
executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
49+
implicit val executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
5050
)
5151
extends PostgreSQLConnectionDelegate
5252
with Connection
@@ -65,7 +65,6 @@ class PostgreSQLConnection
6565

6666
private final val currentCount = Counter.incrementAndGet()
6767
private final val preparedStatementsCounter = new AtomicInteger()
68-
override implicit val internalPool = executionContext
6968

7069
private val parameterStatus = new scala.collection.mutable.HashMap[String, String]()
7170
private val parsedStatements = new scala.collection.mutable.HashMap[String, PreparedStatementHolder]()
@@ -82,6 +81,7 @@ class PostgreSQLConnection
8281

8382
private var queryResult: Option[QueryResult] = None
8483

84+
override def eventLoopGroup : EventLoopGroup = group
8585
def isReadyForQuery: Boolean = this.queryPromise.isEmpty
8686

8787
def connect: Future[Connection] = {

postgresql-async/src/test/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnectionSpec.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -285,16 +285,12 @@ class PostgreSQLConnectionSpec extends Specification with DatabaseTestHelper {
285285
try {
286286
withHandler(configuration, {
287287
handler =>
288-
executeQuery(handler, "SELECT 0")
289-
throw new IllegalStateException("should not have come here")
288+
val result = executeQuery(handler, "SELECT 0")
289+
throw new IllegalStateException("should not have arrived")
290290
})
291291
} catch {
292-
case e: GenericDatabaseException => {
292+
case e: GenericDatabaseException =>
293293
e.errorMessage.fields(InformationMessage.Routine) === "auth_failed"
294-
}
295-
case e: Exception => {
296-
throw new IllegalStateException("should not have come here")
297-
}
298294
}
299295

300296
}

0 commit comments

Comments
 (0)