Skip to content

Commit c2fea88

Browse files
committed
Impove query event param position
Signed-off-by: axfor <aixiaoxiang2009@hotmail.com>
1 parent d9eb79e commit c2fea88

File tree

2 files changed

+27
-20
lines changed

2 files changed

+27
-20
lines changed

canal/handler.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type EventHandler interface {
1919
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
2020
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
2121
// OnQueryEvent is query event include(create user,drop user,create index event,etd.)
22-
OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos mysql.Position, force bool) (bool, error)
22+
OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error
2323
String() string
2424
}
2525

@@ -41,9 +41,8 @@ func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) erro
4141
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
4242
return nil
4343
}
44-
func (h *DummyEventHandler) OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
45-
stmt ast.StmtNode, pos mysql.Position, force bool) (bool, error) {
46-
return force, nil
44+
func (h *DummyEventHandler) OnQueryEvent(*replication.BinlogEvent, *replication.QueryEvent, ast.StmtNode, *Position) error {
45+
return nil
4746
}
4847
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
4948

canal/sync.go

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,19 @@ func (c *Canal) runSyncBinlog() error {
141141
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
142142
continue
143143
}
144+
posInfo := &Position{
145+
Position: pos,
146+
SavePos: savePos,
147+
Force: force,
148+
}
144149
for _, stmt := range stmts {
145-
err := c.handleQueryEvent(ev, e, stmt, pos, &savePos, &force)
150+
err := c.handleQueryEvent(ev, e, stmt, posInfo)
146151
if err != nil {
147152
c.cfg.Logger.Errorf("handle query event(%s) err %v", e.Query, err)
148153
}
149154
}
155+
savePos = posInfo.SavePos
156+
force = posInfo.Force
150157
if savePos && e.GSet != nil {
151158
c.master.UpdateGTIDSet(e.GSet)
152159
}
@@ -324,20 +331,27 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error {
324331
return c.WaitUntilPos(pos, timeout)
325332
}
326333

334+
type Position struct {
335+
mysql.Position
336+
SavePos bool
337+
Force bool
338+
}
339+
327340
// handleQueryEvent is handle some common query events (e.g., DDL,CREATE or DROP USER,GRANT),
328341
// others use UnknownQueryEvent unified callbacks to expose to users
329342
func (c *Canal) handleQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
330-
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
343+
stmt ast.StmtNode, pos *Position) error {
344+
331345
switch t := stmt.(type) {
332346
case *ast.RenameTableStmt, *ast.AlterTableStmt, *ast.DropTableStmt, *ast.CreateTableStmt, *ast.TruncateTableStmt:
333-
return c.handleDDLEvent(ev, e, t, pos, savePos, force)
347+
return c.handleDDLEvent(ev, e, t, pos)
334348
default:
335-
return c.handleUnknownQueryEvent(ev, e, t, pos, savePos, force)
349+
return c.handleUnknownQueryEvent(ev, e, t, pos)
336350
}
337351
}
338352

339353
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
340-
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
354+
stmt ast.StmtNode, pos *Position) error {
341355
nodes := parseStmt(stmt)
342356
for _, node := range nodes {
343357
if node.db == "" {
@@ -348,25 +362,19 @@ func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.Query
348362
}
349363
}
350364
if len(nodes) > 0 {
351-
*savePos = true
352-
*force = true
365+
pos.SavePos = true
366+
pos.Force = true
353367
// Now we only handle Table Changed DDL, maybe we will support more later.
354-
if err := c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
368+
if err := c.eventHandler.OnDDL(ev.Header, pos.Position, e); err != nil {
355369
return errors.Trace(err)
356370
}
357371
}
358372
return nil
359373
}
360374

361-
func (c *Canal) handleUnknownQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
362-
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
363-
f, err := c.eventHandler.OnQueryEvent(ev, e, stmt, pos, *force)
364-
if err != nil {
375+
func (c *Canal) handleUnknownQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error {
376+
if err := c.eventHandler.OnQueryEvent(ev, e, stmt, pos); err != nil {
365377
return errors.Trace(err)
366378
}
367-
if f {
368-
*savePos = true
369-
*force = true
370-
}
371379
return nil
372380
}

0 commit comments

Comments
 (0)