Skip to content

Commit b96b48f

Browse files
committed
Separate protocol encoder for MySQL SendLongData
1 parent 4ff4189 commit b96b48f

File tree

5 files changed

+96
-83
lines changed

5 files changed

+96
-83
lines changed

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLConnectionHandler.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class MySQLConnectionHandler(
5252
private final val connectionPromise = Promise[MySQLConnectionHandler]
5353
private final val decoder = new MySQLFrameDecoder(configuration.charset, connectionId)
5454
private final val encoder = new MySQLOneToOneEncoder(configuration.charset, charsetMapper)
55+
private final val sendLongDataEncoder = new SendLongDataEncoder(configuration.charset)
5556
private final val currentParameters = new ArrayBuffer[ColumnDefinitionMessage]()
5657
private final val currentColumns = new ArrayBuffer[ColumnDefinitionMessage]()
5758
private final val parsedStatements = new HashMap[String,PreparedStatementHolder]()
@@ -70,6 +71,7 @@ class MySQLConnectionHandler(
7071
channel.pipeline.addLast(
7172
decoder,
7273
encoder,
74+
sendLongDataEncoder,
7375
MySQLConnectionHandler.this)
7476
}
7577

@@ -252,7 +254,7 @@ class MySQLConnectionHandler(
252254
case Some(v) => v
253255
case _ => maybeValue
254256
}
255-
encoder.isLong(value)
257+
sendLongDataEncoder.isLong(value)
256258
}
257259
}
258260

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLOneToOneEncoder.scala

Lines changed: 33 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -32,69 +32,57 @@ object MySQLOneToOneEncoder {
3232
val log = Log.get[MySQLOneToOneEncoder]
3333
}
3434

35-
class MySQLOneToOneEncoder(charset: Charset, charsetMapper: CharsetMapper) extends MessageToMessageEncoder[Any] {
35+
class MySQLOneToOneEncoder(charset: Charset, charsetMapper: CharsetMapper)
36+
extends MessageToMessageEncoder[ClientMessage](classOf[ClientMessage]) {
3637

3738
import MySQLOneToOneEncoder.log
3839

3940
private final val handshakeResponseEncoder = new HandshakeResponseEncoder(charset, charsetMapper)
4041
private final val queryEncoder = new QueryMessageEncoder(charset)
4142
private final val rowEncoder = new BinaryRowEncoder(charset)
4243
private final val prepareEncoder = new PreparedStatementPrepareEncoder(charset)
43-
private final val sendLongDataEncoder = new SendLongDataEncoder(rowEncoder)
4444
private final val executeEncoder = new PreparedStatementExecuteEncoder(rowEncoder)
4545
private final val authenticationSwitchEncoder = new AuthenticationSwitchResponseEncoder(charset)
4646

4747
private var sequence = 1
4848

49-
def isLong(value: Any): Boolean = rowEncoder.encoderFor(value).isLong(value)
50-
51-
def encode(ctx: ChannelHandlerContext, msg: Any, out: java.util.List[Object]): Unit = {
52-
53-
msg match {
54-
case message: ClientMessage => {
55-
val encoder = (message.kind: @switch) match {
56-
case ClientMessage.ClientProtocolVersion => this.handshakeResponseEncoder
57-
case ClientMessage.Quit => {
58-
sequence = 0
59-
QuitMessageEncoder
60-
}
61-
case ClientMessage.Query => {
62-
sequence = 0
63-
this.queryEncoder
64-
}
65-
case ClientMessage.PreparedStatementExecute => {
66-
sequence = 0
67-
this.executeEncoder
68-
}
69-
case ClientMessage.PreparedStatementPrepare => {
70-
sequence = 0
71-
this.prepareEncoder
72-
}
73-
case ClientMessage.PreparedStatementSendLongData => {
74-
sequence = 0
75-
this.sendLongDataEncoder
76-
}
77-
case ClientMessage.AuthSwitchResponse => {
78-
sequence += 1
79-
this.authenticationSwitchEncoder
80-
}
81-
case _ => throw new EncoderNotAvailableException(message)
82-
}
83-
84-
val result = encoder.encode(message)
49+
def encode(ctx: ChannelHandlerContext, message: ClientMessage, out: java.util.List[Object]): Unit = {
50+
val encoder = (message.kind: @switch) match {
51+
case ClientMessage.ClientProtocolVersion => this.handshakeResponseEncoder
52+
case ClientMessage.Quit => {
53+
sequence = 0
54+
QuitMessageEncoder
55+
}
56+
case ClientMessage.Query => {
57+
sequence = 0
58+
this.queryEncoder
59+
}
60+
case ClientMessage.PreparedStatementExecute => {
61+
sequence = 0
62+
this.executeEncoder
63+
}
64+
case ClientMessage.PreparedStatementPrepare => {
65+
sequence = 0
66+
this.prepareEncoder
67+
}
68+
case ClientMessage.AuthSwitchResponse => {
69+
sequence += 1
70+
this.authenticationSwitchEncoder
71+
}
72+
case _ => throw new EncoderNotAvailableException(message)
73+
}
8574

86-
ByteBufferUtils.writePacketLength(result, sequence)
75+
val result: ByteBuf = encoder.encode(message)
8776

88-
sequence += 1
77+
ByteBufferUtils.writePacketLength(result, sequence)
8978

90-
if ( log.isTraceEnabled ) {
91-
log.trace(s"Writing message ${message.getClass.getName} - \n${BufferDumper.dumpAsHex(result)}")
92-
}
79+
sequence += 1
9380

94-
out.add(result)
95-
}
81+
if ( log.isTraceEnabled ) {
82+
log.trace(s"Writing message ${message.getClass.getName} - \n${BufferDumper.dumpAsHex(result)}")
9683
}
9784

85+
out.add(result)
9886
}
9987

10088
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package com.github.mauricio.async.db.mysql.codec
2+
3+
import java.nio.charset.Charset
4+
5+
import com.github.mauricio.async.db.mysql.binary.BinaryRowEncoder
6+
import com.github.mauricio.async.db.mysql.message.client.{ClientMessage, SendLongDataMessage}
7+
import com.github.mauricio.async.db.util.{Log, ByteBufferUtils}
8+
import io.netty.buffer.{Unpooled, ByteBuf}
9+
import io.netty.channel.ChannelHandlerContext
10+
import io.netty.handler.codec.MessageToMessageEncoder
11+
12+
object SendLongDataEncoder {
13+
val log = Log.get[SendLongDataEncoder]
14+
}
15+
16+
class SendLongDataEncoder(charset: Charset)
17+
extends MessageToMessageEncoder[SendLongDataMessage](classOf[SendLongDataMessage]) {
18+
19+
import com.github.mauricio.async.db.mysql.codec.SendLongDataEncoder.log
20+
21+
private final val rowEncoder = new BinaryRowEncoder(charset)
22+
23+
def isLong(value: Any): Boolean = rowEncoder.encoderFor(value).isLong(value)
24+
25+
def encode(ctx: ChannelHandlerContext, message: SendLongDataMessage, out: java.util.List[Object]): Unit = {
26+
val result: ByteBuf = encode(message)
27+
28+
ByteBufferUtils.writePacketLength(result, 0)
29+
30+
if ( log.isTraceEnabled ) {
31+
log.trace(s"Writing message ${message.toString}")
32+
}
33+
34+
out.add(result)
35+
}
36+
37+
private def encode(message: SendLongDataMessage): ByteBuf = {
38+
val buffer = ByteBufferUtils.packetBuffer()
39+
buffer.writeByte(ClientMessage.PreparedStatementSendLongData)
40+
buffer.writeBytes(message.statementId)
41+
buffer.writeShort(message.paramId)
42+
43+
Unpooled.wrappedBuffer(buffer, encodeValue(message.value))
44+
}
45+
46+
private def encodeValue(maybeValue: Any) : ByteBuf = {
47+
if ( maybeValue == null || maybeValue == None ) {
48+
throw new UnsupportedOperationException("Cannot encode NULL as long value")
49+
} else {
50+
val value = maybeValue match {
51+
case Some(v) => v
52+
case _ => maybeValue
53+
}
54+
val encoder = rowEncoder.encoderFor(value)
55+
encoder.encodeLong(value)
56+
}
57+
}
58+
59+
}

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/encoder/SendLongDataEncoder.scala

Lines changed: 0 additions & 34 deletions
This file was deleted.

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/message/client/SendLongDataMessage.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package com.github.mauricio.async.db.mysql.message.client
33
case class SendLongDataMessage (
44
statementId : Array[Byte],
55
value : Any,
6-
paramId : Int )
7-
extends ClientMessage( ClientMessage.PreparedStatementSendLongData ) {
8-
6+
paramId : Int ) {
97
override def toString = "SendLongDataMessage(statementId=" + statementId + ",paramId=" + paramId + ",value.getClass=" + value.getClass.getName +")"
108
}

0 commit comments

Comments
 (0)