Skip to content

Commit e1f53c0

Browse files
authored
Merge pull request #9 from axfor/feature-axx
Impove query event param position
2 parents efa4423 + ae1b091 commit e1f53c0

File tree

2 files changed

+26
-22
lines changed

2 files changed

+26
-22
lines changed

canal/handler.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ type EventHandler interface {
1919
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
2020
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
2121
// OnQueryEvent is query event include(create user,drop user,create index event,etd.)
22-
OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos mysql.Position, force bool) (bool, error)
22+
OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error
2323
String() string
2424
}
2525

@@ -41,9 +41,8 @@ func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) erro
4141
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
4242
return nil
4343
}
44-
func (h *DummyEventHandler) OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
45-
stmt ast.StmtNode, pos mysql.Position, force bool) (bool, error) {
46-
return force, nil
44+
func (h *DummyEventHandler) OnQueryEvent(*replication.BinlogEvent, *replication.QueryEvent, ast.StmtNode, *Position) error {
45+
return nil
4746
}
4847
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
4948

canal/sync.go

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -141,12 +141,19 @@ func (c *Canal) runSyncBinlog() error {
141141
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
142142
continue
143143
}
144+
posInfo := &Position{
145+
Position: pos,
146+
SavePos: savePos,
147+
Force: force,
148+
}
144149
for _, stmt := range stmts {
145-
err := c.handleQueryEvent(ev, e, stmt, pos, &savePos, &force)
150+
err := c.handleQueryEvent(ev, e, stmt, posInfo)
146151
if err != nil {
147152
c.cfg.Logger.Errorf("handle query event(%s) err %v", e.Query, err)
148153
}
149154
}
155+
savePos = posInfo.SavePos
156+
force = posInfo.Force
150157
if savePos && e.GSet != nil {
151158
c.master.UpdateGTIDSet(e.GSet)
152159
}
@@ -324,20 +331,24 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error {
324331
return c.WaitUntilPos(pos, timeout)
325332
}
326333

334+
type Position struct {
335+
mysql.Position
336+
SavePos bool
337+
Force bool
338+
}
339+
327340
// handleQueryEvent is handle some common query events (e.g., DDL,CREATE or DROP USER,GRANT),
328341
// others use UnknownQueryEvent unified callbacks to expose to users
329-
func (c *Canal) handleQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
330-
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
342+
func (c *Canal) handleQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error {
331343
switch t := stmt.(type) {
332344
case *ast.RenameTableStmt, *ast.AlterTableStmt, *ast.DropTableStmt, *ast.CreateTableStmt, *ast.TruncateTableStmt:
333-
return c.handleDDLEvent(ev, e, t, pos, savePos, force)
345+
return c.handleDDLEvent(ev, e, t, pos)
334346
default:
335-
return c.handleUnknownQueryEvent(ev, e, t, pos, savePos, force)
347+
return c.handleUnknownQueryEvent(ev, e, t, pos)
336348
}
337349
}
338350

339-
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
340-
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
351+
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error {
341352
nodes := parseStmt(stmt)
342353
for _, node := range nodes {
343354
if node.db == "" {
@@ -348,25 +359,19 @@ func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.Query
348359
}
349360
}
350361
if len(nodes) > 0 {
351-
*savePos = true
352-
*force = true
362+
pos.SavePos = true
363+
pos.Force = true
353364
// Now we only handle Table Changed DDL, maybe we will support more later.
354-
if err := c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
365+
if err := c.eventHandler.OnDDL(ev.Header, pos.Position, e); err != nil {
355366
return errors.Trace(err)
356367
}
357368
}
358369
return nil
359370
}
360371

361-
func (c *Canal) handleUnknownQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
362-
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
363-
f, err := c.eventHandler.OnQueryEvent(ev, e, stmt, pos, *force)
364-
if err != nil {
372+
func (c *Canal) handleUnknownQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error {
373+
if err := c.eventHandler.OnQueryEvent(ev, e, stmt, pos); err != nil {
365374
return errors.Trace(err)
366375
}
367-
if f {
368-
*savePos = true
369-
*force = true
370-
}
371376
return nil
372377
}

0 commit comments

Comments
 (0)