|
1 | 1 | package replication |
2 | 2 |
|
3 | 3 | import ( |
| 4 | +"bytes" |
| 5 | +"compress/zlib" |
4 | 6 | "encoding/binary" |
5 | 7 | "encoding/hex" |
6 | 8 | "fmt" |
@@ -830,6 +832,9 @@ type RowsEvent struct { |
830 | 832 | tables map[uint64]*TableMapEvent |
831 | 833 | needBitmap2 bool |
832 | 834 |
|
| 835 | +// for mariadb *_COMPRESSED_EVENT_V1 |
| 836 | +compressed bool |
| 837 | + |
833 | 838 | eventType EventType |
834 | 839 |
|
835 | 840 | Table *TableMapEvent |
@@ -970,9 +975,9 @@ func (e *RowsEvent) DecodeData(pos int, data []byte) (err2 error) { |
970 | 975 |
|
971 | 976 | var rowImageType EnumRowImageType |
972 | 977 | switch e.eventType { |
973 | | -case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2: |
| 978 | +case WRITE_ROWS_EVENTv0, WRITE_ROWS_EVENTv1, WRITE_ROWS_EVENTv2, MARIADB_WRITE_ROWS_COMPRESSED_EVENT_V1: |
974 | 979 | rowImageType = EnumRowImageTypeWriteAI |
975 | | -case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2: |
| 980 | +case DELETE_ROWS_EVENTv0, DELETE_ROWS_EVENTv1, DELETE_ROWS_EVENTv2, MARIADB_DELETE_ROWS_COMPRESSED_EVENT_V1: |
976 | 981 | rowImageType = EnumRowImageTypeDeleteBI |
977 | 982 | default: |
978 | 983 | rowImageType = EnumRowImageTypeUpdateBI |
@@ -1002,6 +1007,13 @@ func (e *RowsEvent) Decode(data []byte) error { |
1002 | 1007 | if err != nil { |
1003 | 1008 | return err |
1004 | 1009 | } |
| 1010 | +if e.compressed { |
| 1011 | +uncompressedData, err := e.decompressData(pos, data) |
| 1012 | +if err != nil { |
| 1013 | +return err |
| 1014 | +} |
| 1015 | +return e.DecodeData(0, uncompressedData) |
| 1016 | +} |
1005 | 1017 | return e.DecodeData(pos, data) |
1006 | 1018 | } |
1007 | 1019 |
|
@@ -1103,6 +1115,28 @@ func (e *RowsEvent) parseFracTime(t interface{}) interface{} { |
1103 | 1115 | return v.Time |
1104 | 1116 | } |
1105 | 1117 |
|
| 1118 | +func (e *RowsEvent) decompressData(pos int, data []byte) ([]byte, error) { |
| 1119 | +// algorithm always 0=zlib |
| 1120 | +// algorithm := (data[pos] & 0x07) >> 4 |
| 1121 | +headerSize := int(data[pos] & 0x07) |
| 1122 | +pos++ |
| 1123 | + |
| 1124 | +uncompressedDataSize := BFixedLengthInt(data[pos : pos+headerSize]) |
| 1125 | + |
| 1126 | +pos += headerSize |
| 1127 | +uncompressedData := make([]byte, uncompressedDataSize) |
| 1128 | +r, err := zlib.NewReader(bytes.NewReader(data[pos:])) |
| 1129 | +if err != nil { |
| 1130 | +return nil, err |
| 1131 | +} |
| 1132 | +defer r.Close() |
| 1133 | +_, err = io.ReadFull(r, uncompressedData) |
| 1134 | +if err != nil { |
| 1135 | +return nil, err |
| 1136 | +} |
| 1137 | +return uncompressedData, nil |
| 1138 | +} |
| 1139 | + |
1106 | 1140 | // see mysql sql/log_event.cc log_event_print_value |
1107 | 1141 | func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16, isPartial bool) (v interface{}, n int, err error) { |
1108 | 1142 | var length = 0 |
|
0 commit comments