@@ -10,7 +10,6 @@ import (
1010"github.com/juju/errors"
1111"github.com/shopspring/decimal"
1212"github.com/siddontang/go-log/log"
13- "github.com/siddontang/go-mysql/dump"
1413"github.com/siddontang/go-mysql/mysql"
1514"github.com/siddontang/go-mysql/schema"
1615)
@@ -50,47 +49,43 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
5049for i , v := range values {
5150if v == "NULL" {
5251vs [i ] = nil
52+ } else if v == "_binary ''" {
53+ vs [i ] = []byte {}
5354} else if v [0 ] != '\'' {
5455if tableInfo .Columns [i ].Type == schema .TYPE_NUMBER {
5556n , err := strconv .ParseInt (v , 10 , 64 )
5657if err != nil {
57- log .Errorf ("parse row %v at %d error %v, skip" , values , i , err )
58- return dump .ErrSkip
58+ return fmt .Errorf ("parse row %v at %d error %v, int expected" , values , i , err )
5959}
6060vs [i ] = n
6161} else if tableInfo .Columns [i ].Type == schema .TYPE_FLOAT {
6262f , err := strconv .ParseFloat (v , 64 )
6363if err != nil {
64- log .Errorf ("parse row %v at %d error %v, skip" , values , i , err )
65- return dump .ErrSkip
64+ return fmt .Errorf ("parse row %v at %d error %v, float expected" , values , i , err )
6665}
6766vs [i ] = f
6867} else if tableInfo .Columns [i ].Type == schema .TYPE_DECIMAL {
6968if h .c .cfg .UseDecimal {
7069d , err := decimal .NewFromString (v )
7170if err != nil {
72- log .Errorf ("parse row %v at %d error %v, skip" , values , i , err )
73- return dump .ErrSkip
71+ return fmt .Errorf ("parse row %v at %d error %v, decimal expected" , values , i , err )
7472}
7573vs [i ] = d
7674} else {
7775f , err := strconv .ParseFloat (v , 64 )
7876if err != nil {
79- log .Errorf ("parse row %v at %d error %v, skip" , values , i , err )
80- return dump .ErrSkip
77+ return fmt .Errorf ("parse row %v at %d error %v, float expected" , values , i , err )
8178}
8279vs [i ] = f
8380}
8481} else if strings .HasPrefix (v , "0x" ) {
8582buf , err := hex .DecodeString (v [2 :])
8683if err != nil {
87- log .Errorf ("parse row %v at %d error %v, skip" , values , i , err )
88- return dump .ErrSkip
84+ return fmt .Errorf ("parse row %v at %d error %v, hex literal expected" , values , i , err )
8985}
9086vs [i ] = string (buf )
9187} else {
92- log .Errorf ("parse row %v error, invalid type at %d, skip" , values , i )
93- return dump .ErrSkip
88+ return fmt .Errorf ("parse row %v error, invalid type at %d" , values , i )
9489}
9590} else {
9691vs [i ] = v [1 : len (v )- 1 ]
@@ -130,6 +125,8 @@ func (c *Canal) dump() error {
130125return errors .New ("mysqldump does not exist" )
131126}
132127
128+ c .master .UpdateTimestamp (uint32 (time .Now ().Unix ()))
129+
133130h := & dumpParseHandler {c : c }
134131// If users call StartFromGTID with empty position to start dumping with gtid,
135132// we record the current gtid position before dump starts.
@@ -161,7 +158,9 @@ func (c *Canal) dump() error {
161158
162159pos := mysql.Position {Name : h .name , Pos : uint32 (h .pos )}
163160c .master .Update (pos )
164- c .eventHandler .OnPosSynced (pos , true )
161+ if err := c .eventHandler .OnPosSynced (pos , true ); err != nil {
162+ return errors .Trace (err )
163+ }
165164var startPos fmt.Stringer = pos
166165if h .gset != nil {
167166c .master .UpdateGTIDSet (h .gset )
0 commit comments