Skip to content

Commit 1a46a5f

Browse files
committed
canal: add XID handler.
1 parent 000fedb commit 1a46a5f

File tree

2 files changed

+7
-2
lines changed

2 files changed

+7
-2
lines changed

canal/handler.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type EventHandler interface {
1010
OnRotate(ctx context.Context, roateEvent *replication.RotateEvent) error
1111
OnDDL(ctx context.Context, nextPos mysql.Position, queryEvent *replication.QueryEvent) error
1212
OnRow(ctx context.Context, e *RowsEvent) error
13+
OnXID(ctx context.Context, nextPos mysql.Position) error
1314
String() string
1415
}
1516

@@ -20,8 +21,9 @@ func (h *DummyEventHandler) OnRotate(context.Context, *replication.RotateEvent)
2021
func (h *DummyEventHandler) OnDDL(context.Context, mysql.Position, *replication.QueryEvent) error {
2122
return nil
2223
}
23-
func (h *DummyEventHandler) OnRow(context.Context, *RowsEvent) error { return nil }
24-
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
24+
func (h *DummyEventHandler) OnRow(context.Context, *RowsEvent) error { return nil }
25+
func (h *DummyEventHandler) OnXID(context.Context, mysql.Position) error { return nil }
26+
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
2527

2628
// `SetEventHandler` registers the sync handler, you must register your
2729
// own handler before starting Canal.

canal/sync.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ func (c *Canal) startSyncBinlog() error {
7272
continue
7373
case *replication.XIDEvent:
7474
// try to save the position later
75+
if err := c.eventHandler.OnXID(c.ctx, pos); err != nil {
76+
return errors.Trace(err)
77+
}
7578
case *replication.QueryEvent:
7679
// handle alert table query
7780
if mb := expAlterTable.FindSubmatch(e.Query); mb != nil {

0 commit comments

Comments
 (0)