Skip to content

Commit adcdf75

Browse files
bytewatchsiddontang
authored andcommitted
fix Canal.startSyncer and position comparison (go-mysql-org#341)
1 parent a193cab commit adcdf75

File tree

9 files changed

+31
-38
lines changed

9 files changed

+31
-38
lines changed

canal/dump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ func (c *Canal) dump() error {
159159
return errors.Trace(err)
160160
}
161161

162-
pos := mysql.Position{h.name, uint32(h.pos)}
162+
pos := mysql.Position{Name: h.name, Pos: uint32(h.pos)}
163163
c.master.Update(pos)
164164
c.eventHandler.OnPosSynced(pos, true)
165165
var startPos fmt.Stringer = pos

canal/master.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,5 +42,8 @@ func (m *masterInfo) GTIDSet() mysql.GTIDSet {
4242
m.RLock()
4343
defer m.RUnlock()
4444

45-
return m.gset
45+
if m.gset == nil {
46+
return nil
47+
}
48+
return m.gset.Clone()
4649
}

canal/sync.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,13 +229,13 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
229229
func (c *Canal) GetMasterPos() (mysql.Position, error) {
230230
rr, err := c.Execute("SHOW MASTER STATUS")
231231
if err != nil {
232-
return mysql.Position{"", 0}, errors.Trace(err)
232+
return mysql.Position{}, errors.Trace(err)
233233
}
234234

235235
name, _ := rr.GetString(0, 0)
236236
pos, _ := rr.GetInt(0, 1)
237237

238-
return mysql.Position{name, uint32(pos)}, nil
238+
return mysql.Position{Name: name, Pos: uint32(pos)}, nil
239239
}
240240

241241
func (c *Canal) GetMasterGTIDSet() (mysql.GTIDSet, error) {

cmd/go-mysqlbinlog/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func main() {
4747

4848
b := replication.NewBinlogSyncer(cfg)
4949

50-
pos := mysql.Position{*file, uint32(*pos)}
50+
pos := mysql.Position{Name: *file, Pos: uint32(*pos)}
5151
if len(*backupPath) > 0 {
5252
// Backup will always use RawMode.
5353
err := b.StartBackup(*backupPath, pos, 0)

failover/mariadb_gtid_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (h *MariadbGTIDHandler) WaitRelayLogDone(s *Server) error {
118118
fname, _ := r.GetStringByName(0, "Master_Log_File")
119119
pos, _ := r.GetIntByName(0, "Read_Master_Log_Pos")
120120

121-
return s.MasterPosWait(Position{fname, uint32(pos)}, 0)
121+
return s.MasterPosWait(Position{Name: fname, Pos: uint32(pos)}, 0)
122122
}
123123

124124
func (h *MariadbGTIDHandler) WaitCatchMaster(s *Server, m *Server) error {

failover/server.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (s *Server) FetchSlaveReadPos() (Position, error) {
152152
fname, _ := r.GetStringByName(0, "Master_Log_File")
153153
pos, _ := r.GetIntByName(0, "Read_Master_Log_Pos")
154154

155-
return Position{fname, uint32(pos)}, nil
155+
return Position{Name: fname, Pos: uint32(pos)}, nil
156156
}
157157

158158
// Get current executed binlog filename and position from master
@@ -165,7 +165,7 @@ func (s *Server) FetchSlaveExecutePos() (Position, error) {
165165
fname, _ := r.GetStringByName(0, "Relay_Master_Log_File")
166166
pos, _ := r.GetIntByName(0, "Exec_Master_Log_Pos")
167167

168-
return Position{fname, uint32(pos)}, nil
168+
return Position{Name: fname, Pos: uint32(pos)}, nil
169169
}
170170

171171
func (s *Server) MasterPosWait(pos Position, timeout int) error {

replication/backup_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,41 @@
11
package replication
22

33
import (
4-
"sync"
5-
"os"
6-
"time"
7-
. "github.com/pingcap/check"
8-
"github.com/siddontang/go-mysql/mysql"
94
"context"
105
"github.com/juju/errors"
6+
. "github.com/pingcap/check"
7+
"github.com/siddontang/go-mysql/mysql"
8+
"os"
9+
"sync"
10+
"time"
1111
)
1212

1313
func (t *testSyncerSuite) TestStartBackupEndInGivenTime(c *C) {
1414
t.setupTest(c, mysql.MySQLFlavor)
15-
15+
1616
t.testExecute(c, "RESET MASTER")
17-
17+
1818
var wg sync.WaitGroup
1919
wg.Add(1)
2020
defer wg.Wait()
21-
21+
2222
go func() {
2323
defer wg.Done()
24-
24+
2525
t.testSync(c, nil)
26-
26+
2727
t.testExecute(c, "FLUSH LOGS")
28-
28+
2929
t.testSync(c, nil)
3030
}()
31-
31+
3232
os.RemoveAll("./var")
3333
timeout := 2 * time.Second
34-
34+
3535
done := make(chan bool)
36-
36+
3737
go func() {
38-
err := t.b.StartBackup("./var", mysql.Position{"", uint32(0)}, timeout)
38+
err := t.b.StartBackup("./var", mysql.Position{Name: "", Pos: uint32(0)}, timeout)
3939
c.Assert(err, IsNil)
4040
done <- true
4141
}()

replication/binlogsyncer.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func (b *BinlogSyncer) writeBinlogDumpCommand(p Position) error {
416416
}
417417

418418
func (b *BinlogSyncer) writeBinlogDumpMysqlGTIDCommand(gset GTIDSet) error {
419-
p := Position{"", 4}
419+
p := Position{Name: "", Pos: 4}
420420
gtidData := gset.Encode()
421421

422422
b.c.ResetSequence()
@@ -472,7 +472,7 @@ func (b *BinlogSyncer) writeBinlogDumpMariadbGTIDCommand(gset GTIDSet) error {
472472
}
473473

474474
// Since we use @slave_connect_state, the file and position here are ignored.
475-
return b.writeBinlogDumpCommand(Position{"", 0})
475+
return b.writeBinlogDumpCommand(Position{Name: "", Pos: 0})
476476
}
477477

478478
// localHostname returns the hostname that register slave would register as.
@@ -759,20 +759,10 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
759759
}
760760

761761
func (b *BinlogSyncer) getGtidSet() GTIDSet {
762-
var gtidSet GTIDSet
763-
764762
if b.gset == nil {
765763
return nil
766764
}
767-
768-
switch b.cfg.Flavor {
769-
case MariaDBFlavor:
770-
gtidSet, _ = ParseGTIDSet(MariaDBFlavor, b.gset.String())
771-
default:
772-
gtidSet, _ = ParseGTIDSet(MySQLFlavor, b.gset.String())
773-
}
774-
775-
return gtidSet
765+
return b.gset.Clone()
776766
}
777767

778768
// LastConnectionID returns last connectionID.

replication/replication_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ func (t *testSyncerSuite) testPositionSync(c *C) {
304304
binFile, _ := r.GetString(0, 0)
305305
binPos, _ := r.GetInt(0, 1)
306306

307-
s, err := t.b.StartSync(mysql.Position{binFile, uint32(binPos)})
307+
s, err := t.b.StartSync(mysql.Position{Name: binFile, Pos: uint32(binPos)})
308308
c.Assert(err, IsNil)
309309

310310
// Test re-sync.
@@ -400,7 +400,7 @@ func (t *testSyncerSuite) TestMysqlBinlogCodec(c *C) {
400400

401401
os.RemoveAll(binlogDir)
402402

403-
err := t.b.StartBackup(binlogDir, mysql.Position{"", uint32(0)}, 2*time.Second)
403+
err := t.b.StartBackup(binlogDir, mysql.Position{Name: "", Pos: uint32(0)}, 2*time.Second)
404404
c.Assert(err, IsNil)
405405

406406
p := NewBinlogParser()

0 commit comments

Comments
 (0)