11package  canal
22
33import  (
4+ "log/slog" 
45"sync/atomic" 
56"time" 
67
@@ -20,15 +21,15 @@ func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
2021if  err  !=  nil  {
2122return  nil , errors .Errorf ("start sync replication at binlog %v error %v" , pos , err )
2223}
23- c .cfg .Logger .Infof ("start sync binlog at binlog file %v"  , pos )
24+ c .cfg .Logger .Info ("start sync binlog at binlog file"  ,  slog . Any ( "pos" , pos ) )
2425return  s , nil 
2526} else  {
2627gsetClone  :=  gset .Clone ()
2728s , err  :=  c .syncer .StartSyncGTID (gset )
2829if  err  !=  nil  {
2930return  nil , errors .Errorf ("start sync replication at GTID set %v error %v" , gset , err )
3031}
31- c .cfg .Logger .Infof ("start sync binlog at GTID set %v"  , gsetClone )
32+ c .cfg .Logger .Info ("start sync binlog at GTID set"  ,  slog . Any ( "gset" , gsetClone ) )
3233return  s , nil 
3334}
3435}
@@ -57,7 +58,7 @@ func (c *Canal) runSyncBinlog() error {
5758// and https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/rpl_binlog_sender.cc#L899 
5859if  ev .Header .Timestamp  ==  0  {
5960fakeRotateLogName  :=  string (e .NextLogName )
60- c .cfg .Logger .Infof ("received fake rotate event, next log name is %s"  ,  e .NextLogName )
61+ c .cfg .Logger .Info ("received fake rotate event"  ,  slog . String ( "nextLogName" ,  string ( e .NextLogName )) )
6162
6263if  fakeRotateLogName  !=  c .master .Position ().Name  {
6364c .cfg .Logger .Info ("log name changed, the fake rotate event will be handled as a real rotate event" )
@@ -93,17 +94,16 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
9394case  * replication.RotateEvent :
9495pos .Name  =  string (e .NextLogName )
9596pos .Pos  =  uint32 (e .Position )
96- c .cfg .Logger .Infof ("rotate binlog to %s"  , pos )
97+ c .cfg .Logger .Info ("rotate binlog"  ,  slog . Any ( "pos" , pos ) )
9798savePos  =  true 
9899force  =  true 
99100if  err  =  c .eventHandler .OnRotate (ev .Header , e ); err  !=  nil  {
100101return  errors .Trace (err )
101102}
102103case  * replication.RowsEvent :
103104// we only focus row based event 
104- err  =  c .handleRowsEvent (ev )
105- if  err  !=  nil  {
106- c .cfg .Logger .Errorf ("handle rows event at (%s, %d) error %v" , pos .Name , curPos , err )
105+ if  err  :=  c .handleRowsEvent (ev ); err  !=  nil  {
106+ c .cfg .Logger .Error ("handle rows event" , slog .String ("file" , pos .Name ), slog .Uint64 ("position" , uint64 (curPos )), slog .Any ("error" , err ))
107107return  errors .Trace (err )
108108}
109109return  nil 
@@ -113,7 +113,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
113113for  _ , subEvent  :=  range  ev .Events  {
114114err  =  c .handleEvent (subEvent )
115115if  err  !=  nil  {
116- c .cfg .Logger .Errorf ("handle transaction payload subevent at (%s, %d) error %v"  , pos .Name ,  curPos ,  err )
116+ c .cfg .Logger .Error ("handle transaction payload subevent"  ,  slog . String ( "file" , pos .Name ),  slog . Uint64 ( "position" ,  uint64 ( curPos )),  slog . Any ( "error" ,  err ) )
117117return  errors .Trace (err )
118118}
119119}
@@ -144,7 +144,7 @@ func (c *Canal) handleEvent(ev *replication.BinlogEvent) error {
144144if  err  !=  nil  {
145145// The parser does not understand all syntax. 
146146// For example, it won't parse [CREATE|DROP] TRIGGER statements. 
147- c .cfg .Logger .Errorf ( "parse query(%s) err %v , will skip this event"e .Query ,  err )
147+ c .cfg .Logger .Error ( "error parsing query , will skip this event"slog . String ( "query" ,  string ( e .Query )),  slog . Any ( "error" ,  err ) )
148148return  nil 
149149}
150150if  len (stmts ) >  0  {
@@ -246,7 +246,7 @@ func parseStmt(stmt ast.StmtNode) (ns []*node) {
246246
247247func  (c  * Canal ) updateTable (header  * replication.EventHeader , db , table  string ) (err  error ) {
248248c .ClearTableCache ([]byte (db ), []byte (table ))
249- c .cfg .Logger .Infof ("table structure changed, clear table cache: %s.%s \n "  , db ,  table )
249+ c .cfg .Logger .Info ("table structure changed, clear table cache"  ,  slog . String ( "database" , db ),  slog . String ( " table" ,  table ) )
250250if  err  =  c .eventHandler .OnTableChanged (header , db , table ); err  !=  nil  &&  errors .Cause (err ) !=  schema .ErrTableNotExist  {
251251return  errors .Trace (err )
252252}
@@ -316,7 +316,8 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
316316if  curPos .Compare (pos ) >=  0  {
317317return  nil 
318318} else  {
319- c .cfg .Logger .Debugf ("master pos is %v, wait catching %v" , curPos , pos )
319+ c .cfg .Logger .Debug ("master pos is behind, wait to catch up" , slog .String ("master file" , curPos .Name ), slog .Uint64 ("master position" , uint64 (curPos .Pos )),
320+ slog .String ("target file" , pos .Name ), slog .Uint64 ("target position" , uint64 (curPos .Pos )))
320321time .Sleep (100  *  time .Millisecond )
321322}
322323}
0 commit comments