Skip to content

Commit 77226d8

Browse files
committed
Add timeout support for queries
fixes on timeout scheduler fixes on timeout scheduler fixes on timeout scheduler styling issues Improve unit test
1 parent 19d7269 commit 77226d8

File tree

11 files changed

+259
-11
lines changed

11 files changed

+259
-11
lines changed

db-async-common/src/main/scala/com/github/mauricio/async/db/Configuration.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,5 @@ case class Configuration(username: String,
5454
maximumMessageSize: Int = 16777216,
5555
allocator: AbstractByteBufAllocator = PooledByteBufAllocator.DEFAULT,
5656
connectTimeout: Duration = 5.seconds,
57-
testTimeout: Duration = 5.seconds)
57+
testTimeout: Duration = 5.seconds,
58+
queryTimeout: Duration = Duration.Inf)
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.github.mauricio.async.db.exceptions
2+
3+
import com.github.mauricio.async.db.Connection
4+
5+
class ConnectionTimeoutedException( val connection : Connection )
6+
extends DatabaseException( "The connection %s has a timeouted query and is being closed".format(connection) )
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.github.mauricio.async.db.pool
2+
3+
import java.util.concurrent.atomic.AtomicBoolean
4+
import java.util.concurrent.{TimeUnit, TimeoutException, ScheduledFuture}
5+
import com.github.mauricio.async.db.util.NettyUtils
6+
import scala.concurrent.{ExecutionContext, Promise}
7+
import scala.concurrent.duration.Duration
8+
9+
trait TimeoutScheduler {
10+
implicit val internalPool: ExecutionContext
11+
def onTimeout // implementors should decide here what they want to do when a timeout occur
12+
private var isTimeoutedBool = new AtomicBoolean(false);
13+
def isTimeouted = isTimeoutedBool.get // We need this property as isClosed takes time to complete and
14+
// we don't want the connection to be used again.
15+
16+
def addTimeout[A](promise: Promise[A], duration: Duration) : Option[ScheduledFuture[_]] = {
17+
if (duration != Duration.Inf) {
18+
val scheduledFuture = schedule(
19+
{
20+
if (promise.tryFailure(new TimeoutException(s"Operation is timeouted after it took too long to return (${duration})"))) {
21+
isTimeoutedBool.set(true)
22+
onTimeout
23+
}
24+
},
25+
duration)
26+
promise.future.onComplete(x => scheduledFuture.cancel(false))
27+
28+
return Some(scheduledFuture)
29+
}
30+
return None
31+
}
32+
33+
def schedule(block: => Unit, duration: Duration) : ScheduledFuture[_] =
34+
NettyUtils.DefaultEventLoopGroup.schedule(new Runnable {
35+
override def run(): Unit = block
36+
}, duration.toMillis, TimeUnit.MILLISECONDS)
37+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2013 Maurício Linhares
3+
*
4+
* Maurício Linhares licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.github.mauricio.async.db.pool
18+
19+
import java.util.concurrent.atomic.AtomicInteger
20+
import com.github.mauricio.async.db.util.ExecutorServiceUtils
21+
/**
22+
* Implementation of TimeoutScheduler used for testing
23+
*/
24+
class DummyTimeoutScheduler extends TimeoutScheduler {
25+
implicit val internalPool = ExecutorServiceUtils.CachedExecutionContext
26+
private val timeOuts = new AtomicInteger
27+
override def onTimeout = timeOuts.incrementAndGet
28+
def timeoutCount = timeOuts.get()
29+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright 2013 Maurício Linhares
3+
*
4+
* Maurício Linhares licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
package com.github.mauricio.async.db.pool
17+
18+
import java.util.concurrent.{ScheduledFuture, TimeoutException}
19+
import com.github.mauricio.async.db.util.{ByteBufferUtils, ExecutorServiceUtils}
20+
import org.specs2.mutable.SpecificationWithJUnit
21+
import scala.concurrent.duration._
22+
import scala.concurrent.{Future, Promise}
23+
24+
/**
25+
* Tests for TimeoutScheduler
26+
*/
27+
class TimeoutSchedulerSpec extends SpecificationWithJUnit {
28+
29+
val TIMEOUT_DID_NOT_PASS = "timeout did not pass"
30+
31+
32+
"test timeout did not pass" in {
33+
val timeoutScheduler = new DummyTimeoutScheduler()
34+
val promise = Promise[String]()
35+
val scheduledFuture = timeoutScheduler.addTimeout(promise,Duration(1000, MILLISECONDS))
36+
Thread.sleep(100);
37+
promise.isCompleted === false
38+
promise.success(TIMEOUT_DID_NOT_PASS)
39+
Thread.sleep(1500)
40+
promise.future.value.get.get === TIMEOUT_DID_NOT_PASS
41+
scheduledFuture.get.isCancelled === true
42+
timeoutScheduler.timeoutCount === 0
43+
}
44+
45+
"test timeout passed" in {
46+
val timeoutMillis = 100
47+
val promise = Promise[String]()
48+
val timeoutScheduler = new DummyTimeoutScheduler()
49+
val scheduledFuture = timeoutScheduler.addTimeout(promise,Duration(timeoutMillis, MILLISECONDS))
50+
Thread.sleep(1000)
51+
promise.isCompleted === true
52+
scheduledFuture.get.isCancelled === false
53+
promise.trySuccess(TIMEOUT_DID_NOT_PASS)
54+
timeoutScheduler.timeoutCount === 1
55+
promise.future.value.get.get must throwA[TimeoutException](message = s"Operation is timeouted after it took too long to return \\(${timeoutMillis} milliseconds\\)")
56+
}
57+
58+
59+
"test no timeout" in {
60+
val timeoutScheduler = new DummyTimeoutScheduler()
61+
val promise = Promise[String]()
62+
val scheduledFuture = timeoutScheduler.addTimeout(promise,Duration.Inf)
63+
Thread.sleep(1000)
64+
scheduledFuture === None
65+
promise.isCompleted === false
66+
promise.success(TIMEOUT_DID_NOT_PASS)
67+
promise.future.value.get.get === TIMEOUT_DID_NOT_PASS
68+
timeoutScheduler.timeoutCount === 0
69+
}
70+
}
71+

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import com.github.mauricio.async.db.mysql.exceptions.MySQLException
2525
import com.github.mauricio.async.db.mysql.message.client._
2626
import com.github.mauricio.async.db.mysql.message.server._
2727
import com.github.mauricio.async.db.mysql.util.CharsetMapper
28+
import com.github.mauricio.async.db.pool.TimeoutScheduler
2829
import com.github.mauricio.async.db.util.ChannelFutureTransformer.toFuture
2930
import com.github.mauricio.async.db.util._
3031
import io.netty.channel.{ChannelHandlerContext, EventLoopGroup}
@@ -46,6 +47,7 @@ class MySQLConnection(
4647
)
4748
extends MySQLHandlerDelegate
4849
with Connection
50+
with TimeoutScheduler
4951
{
5052

5153
import MySQLConnection.log
@@ -56,7 +58,7 @@ class MySQLConnection(
5658

5759
private final val connectionCount = MySQLConnection.Counter.incrementAndGet()
5860
private final val connectionId = s"[mysql-connection-$connectionCount]"
59-
private implicit val internalPool = executionContext
61+
override implicit val internalPool = executionContext
6062

6163
private final val connectionHandler = new MySQLConnectionHandler(
6264
configuration,
@@ -188,7 +190,7 @@ class MySQLConnection(
188190
val promise = Promise[QueryResult]()
189191
this.setQueryPromise(promise)
190192
this.connectionHandler.write(new QueryMessage(query))
191-
193+
addTimeout(promise, configuration.queryTimeout)
192194
promise.future
193195
}
194196

@@ -224,6 +226,7 @@ class MySQLConnection(
224226
}
225227

226228
def disconnect: Future[Connection] = this.close
229+
override def onTimeout = disconnect
227230

228231
def isConnected: Boolean = this.connectionHandler.isConnected
229232

@@ -236,7 +239,7 @@ class MySQLConnection(
236239
val promise = Promise[QueryResult]()
237240
this.setQueryPromise(promise)
238241
this.connectionHandler.sendPreparedStatement(query, values)
239-
242+
addTimeout(promise,configuration.queryTimeout)
240243
promise.future
241244
}
242245

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/pool/MySQLConnectionFactory.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@ import com.github.mauricio.async.db.pool.ObjectFactory
2121
import com.github.mauricio.async.db.mysql.MySQLConnection
2222
import scala.util.Try
2323
import scala.concurrent.Await
24-
import scala.concurrent.duration._
2524
import com.github.mauricio.async.db.util.Log
26-
import com.github.mauricio.async.db.exceptions.{ConnectionStillRunningQueryException, ConnectionNotConnectedException}
25+
import com.github.mauricio.async.db.exceptions.{ConnectionTimeoutedException, ConnectionStillRunningQueryException, ConnectionNotConnectedException}
2726

2827
object MySQLConnectionFactory {
2928
final val log = Log.get[MySQLConnectionFactory]
@@ -90,7 +89,9 @@ class MySQLConnectionFactory( configuration : Configuration ) extends ObjectFact
9089
*/
9190
def validate(item: MySQLConnection): Try[MySQLConnection] = {
9291
Try{
93-
92+
if ( item.isTimeouted ) {
93+
throw new ConnectionTimeoutedException(item)
94+
}
9495
if ( !item.isConnected ) {
9596
throw new ConnectionNotConnectedException(item)
9697
}

mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/ConnectionHelper.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,19 @@ trait ConnectionHelper {
115115

116116
}
117117

118+
def withConfigurablePool[T]( configuration : Configuration )( fn : (ConnectionPool[MySQLConnection]) => T ) : T = {
119+
120+
val factory = new MySQLConnectionFactory(configuration)
121+
val pool = new ConnectionPool[MySQLConnection](factory, PoolConfiguration.Default)
122+
123+
try {
124+
fn(pool)
125+
} finally {
126+
awaitFuture( pool.close )
127+
}
128+
129+
}
130+
118131
def withConnection[T]( fn : (MySQLConnection) => T ) : T =
119132
withConfigurableConnection(this.defaultConfiguration)(fn)
120133

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2013 Maurício Linhares
3+
*
4+
* Maurício Linhares licenses this file to you under the Apache License,
5+
* version 2.0 (the "License"); you may not use this file except in compliance
6+
* with the License. You may obtain a copy of the License at:
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations
14+
* under the License.
15+
*/
16+
17+
package com.github.mauricio.async.db.mysql
18+
19+
import java.util.concurrent.TimeoutException
20+
import com.github.mauricio.async.db.Configuration
21+
import org.specs2.execute.{AsResult, Success, ResultExecution}
22+
import org.specs2.mutable.Specification
23+
import scala.concurrent.Await
24+
import scala.concurrent.duration._
25+
26+
class QueryTimeoutSpec extends Specification with ConnectionHelper {
27+
implicit def unitAsResult: AsResult[Unit] = new AsResult[Unit] {
28+
def asResult(r: =>Unit) =
29+
ResultExecution.execute(r)(_ => Success())
30+
}
31+
"Simple query with 1 nanosec timeout" in {
32+
withConfigurablePool(shortTimeoutConfiguration) {
33+
pool => {
34+
val connection = Await.result(pool.take, Duration(10,SECONDS))
35+
connection.isTimeouted === false
36+
connection.isConnected === true
37+
val queryResultFuture = connection.sendQuery("select sleep(1)")
38+
Await.result(queryResultFuture, Duration(10,SECONDS)) must throwA[TimeoutException]()
39+
connection.isTimeouted === true
40+
Await.ready(pool.giveBack(connection), Duration(10,SECONDS))
41+
pool.availables.count(_ == connection) === 0 // connection removed from pool
42+
// we do not know when the connection will be closed.
43+
}
44+
}
45+
}
46+
47+
"Simple query with 5 sec timeout" in {
48+
withConfigurablePool(longTimeoutConfiguration) {
49+
pool => {
50+
val connection = Await.result(pool.take, Duration(10,SECONDS))
51+
connection.isTimeouted === false
52+
connection.isConnected === true
53+
val queryResultFuture = connection.sendQuery("select sleep(1)")
54+
Await.result(queryResultFuture, Duration(10,SECONDS)).rows.get.size === 1
55+
connection.isTimeouted === false
56+
connection.isConnected === true
57+
Await.ready(pool.giveBack(connection), Duration(10,SECONDS))
58+
pool.availables.count(_ == connection) === 1 // connection returned to pool
59+
}
60+
}
61+
}
62+
63+
def shortTimeoutConfiguration = new Configuration(
64+
"mysql_async",
65+
"localhost",
66+
port = 3306,
67+
password = Some("root"),
68+
database = Some("mysql_async_tests"),
69+
queryTimeout = Duration(1,NANOSECONDS)
70+
)
71+
72+
def longTimeoutConfiguration = new Configuration(
73+
"mysql_async",
74+
"localhost",
75+
port = 3306,
76+
password = Some("root"),
77+
database = Some("mysql_async_tests"),
78+
queryTimeout = Duration(5,SECONDS)
79+
)
80+
}

postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import com.github.mauricio.async.db.QueryResult
2020
import com.github.mauricio.async.db.column.{ColumnEncoderRegistry, ColumnDecoderRegistry}
2121
import com.github.mauricio.async.db.exceptions.{InsufficientParametersException, ConnectionStillRunningQueryException}
2222
import com.github.mauricio.async.db.general.MutableResultSet
23+
import com.github.mauricio.async.db.pool.TimeoutScheduler
2324
import com.github.mauricio.async.db.postgresql.codec.{PostgreSQLConnectionDelegate, PostgreSQLConnectionHandler}
2425
import com.github.mauricio.async.db.postgresql.column.{PostgreSQLColumnDecoderRegistry, PostgreSQLColumnEncoderRegistry}
2526
import com.github.mauricio.async.db.postgresql.exceptions._
@@ -48,7 +49,8 @@ class PostgreSQLConnection
4849
executionContext : ExecutionContext = ExecutorServiceUtils.CachedExecutionContext
4950
)
5051
extends PostgreSQLConnectionDelegate
51-
with Connection {
52+
with Connection
53+
with TimeoutScheduler {
5254

5355
import PostgreSQLConnection._
5456

@@ -63,7 +65,7 @@ class PostgreSQLConnection
6365

6466
private final val currentCount = Counter.incrementAndGet()
6567
private final val preparedStatementsCounter = new AtomicInteger()
66-
private final implicit val internalExecutionContext = executionContext
68+
override implicit val internalPool = executionContext
6769

6870
private val parameterStatus = new scala.collection.mutable.HashMap[String, String]()
6971
private val parsedStatements = new scala.collection.mutable.HashMap[String, PreparedStatementHolder]()
@@ -91,6 +93,7 @@ class PostgreSQLConnection
9193
}
9294

9395
override def disconnect: Future[Connection] = this.connectionHandler.disconnect.map( c => this )
96+
override def onTimeout = disconnect
9497

9598
override def isConnected: Boolean = this.connectionHandler.isConnected
9699

@@ -103,7 +106,7 @@ class PostgreSQLConnection
103106
this.setQueryPromise(promise)
104107

105108
write(new QueryMessage(query))
106-
109+
addTimeout(promise,configuration.queryTimeout)
107110
promise.future
108111
}
109112

@@ -130,7 +133,7 @@ class PostgreSQLConnection
130133
holder.prepared = true
131134
new PreparedStatementOpeningMessage(holder.statementId, holder.realQuery, values, this.encoderRegistry)
132135
})
133-
136+
addTimeout(promise,configuration.queryTimeout)
134137
promise.future
135138
}
136139

0 commit comments

Comments
 (0)