Skip to content

Commit f4ae78f

Browse files
committed
Refactor binary encoding
1 parent 8657281 commit f4ae78f

File tree

6 files changed

+80
-83
lines changed

6 files changed

+80
-83
lines changed

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoder.scala

Lines changed: 2 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@
1616

1717
package com.github.mauricio.async.db.mysql.binary
1818

19-
import io.netty.buffer.{Unpooled, ByteBuf}
19+
import io.netty.buffer.ByteBuf
2020
import java.nio.ByteBuffer
2121
import java.nio.charset.Charset
2222
import com.github.mauricio.async.db.mysql.binary.encoder._
2323
import com.github.mauricio.async.db.util._
2424
import org.joda.time._
2525
import scala.Some
26-
import com.github.mauricio.async.db.mysql.column.ColumnTypes
27-
import java.nio.ByteOrder
2826

2927
object BinaryRowEncoder {
3028
final val log = Log.get[BinaryRowEncoder]
@@ -66,75 +64,7 @@ class BinaryRowEncoder( charset : Charset ) {
6664
classOf[java.lang.Boolean] -> BooleanEncoder
6765
)
6866

69-
def encode( values : Seq[Any] ) : ByteBuf = {
70-
71-
val nullBitsCount = (values.size + 7) / 8
72-
val nullBits = new Array[Byte](nullBitsCount)
73-
val bitMapBuffer = ByteBufferUtils.mysqlBuffer(1 + nullBitsCount)
74-
val parameterTypesBuffer = ByteBufferUtils.mysqlBuffer(values.size * 2)
75-
val parameterValuesBuffer = ByteBufferUtils.mysqlBuffer()
76-
77-
78-
var index = 0
79-
80-
while ( index < values.length ) {
81-
val value = values(index)
82-
if ( value == null || value == None ) {
83-
nullBits(index / 8) = (nullBits(index / 8) | (1 << (index & 7))).asInstanceOf[Byte]
84-
parameterTypesBuffer.writeShort(ColumnTypes.FIELD_TYPE_NULL)
85-
} else {
86-
value match {
87-
case Some(v) => encode(parameterTypesBuffer, parameterValuesBuffer, v)
88-
case _ => encode(parameterTypesBuffer, parameterValuesBuffer, value)
89-
}
90-
}
91-
index += 1
92-
}
93-
94-
bitMapBuffer.writeBytes(nullBits)
95-
if ( values.size > 0 ) {
96-
bitMapBuffer.writeByte(1)
97-
} else {
98-
bitMapBuffer.writeByte(0)
99-
}
100-
101-
Unpooled.wrappedBuffer( bitMapBuffer, parameterTypesBuffer, parameterValuesBuffer )
102-
}
103-
104-
private def encode(parameterTypesBuffer: ByteBuf, parameterValuesBuffer: ByteBuf, value: Any): Unit = {
105-
val encoder = encoderFor(value)
106-
parameterTypesBuffer.writeShort(encoder.encodesTo)
107-
if (!encoder.isLong(value))
108-
encoder.encode(value, parameterValuesBuffer)
109-
}
110-
111-
def isLong( maybeValue : Any ) : Boolean = {
112-
if ( maybeValue == null || maybeValue == None ) {
113-
false
114-
} else {
115-
val value = maybeValue match {
116-
case Some(v) => v
117-
case _ => maybeValue
118-
}
119-
val encoder = encoderFor(value)
120-
encoder.isLong(value)
121-
}
122-
}
123-
124-
def encodeLong( maybeValue: Any ) : ByteBuf = {
125-
if ( maybeValue == null || maybeValue == None ) {
126-
throw new UnsupportedOperationException("Cannot encode NULL as long value")
127-
} else {
128-
val value = maybeValue match {
129-
case Some(v) => v
130-
case _ => maybeValue
131-
}
132-
val encoder = encoderFor(value)
133-
encoder.encodeLong(value)
134-
}
135-
}
136-
137-
private def encoderFor( v : Any ) : BinaryEncoder = {
67+
def encoderFor( v : Any ) : BinaryEncoder = {
13868

13969
this.encoders.get(v.getClass) match {
14070
case Some(encoder) => encoder

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLConnectionHandler.scala

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,13 +237,25 @@ class MySQLConnectionHandler(
237237
this.currentParameters.clear()
238238

239239
values.zipWithIndex.foreach { case (value, index) =>
240-
if (encoder.rowEncoder.isLong(value))
241-
writeAndHandleError(new SendLongDataMessage(statementId, value, index))
240+
if (isLong(value))
241+
writeAndHandleError(new SendLongDataMessage( statementId, value, index ))
242242
}
243243

244244
writeAndHandleError(new PreparedStatementExecuteMessage( statementId, values, parameters ))
245245
}
246246

247+
private def isLong( maybeValue : Any ) : Boolean = {
248+
if ( maybeValue == null || maybeValue == None ) {
249+
false
250+
} else {
251+
val value = maybeValue match {
252+
case Some(v) => v
253+
case _ => maybeValue
254+
}
255+
encoder.isLong(value)
256+
}
257+
}
258+
247259
private def onPreparedStatementPrepareResponse( message : PreparedStatementPrepareResponse ) {
248260
this.currentPreparedStatementHolder = new PreparedStatementHolder( this.currentPreparedStatement.statement, message)
249261
}

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLOneToOneEncoder.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,18 @@ class MySQLOneToOneEncoder(charset: Charset, charsetMapper: CharsetMapper) exten
3636

3737
import MySQLOneToOneEncoder.log
3838

39-
final val rowEncoder = new BinaryRowEncoder(charset)
40-
4139
private final val handshakeResponseEncoder = new HandshakeResponseEncoder(charset, charsetMapper)
4240
private final val queryEncoder = new QueryMessageEncoder(charset)
41+
private final val rowEncoder = new BinaryRowEncoder(charset)
4342
private final val prepareEncoder = new PreparedStatementPrepareEncoder(charset)
4443
private final val sendLongDataEncoder = new SendLongDataEncoder(rowEncoder)
4544
private final val executeEncoder = new PreparedStatementExecuteEncoder(rowEncoder)
4645
private final val authenticationSwitchEncoder = new AuthenticationSwitchResponseEncoder(charset)
4746

4847
private var sequence = 1
4948

49+
def isLong(value: Any): Boolean = rowEncoder.encoderFor(value).isLong(value)
50+
5051
def encode(ctx: ChannelHandlerContext, msg: Any, out: java.util.List[Object]): Unit = {
5152

5253
msg match {

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoder.scala

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.github.mauricio.async.db.mysql.encoder
1818

1919
import io.netty.buffer.{ByteBuf, Unpooled}
20+
import com.github.mauricio.async.db.mysql.column.ColumnTypes
2021
import com.github.mauricio.async.db.mysql.binary.BinaryRowEncoder
2122
import com.github.mauricio.async.db.mysql.message.client.{PreparedStatementExecuteMessage, ClientMessage}
2223
import com.github.mauricio.async.db.util.ByteBufferUtils
@@ -35,10 +36,49 @@ class PreparedStatementExecuteEncoder( rowEncoder : BinaryRowEncoder ) extends M
3536
if ( m.parameters.isEmpty ) {
3637
buffer
3738
} else {
38-
val parametersBuffer = rowEncoder.encode(m.values)
39-
Unpooled.wrappedBuffer(buffer, parametersBuffer)
39+
Unpooled.wrappedBuffer(buffer, encode(m.values))
4040
}
4141

4242
}
4343

44+
private[encoder] def encode( values : Seq[Any] ) : ByteBuf = {
45+
val nullBitsCount = (values.size + 7) / 8
46+
val nullBits = new Array[Byte](nullBitsCount)
47+
val bitMapBuffer = ByteBufferUtils.mysqlBuffer(1 + nullBitsCount)
48+
val parameterTypesBuffer = ByteBufferUtils.mysqlBuffer(values.size * 2)
49+
val parameterValuesBuffer = ByteBufferUtils.mysqlBuffer()
50+
51+
var index = 0
52+
53+
while ( index < values.length ) {
54+
val value = values(index)
55+
if ( value == null || value == None ) {
56+
nullBits(index / 8) = (nullBits(index / 8) | (1 << (index & 7))).asInstanceOf[Byte]
57+
parameterTypesBuffer.writeShort(ColumnTypes.FIELD_TYPE_NULL)
58+
} else {
59+
value match {
60+
case Some(v) => encode(parameterTypesBuffer, parameterValuesBuffer, v)
61+
case _ => encode(parameterTypesBuffer, parameterValuesBuffer, value)
62+
}
63+
}
64+
index += 1
65+
}
66+
67+
bitMapBuffer.writeBytes(nullBits)
68+
if ( values.size > 0 ) {
69+
bitMapBuffer.writeByte(1)
70+
} else {
71+
bitMapBuffer.writeByte(0)
72+
}
73+
74+
Unpooled.wrappedBuffer( bitMapBuffer, parameterTypesBuffer, parameterValuesBuffer )
75+
}
76+
77+
private def encode(parameterTypesBuffer: ByteBuf, parameterValuesBuffer: ByteBuf, value: Any): Unit = {
78+
val encoder = rowEncoder.encoderFor(value)
79+
parameterTypesBuffer.writeShort(encoder.encodesTo)
80+
if (!encoder.isLong(value))
81+
encoder.encode(value, parameterValuesBuffer)
82+
}
83+
4484
}

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/encoder/SendLongDataEncoder.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,20 @@ class SendLongDataEncoder( rowEncoder : BinaryRowEncoder ) extends MessageEncode
1515
buffer.writeBytes(m.statementId)
1616
buffer.writeShort(m.paramId)
1717

18-
Unpooled.wrappedBuffer(buffer, rowEncoder.encodeLong(m.value))
18+
Unpooled.wrappedBuffer(buffer, encodeLong(m.value))
19+
}
20+
21+
private def encodeLong( maybeValue: Any ) : ByteBuf = {
22+
if ( maybeValue == null || maybeValue == None ) {
23+
throw new UnsupportedOperationException("Cannot encode NULL as long value")
24+
} else {
25+
val value = maybeValue match {
26+
case Some(v) => v
27+
case _ => maybeValue
28+
}
29+
val encoder = rowEncoder.encoderFor(value)
30+
encoder.encodeLong(value)
31+
}
1932
}
2033

2134
}

mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/binary/BinaryRowEncoderSpec.scala renamed to mysql-async/src/test/scala/com/github/mauricio/async/db/mysql/encoder/PreparedStatementExecuteEncoderSpec.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
* under the License.
1515
*/
1616

17-
package com.github.mauricio.async.db.mysql.binary
17+
package com.github.mauricio.async.db.mysql.encoder
1818

19-
import org.specs2.mutable.Specification
19+
import com.github.mauricio.async.db.mysql.binary.BinaryRowEncoder
2020
import io.netty.util.CharsetUtil
21+
import org.specs2.mutable.Specification
2122

22-
class BinaryRowEncoderSpec extends Specification {
23+
class PreparedStatementExecuteEncoderSpec extends Specification {
2324

24-
val encoder = new BinaryRowEncoder(CharsetUtil.UTF_8)
25+
val encoder = new PreparedStatementExecuteEncoder(new BinaryRowEncoder(CharsetUtil.UTF_8))
2526

2627
"binary row encoder" should {
2728

0 commit comments

Comments
 (0)