Skip to content

Commit 3612c18

Browse files
authored
replication: support geometry (go-mysql-org#110)
1 parent d12e08a commit 3612c18

File tree

2 files changed

+52
-20
lines changed

2 files changed

+52
-20
lines changed

replication/replication_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,23 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) {
211211
for _, query := range tbls {
212212
t.testExecute(c, query)
213213
}
214+
215+
// If MySQL supports JSON, it must supports GEOMETRY.
216+
t.testExecute(c, "DROP TABLE IF EXISTS test_geo")
217+
218+
str = `CREATE TABLE test_geo (g GEOMETRY)`
219+
_, err = t.c.Execute(str)
220+
c.Assert(err, IsNil)
221+
222+
tbls = []string{
223+
`INSERT INTO test_geo VALUES (POINT(1, 1))`,
224+
`INSERT INTO test_geo VALUES (LINESTRING(POINT(0,0), POINT(1,1), POINT(2,2)))`,
225+
// TODO: add more geometry tests
226+
}
227+
228+
for _, query := range tbls {
229+
t.testExecute(c, query)
230+
}
214231
}
215232

216233
t.wg.Wait()

replication/row_event.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -456,26 +456,7 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
456456

457457
v, err = decodeBit(data, nbits, n)
458458
case MYSQL_TYPE_BLOB:
459-
switch meta {
460-
case 1:
461-
length = int(data[0])
462-
v = data[1 : 1+length]
463-
n = length + 1
464-
case 2:
465-
length = int(binary.LittleEndian.Uint16(data))
466-
v = data[2 : 2+length]
467-
n = length + 2
468-
case 3:
469-
length = int(FixedLengthInt(data[0:3]))
470-
v = data[3 : 3+length]
471-
n = length + 3
472-
case 4:
473-
length = int(binary.LittleEndian.Uint32(data))
474-
v = data[4 : 4+length]
475-
n = length + 4
476-
default:
477-
err = fmt.Errorf("invalid blob packlen = %d", meta)
478-
}
459+
v, n, err = decodeBlob(data, meta)
479460
case MYSQL_TYPE_VARCHAR,
480461
MYSQL_TYPE_VAR_STRING:
481462
length = int(meta)
@@ -487,6 +468,14 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
487468
length = int(binary.LittleEndian.Uint32(data[0:]))
488469
n = length + int(meta)
489470
v, err = decodeJsonBinary(data[meta:n])
471+
case MYSQL_TYPE_GEOMETRY:
472+
// MySQL saves Geometry as Blob in binlog
473+
// Seem that the binary format is SRID (4 bytes) + WKB, outer can use
474+
// MySQL GeoFromWKB or others to create the geometry data.
475+
// Refer https://dev.mysql.com/doc/refman/5.7/en/gis-wkb-functions.html
476+
// I also find some go libs to handle WKB if possible
477+
// see https://github.com/twpayne/go-geom or https://github.com/paulmach/go.geo
478+
v, n, err = decodeBlob(data, meta)
490479
default:
491480
err = fmt.Errorf("unsupport type %d in binlog and don't know how to handle", tp)
492481
}
@@ -766,6 +755,32 @@ func decodeTime2(data []byte, dec uint16) (string, int, error) {
766755
return fmt.Sprintf("%s%02d:%02d:%02d", sign, hour, minute, second), n, nil
767756
}
768757

758+
func decodeBlob(data []byte, meta uint16) (v []byte, n int, err error) {
759+
var length int
760+
switch meta {
761+
case 1:
762+
length = int(data[0])
763+
v = data[1 : 1+length]
764+
n = length + 1
765+
case 2:
766+
length = int(binary.LittleEndian.Uint16(data))
767+
v = data[2 : 2+length]
768+
n = length + 2
769+
case 3:
770+
length = int(FixedLengthInt(data[0:3]))
771+
v = data[3 : 3+length]
772+
n = length + 3
773+
case 4:
774+
length = int(binary.LittleEndian.Uint32(data))
775+
v = data[4 : 4+length]
776+
n = length + 4
777+
default:
778+
err = fmt.Errorf("invalid blob packlen = %d", meta)
779+
}
780+
781+
return
782+
}
783+
769784
func (e *RowsEvent) Dump(w io.Writer) {
770785
fmt.Fprintf(w, "TableID: %d\n", e.TableID)
771786
fmt.Fprintf(w, "Flags: %d\n", e.Flags)

0 commit comments

Comments
 (0)