Skip to content

Commit 85341cc

Browse files
authored
Merge pull request #7 from axfor/feature-axx
added query event in dummyEventHandler1
2 parents 29bc749 + fabc38a commit 85341cc

File tree

4 files changed

+70
-34
lines changed

4 files changed

+70
-34
lines changed

canal/handler.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ type EventHandler interface {
1717
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
1818
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
1919
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
20+
// OnQueryEvent is query event include(create user,drop user,create index event,etd.)
21+
OnQueryEvent(header *replication.EventHeader, queryEvent *replication.QueryEvent) error
2022
String() string
2123
}
2224

@@ -38,7 +40,9 @@ func (h *DummyEventHandler) OnGTID(*replication.EventHeader, mysql.GTIDSet) erro
3840
func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position, mysql.GTIDSet, bool) error {
3941
return nil
4042
}
41-
43+
func (h *DummyEventHandler) OnQueryEvent(header *replication.EventHeader, queryEvent *replication.QueryEvent) error {
44+
return nil
45+
}
4246
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
4347

4448
// `SetEventHandler` registers the sync handler, you must register your

canal/sync.go

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -142,22 +142,9 @@ func (c *Canal) runSyncBinlog() error {
142142
continue
143143
}
144144
for _, stmt := range stmts {
145-
nodes := parseStmt(stmt)
146-
for _, node := range nodes {
147-
if node.db == "" {
148-
node.db = string(e.Schema)
149-
}
150-
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
151-
return errors.Trace(err)
152-
}
153-
}
154-
if len(nodes) > 0 {
155-
savePos = true
156-
force = true
157-
// Now we only handle Table Changed DDL, maybe we will support more later.
158-
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
159-
return errors.Trace(err)
160-
}
145+
err := c.handleQueryEvent(ev, e, stmt, pos, &savePos, &force)
146+
if err != nil {
147+
c.cfg.Logger.Errorf("handle query event(%s) err %v", e.Query, err)
161148
}
162149
}
163150
if savePos && e.GSet != nil {
@@ -336,3 +323,45 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error {
336323

337324
return c.WaitUntilPos(pos, timeout)
338325
}
326+
327+
// handleQueryEvent is handle some common query events (e.g., DDL,CREATE or DROP USER,GRANT),
328+
// others use UnknownQueryEvent unified callbacks to expose to users
329+
func (c *Canal) handleQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
330+
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
331+
switch t := stmt.(type) {
332+
case *ast.RenameTableStmt, *ast.AlterTableStmt, *ast.DropTableStmt, *ast.CreateTableStmt, *ast.TruncateTableStmt:
333+
return c.handleDDLEvent(ev, e, t, pos, savePos, force)
334+
default:
335+
return c.handleUnknownQueryEvent(ev, e, t, pos, savePos, force)
336+
}
337+
}
338+
339+
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
340+
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
341+
nodes := parseStmt(stmt)
342+
for _, node := range nodes {
343+
if node.db == "" {
344+
node.db = string(e.Schema)
345+
}
346+
if err := c.updateTable(ev.Header, node.db, node.table); err != nil {
347+
return errors.Trace(err)
348+
}
349+
}
350+
if len(nodes) > 0 {
351+
*savePos = true
352+
*force = true
353+
// Now we only handle Table Changed DDL, maybe we will support more later.
354+
if err := c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
355+
return errors.Trace(err)
356+
}
357+
}
358+
return nil
359+
}
360+
361+
func (c *Canal) handleUnknownQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent,
362+
stmt ast.StmtNode, pos mysql.Position, savePos, force *bool) error {
363+
if err := c.eventHandler.OnQueryEvent(ev.Header, e); err != nil {
364+
return errors.Trace(err)
365+
}
366+
return nil
367+
}

client/resp.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -336,15 +336,15 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result,
336336
return nil
337337
}
338338

339-
func (c *Conn) readResultColumns(result *Result) (err error) {
339+
func (c *Conn) readResultColumns(result *Result) error {
340340
var i = 0
341341
var data []byte
342-
342+
var err error
343343
for {
344344
rawPkgLen := len(result.RawPkg)
345345
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
346346
if err != nil {
347-
return
347+
return err
348348
}
349349
data = result.RawPkg[rawPkgLen:]
350350

@@ -361,15 +361,15 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
361361
err = ErrMalformPacket
362362
}
363363

364-
return
364+
return err
365365
}
366366

367367
if result.Fields[i] == nil {
368368
result.Fields[i] = &Field{}
369369
}
370370
err = result.Fields[i].Parse(data)
371371
if err != nil {
372-
return
372+
return err
373373
}
374374

375375
result.FieldNames[hack.String(result.Fields[i].Name)] = i
@@ -378,14 +378,15 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
378378
}
379379
}
380380

381-
func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
381+
func (c *Conn) readResultRows(result *Result, isBinary bool) error {
382382
var data []byte
383+
var err error
383384

384385
for {
385386
rawPkgLen := len(result.RawPkg)
386387
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
387388
if err != nil {
388-
return
389+
return err
389390
}
390391
data = result.RawPkg[rawPkgLen:]
391392

@@ -425,16 +426,17 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
425426
return nil
426427
}
427428

428-
func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) (err error) {
429+
func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) error {
429430
var (
430431
data []byte
431432
row []FieldValue
433+
err error
432434
)
433435

434436
for {
435437
data, err = c.ReadPacketReuseMem(data[:0])
436438
if err != nil {
437-
return
439+
return err
438440
}
439441

440442
// EOF Packet

mysql/field.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,50 +41,51 @@ const (
4141
FieldValueTypeString
4242
)
4343

44-
func (f *Field) Parse(p FieldData) (err error) {
44+
func (f *Field) Parse(p FieldData) error {
4545
f.Data = p
4646

4747
var n int
48+
var err error
4849
pos := 0
4950
//skip catelog, always def
5051
n, err = SkipLengthEncodedString(p)
5152
if err != nil {
52-
return
53+
return err
5354
}
5455
pos += n
5556

5657
//schema
5758
f.Schema, _, n, err = LengthEncodedString(p[pos:])
5859
if err != nil {
59-
return
60+
return err
6061
}
6162
pos += n
6263

6364
//table
6465
f.Table, _, n, err = LengthEncodedString(p[pos:])
6566
if err != nil {
66-
return
67+
return err
6768
}
6869
pos += n
6970

7071
//org_table
7172
f.OrgTable, _, n, err = LengthEncodedString(p[pos:])
7273
if err != nil {
73-
return
74+
return err
7475
}
7576
pos += n
7677

7778
//name
7879
f.Name, _, n, err = LengthEncodedString(p[pos:])
7980
if err != nil {
80-
return
81+
return err
8182
}
8283
pos += n
8384

8485
//org_name
8586
f.OrgName, _, n, err = LengthEncodedString(p[pos:])
8687
if err != nil {
87-
return
88+
return err
8889
}
8990
pos += n
9091

@@ -123,7 +124,7 @@ func (f *Field) Parse(p FieldData) (err error) {
123124

124125
if pos+int(f.DefaultValueLength) > len(p) {
125126
err = ErrMalformPacket
126-
return
127+
return err
127128
}
128129

129130
//default value string[$len]

0 commit comments

Comments
 (0)