Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
move gtid update after all event handlers are executed
It is a common use case that we have a separate goroutine to update gtid set. We have to ensure that gtid saver sees updated gtid only after all the tasks are done in event handlers.
  • Loading branch information
taylorchu committed Apr 20, 2019
commit 47718bfe61d80f93e6869bc9eb28eefddd07dde3
14 changes: 7 additions & 7 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/satori/go.uuid"
uuid "github.com/satori/go.uuid"
"github.com/siddontang/go-log/log"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
Expand Down Expand Up @@ -92,14 +92,14 @@ func (c *Canal) runSyncBinlog() error {
}
continue
case *replication.XIDEvent:
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
savePos = true
// try to save the position later
if err := c.eventHandler.OnXID(pos); err != nil {
return errors.Trace(err)
}
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
case *replication.MariadbGTIDEvent:
// try to save the GTID later
gtid, err := mysql.ParseMariadbGTIDSet(e.GTID.String())
Expand All @@ -119,9 +119,6 @@ func (c *Canal) runSyncBinlog() error {
return errors.Trace(err)
}
case *replication.QueryEvent:
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
var (
mb [][]byte
db []byte
Expand Down Expand Up @@ -159,6 +156,9 @@ func (c *Canal) runSyncBinlog() error {
if err = c.eventHandler.OnDDL(pos, e); err != nil {
return errors.Trace(err)
}
if e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
default:
continue
}
Expand Down