Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Improve the parameters and pos saving logic of the OnQueryEvent function
Signed-off-by: axfor <aixiaoxiang2009@hotmail.com>
  • Loading branch information
axfor committed Jun 25, 2023
commit 3c524227ca41eb7c82b89a7c687e7023212beca1
10 changes: 5 additions & 5 deletions canal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type EventHandler interface {
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
// OnQueryEvent is query event include (create user, drop user, create index, etc.)
// Note: the OnQueryEvent has lower priority than OnDDL event
OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error
// Note: the OnQueryEvent has lower priority than OnDDL even
OnQueryEvent(header *replication.EventHeader, stmt ast.StmtNode, pos mysql.Position, e *replication.QueryEvent) (bool, bool, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use named return in this interface, or explain what's the meaning of two bools in comments. So users of this library will understand when they implement this interface.

String() string
}

Expand All @@ -43,9 +43,9 @@ func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) erro
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
return nil
}

func (h *DummyEventHandler) OnQueryEvent(*replication.BinlogEvent, *replication.QueryEvent, ast.StmtNode, *Position) error {
return nil
func (h *DummyEventHandler) OnQueryEvent(*replication.EventHeader, ast.StmtNode, mysql.Position, *replication.QueryEvent) (bool, bool, error) {
savePos, force := false, false
return savePos, force, nil
}
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }

Expand Down
34 changes: 11 additions & 23 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,29 +141,22 @@ func (c *Canal) runSyncBinlog() error {
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
continue
}
posInfo := &Position{
Position: pos,
SavePos: savePos,
Force: force,
}
for _, stmt := range stmts {
nodes := parseDDLStmt(stmt)
if len(nodes) > 0 {
posInfo.SavePos = true
posInfo.Force = true
err := c.handleDDLEvent(ev, e, nodes, posInfo)
savePos = true
force = true
err := c.handleDDLEvent(ev, e, nodes, pos)
if err != nil {
c.cfg.Logger.Errorf("handle ddl event err %v", err)
}
} else {
err := c.handleQueryEvent(ev, e, stmt, posInfo)
savePos, force, err = c.handleQueryEvent(ev.Header, stmt, pos, e)
if err != nil {
c.cfg.Logger.Errorf("handle query event err %v", err)
}
}
}
savePos = posInfo.SavePos
force = posInfo.Force
if savePos && e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
Expand Down Expand Up @@ -342,14 +335,8 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error {
return c.WaitUntilPos(pos, timeout)
}

type Position struct {
mysql.Position
SavePos bool
Force bool
}

// handleDDLEvent is handle DDL event
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, nodes []*node, pos *Position) error {
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, nodes []*node, pos mysql.Position) error {
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
Expand All @@ -360,7 +347,7 @@ func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.Query
}
if len(nodes) > 0 {
// Now we only handle Table Changed DDL, maybe we will support more later.
if err := c.eventHandler.OnDDL(ev.Header, pos.Position, e); err != nil {
if err := c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
}
}
Expand All @@ -369,9 +356,10 @@ func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.Query

// handleQueryEvent is handle some common query events (e.g., DDL,CREATE or DROP USER,GRANT)
// DDL event use handleDDLEvent, others use the handleQueryEvent
func (c *Canal) handleQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error {
if err := c.eventHandler.OnQueryEvent(ev, e, stmt, pos); err != nil {
return errors.Trace(err)
func (c *Canal) handleQueryEvent(header *replication.EventHeader, stmt ast.StmtNode, pos mysql.Position, e *replication.QueryEvent) (bool, bool, error) {
savePos, force, err := c.eventHandler.OnQueryEvent(header, stmt, pos, e)
if err != nil {
return savePos, force, errors.Trace(err)
}
return nil
return savePos, force, nil
}