Skip to content

Commit 1bfe8cd

Browse files
committed
Add support for MARIADB_QUERY_COMPRESSED_EVENT
1 parent dd14348 commit 1bfe8cd

File tree

5 files changed

+41
-26
lines changed

5 files changed

+41
-26
lines changed

mysql/util.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package mysql
22

33
import (
4+
"bytes"
5+
"compress/zlib"
46
"crypto/rand"
57
"crypto/rsa"
68
"crypto/sha1"
@@ -92,6 +94,24 @@ func EncryptPassword(password string, seed []byte, pub *rsa.PublicKey) ([]byte,
9294
return rsa.EncryptOAEP(sha1v, rand.Reader, pub, plain, nil)
9395
}
9496

97+
func DecompressMariadbData(data []byte) ([]byte, error) {
98+
// algorithm always 0=zlib
99+
// algorithm := (data[pos] & 0x07) >> 4
100+
headerSize := int(data[0] & 0x07)
101+
uncompressedDataSize := BFixedLengthInt(data[1 : 1+headerSize])
102+
uncompressedData := make([]byte, uncompressedDataSize)
103+
r, err := zlib.NewReader(bytes.NewReader(data[1+headerSize:]))
104+
if err != nil {
105+
return nil, err
106+
}
107+
defer r.Close()
108+
_, err = io.ReadFull(r, uncompressedData)
109+
if err != nil {
110+
return nil, err
111+
}
112+
return uncompressedData, nil
113+
}
114+
95115
// AppendLengthEncodedInteger: encodes a uint64 value and appends it to the given bytes slice
96116
func AppendLengthEncodedInteger(b []byte, n uint64) []byte {
97117
switch {

replication/const.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ func (e EventType) String() string {
202202
return "TransactionPayloadEvent"
203203
case HEARTBEAT_LOG_EVENT_V2:
204204
return "HeartbeatLogEventV2"
205+
case MARIADB_START_ENCRYPTION_EVENT:
206+
return "MariadbStartEncryptionEvent"
207+
case MARIADB_QUERY_COMPRESSED_EVENT:
208+
return "MariadbQueryCompressedEvent"
205209
case MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1:
206210
return "MariadbWriteRowsCompressedEventV1"
207211
case MARIADB_UPDATE_ROWS_COMPRESSED_EVENT_V1:

replication/event.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,9 @@ type QueryEvent struct {
297297
Schema []byte
298298
Query []byte
299299

300+
// for mariadb QUERY_COMPRESSED_EVENT
301+
compressed bool
302+
300303
// in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
301304
GSet GTIDSet
302305
}
@@ -328,7 +331,15 @@ func (e *QueryEvent) Decode(data []byte) error {
328331
//skip 0x00
329332
pos++
330333

331-
e.Query = data[pos:]
334+
if e.compressed {
335+
decompressedQuery, err := DecompressMariadbData(data[pos:])
336+
if err != nil {
337+
return err
338+
}
339+
e.Query = decompressedQuery
340+
} else {
341+
e.Query = data[pos:]
342+
}
332343
return nil
333344
}
334345

replication/parser.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte, rawData []byte) (
249249
switch h.EventType {
250250
case QUERY_EVENT:
251251
e = &QueryEvent{}
252+
case MARIADB_QUERY_COMPRESSED_EVENT:
253+
e = &QueryEvent{
254+
compressed: true,
255+
}
252256
case XID_EVENT:
253257
e = &XIDEvent{}
254258
case TABLE_MAP_EVENT:

replication/row_event.go

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package replication
22

33
import (
4-
"bytes"
5-
"compress/zlib"
64
"encoding/binary"
75
"encoding/hex"
86
"fmt"
@@ -1008,7 +1006,7 @@ func (e *RowsEvent) Decode(data []byte) error {
10081006
return err
10091007
}
10101008
if e.compressed {
1011-
uncompressedData, err := e.decompressData(pos, data)
1009+
uncompressedData, err := DecompressMariadbData(data[pos:])
10121010
if err != nil {
10131011
return err
10141012
}
@@ -1115,28 +1113,6 @@ func (e *RowsEvent) parseFracTime(t interface{}) interface{} {
11151113
return v.Time
11161114
}
11171115

1118-
func (e *RowsEvent) decompressData(pos int, data []byte) ([]byte, error) {
1119-
// algorithm always 0=zlib
1120-
// algorithm := (data[pos] & 0x07) >> 4
1121-
headerSize := int(data[pos] & 0x07)
1122-
pos++
1123-
1124-
uncompressedDataSize := BFixedLengthInt(data[pos : pos+headerSize])
1125-
1126-
pos += headerSize
1127-
uncompressedData := make([]byte, uncompressedDataSize)
1128-
r, err := zlib.NewReader(bytes.NewReader(data[pos:]))
1129-
if err != nil {
1130-
return nil, err
1131-
}
1132-
defer r.Close()
1133-
_, err = io.ReadFull(r, uncompressedData)
1134-
if err != nil {
1135-
return nil, err
1136-
}
1137-
return uncompressedData, nil
1138-
}
1139-
11401116
// see mysql sql/log_event.cc log_event_print_value
11411117
func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16, isPartial bool) (v interface{}, n int, err error) {
11421118
var length = 0

0 commit comments

Comments
 (0)