44"regexp"
55"time"
66
7- "golang.org/x/net/context"
8-
97"github.com/juju/errors"
108"github.com/ngaut/log"
119"github.com/siddontang/go-mysql/mysql"
@@ -27,23 +25,13 @@ func (c *Canal) startSyncBinlog() error {
2725return errors .Errorf ("start sync replication at %v error %v" , pos , err )
2826}
2927
30- timeout := time .Second
3128for {
32- ctx , cancel := context .WithTimeout (c .ctx , 2 * time .Second )
33- ev , err := s .GetEvent (ctx )
34- cancel ()
35-
36- if err == context .DeadlineExceeded {
37- timeout = 2 * timeout
38- continue
39- }
29+ ev , err := s .GetEvent (c .ctx )
4030
4131if err != nil {
4232return errors .Trace (err )
4333}
4434
45- timeout = time .Second
46-
4735curPos := pos .Pos
4836//next binlog pos
4937pos .Pos = ev .Header .LogPos
@@ -58,7 +46,7 @@ func (c *Canal) startSyncBinlog() error {
5846pos .Pos = uint32 (e .Position )
5947log .Infof ("rotate binlog to %s" , pos )
6048
61- if err = c .eventHandler .OnRotate (c . ctx , e ); err != nil {
49+ if err = c .eventHandler .OnRotate (e ); err != nil {
6250return errors .Trace (err )
6351}
6452case * replication.RowsEvent :
@@ -72,7 +60,7 @@ func (c *Canal) startSyncBinlog() error {
7260continue
7361case * replication.XIDEvent :
7462// try to save the position later
75- if err := c .eventHandler .OnXID (c . ctx , pos ); err != nil {
63+ if err := c .eventHandler .OnXID (pos ); err != nil {
7664return errors .Trace (err )
7765}
7866case * replication.QueryEvent :
@@ -83,7 +71,7 @@ func (c *Canal) startSyncBinlog() error {
8371}
8472c .ClearTableCache (mb [1 ], mb [2 ])
8573log .Infof ("table structure changed, clear table cache: %s.%s\n " , mb [1 ], mb [2 ])
86- if err = c .eventHandler .OnDDL (c . ctx , pos , e ); err != nil {
74+ if err = c .eventHandler .OnDDL (pos , e ); err != nil {
8775return errors .Trace (err )
8876}
8977} else {
@@ -123,7 +111,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
123111return errors .Errorf ("%s not supported now" , e .Header .EventType )
124112}
125113events := newRowsEvent (t , action , ev .Rows )
126- return c .eventHandler .OnRow (c . ctx , events )
114+ return c .eventHandler .OnRow (events )
127115}
128116
129117func (c * Canal ) WaitUntilPos (pos mysql.Position , timeout time.Duration ) error {
0 commit comments