Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions canal/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func TestCreateTableExp(t *testing.T) {
t.Fatalf("TestCreateTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func TestAlterTableExp(t *testing.T) {
t.Fatalf("TestAlterTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestRenameTableExp(t *testing.T) {
t.Fatalf("TestRenameTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -285,7 +285,7 @@ func TestDropTableExp(t *testing.T) {
t.Fatalf("TestDropTableExp:case %s failed\n", s)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down Expand Up @@ -329,7 +329,7 @@ func TestWithoutSchemeExp(t *testing.T) {
t.Fatalf("TestCreateTableExp:case %s failed\n", s.Query)
}
for _, st := range stmts {
nodes := parseStmt(st)
nodes := parseDDLStmt(st)
if len(nodes) == 0 {
continue
}
Expand Down
12 changes: 10 additions & 2 deletions canal/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package canal
import (
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/tidb/parser/ast"
)

type EventHandler interface {
Expand All @@ -17,18 +18,22 @@ type EventHandler interface {
OnGTID(header *replication.EventHeader, gtid mysql.GTIDSet) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(header *replication.EventHeader, pos mysql.Position, set mysql.GTIDSet, force bool) error
// OnQueryEvent is query event include(create user,drop user,create index event,etd.)
// Note: the OnQueryEvent has lower priority than OnDDL event
OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error
String() string
}

type DummyEventHandler struct {
}
type DummyEventHandler struct{}

func (h *DummyEventHandler) OnRotate(*replication.EventHeader, *replication.RotateEvent) error {
return nil
}

func (h *DummyEventHandler) OnTableChanged(*replication.EventHeader, string, string) error {
return nil
}

func (h *DummyEventHandler) OnDDL(*replication.EventHeader, mysql.Position, *replication.QueryEvent) error {
return nil
}
Expand All @@ -39,6 +44,9 @@ func (h *DummyEventHandler) OnPosSynced(*replication.EventHeader, mysql.Position
return nil
}

func (h *DummyEventHandler) OnQueryEvent(*replication.BinlogEvent, *replication.QueryEvent, ast.StmtNode, *Position) error {
return nil
}
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }

// `SetEventHandler` registers the sync handler, you must register your
Expand Down
69 changes: 54 additions & 15 deletions canal/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,25 +141,29 @@ func (c *Canal) runSyncBinlog() error {
c.cfg.Logger.Errorf("parse query(%s) err %v, will skip this event", e.Query, err)
continue
}
posInfo := &Position{
Position: pos,
SavePos: savePos,
Force: force,
}
for _, stmt := range stmts {
nodes := parseStmt(stmt)
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err = c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
nodes := parseDDLStmt(stmt)
if len(nodes) > 0 {
savePos = true
force = true
// Now we only handle Table Changed DDL, maybe we will support more later.
if err = c.eventHandler.OnDDL(ev.Header, pos, e); err != nil {
return errors.Trace(err)
posInfo.SavePos = true
posInfo.Force = true
err := c.handleDDLEvent(ev, e, nodes, posInfo)
if err != nil {
c.cfg.Logger.Errorf("handle ddl event err %v", err)
}
} else {
err := c.handleQueryEvent(ev, e, stmt, posInfo)
if err != nil {
c.cfg.Logger.Errorf("handle query event err %v", err)
}
}
}
savePos = posInfo.SavePos
force = posInfo.Force
if savePos && e.GSet != nil {
c.master.UpdateGTIDSet(e.GSet)
}
Expand All @@ -183,7 +187,7 @@ type node struct {
table string
}

func parseStmt(stmt ast.StmtNode) (ns []*node) {
func parseDDLStmt(stmt ast.StmtNode) (ns []*node) {
switch t := stmt.(type) {
case *ast.RenameTableStmt:
for _, tableInfo := range t.TableToTables {
Expand Down Expand Up @@ -231,6 +235,7 @@ func (c *Canal) updateTable(header *replication.EventHeader, db, table string) (
}
return
}

func (c *Canal) updateReplicationDelay(ev *replication.BinlogEvent) {
var newDelay uint32
now := uint32(time.Now().Unix())
Expand Down Expand Up @@ -336,3 +341,37 @@ func (c *Canal) CatchMasterPos(timeout time.Duration) error {

return c.WaitUntilPos(pos, timeout)
}

type Position struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need it?

Copy link
Contributor Author

@axfor axfor Jun 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on mysql.Position adds SavePos and Force fields

I think queryEvent contains multiple sub-events, pos save logic, and it is up to the end-user event to decide whether to save or not


image
func (h *DummyEventHandler) OnQueryEvent(header *replication.EventHeader, stmt ast.StmtNode, pos *Position, e *replication.QueryEvent) error { //.... pos.SavePos =true return nil }

Of course, it can also be handled uniformly in the main loop of canal/sync.go events.

https://github.com/axfor/go-mysql/blob/feature-axx/canal/sync.go#L161

func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) { //.... for _, stmt := range stmts { nodes := parseDDLStmt(stmt) if len(nodes) > 0 { // OnDDL	} else { err := c.handleQueryEvent(ev.Header, stmt, posInfo, e) if err != nil { c.cfg.Logger.Errorf("handle query event err %v", err)	} else { savePos = true // for save pos force = true // for save pos	} } //.... } //.... }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is up to the end-user event to decide whether to save or not

I think it's more obvious to use return value to do it.

OnQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos mysql.Position) (savePos bool, force bool, err error) 
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll modify it

mysql.Position
SavePos bool
Force bool
}

// handleDDLEvent is handle DDL event
func (c *Canal) handleDDLEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, nodes []*node, pos *Position) error {
for _, node := range nodes {
if node.db == "" {
node.db = string(e.Schema)
}
if err := c.updateTable(ev.Header, node.db, node.table); err != nil {
return errors.Trace(err)
}
}
if len(nodes) > 0 {
// Now we only handle Table Changed DDL, maybe we will support more later.
if err := c.eventHandler.OnDDL(ev.Header, pos.Position, e); err != nil {
return errors.Trace(err)
}
}
return nil
}

// handleQueryEvent is handle some common query events (e.g., DDL,CREATE or DROP USER,GRANT)
// DDL event use handleDDLEvent, others use the handleQueryEvent
func (c *Canal) handleQueryEvent(ev *replication.BinlogEvent, e *replication.QueryEvent, stmt ast.StmtNode, pos *Position) error {
if err := c.eventHandler.OnQueryEvent(ev, e, stmt, pos); err != nil {
return errors.Trace(err)
}
return nil
}
10 changes: 6 additions & 4 deletions client/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ func (c *Conn) readResultsetStreaming(data []byte, binary bool, result *Result,
return nil
}

func (c *Conn) readResultColumns(result *Result) (err error) {
func (c *Conn) readResultColumns(result *Result) error {
var i = 0
var data []byte

var err error
for {
rawPkgLen := len(result.RawPkg)
result.RawPkg, err = c.ReadPacketReuseMem(result.RawPkg)
Expand Down Expand Up @@ -378,8 +378,9 @@ func (c *Conn) readResultColumns(result *Result) (err error) {
}
}

func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
func (c *Conn) readResultRows(result *Result, isBinary bool) error {
var data []byte
var err error

for {
rawPkgLen := len(result.RawPkg)
Expand Down Expand Up @@ -425,10 +426,11 @@ func (c *Conn) readResultRows(result *Result, isBinary bool) (err error) {
return nil
}

func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) (err error) {
func (c *Conn) readResultRowsStreaming(result *Result, isBinary bool, perRowCb SelectPerRowCallback) error {
var (
data []byte
row []FieldValue
err error
)

for {
Expand Down
3 changes: 2 additions & 1 deletion mysql/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ const (
FieldValueTypeString
)

func (f *Field) Parse(p FieldData) (err error) {
func (f *Field) Parse(p FieldData) error {
f.Data = p

var n int
var err error
pos := 0
//skip catelog, always def
n, err = SkipLengthEncodedString(p)
Expand Down