Skip to content

Commit 87028ff

Browse files
authored
fix canal test (go-mysql-org#118)
1 parent 3d30cde commit 87028ff

File tree

4 files changed

+44
-21
lines changed

4 files changed

+44
-21
lines changed

canal/canal_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"os"
88
"testing"
9+
"time"
910

1011
"github.com/ngaut/log"
1112
. "github.com/pingcap/check"
@@ -92,7 +93,7 @@ func (s *canalTestSuite) TestCanal(c *C) {
9293
s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`")
9394
s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18")
9495

95-
err := s.c.CatchMasterPos(100)
96+
err := s.c.CatchMasterPos(10 * time.Second)
9697
c.Assert(err, IsNil)
9798
}
9899

@@ -101,8 +102,8 @@ func TestAlterTableExp(t *testing.T) {
101102
"ALTER TABLE `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
102103
"ALTER TABLE `mytable` ADD `field2` DATE NULL AFTER `field1`;",
103104
"ALTER TABLE mydb.mytable ADD `field2` DATE NULL AFTER `field1`;",
104-
"ALTER table mytable ADD `field2` DATE NULL AFTER `field1`;",
105-
"alter TABLE mydb.mytable ADD field2 DATE NULL AFTER `field1`;",
105+
"ALTER TABLE mytable ADD `field2` DATE NULL AFTER `field1`;",
106+
"ALTER TABLE mydb.mytable ADD field2 DATE NULL AFTER `field1`;",
106107
}
107108

108109
table := []byte("mytable")

canal/master.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type masterInfo struct {
2020

2121
name string
2222

23-
l sync.Mutex
23+
l sync.RWMutex
2424

2525
lastSaveTime time.Time
2626
}
@@ -68,6 +68,7 @@ func (m *masterInfo) Save(force bool) error {
6868
}
6969

7070
func (m *masterInfo) Update(name string, pos uint32) {
71+
log.Debugf("update master position (%s, %d)", name, pos)
7172
m.l.Lock()
7273
m.Name = name
7374
m.Position = pos
@@ -76,10 +77,10 @@ func (m *masterInfo) Update(name string, pos uint32) {
7677

7778
func (m *masterInfo) Pos() mysql.Position {
7879
var pos mysql.Position
79-
m.l.Lock()
80+
m.l.RLock()
8081
pos.Name = m.Name
8182
pos.Pos = m.Position
82-
m.l.Unlock()
83+
m.l.RUnlock()
8384

8485
return pos
8586
}

canal/sync.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/ngaut/log"
1111
"github.com/siddontang/go-mysql/mysql"
1212
"github.com/siddontang/go-mysql/replication"
13+
"github.com/siddontang/go-mysql/schema"
1314
)
1415

1516
var (
@@ -62,7 +63,9 @@ func (c *Canal) startSyncBinlog() error {
6263
log.Infof("rotate binlog to %v", pos)
6364
case *replication.RowsEvent:
6465
// we only focus row based event
65-
if err = c.handleRowsEvent(ev); err != nil {
66+
err = c.handleRowsEvent(ev)
67+
if err != nil && errors.Cause(err) != schema.ErrTableNotExist {
68+
// We can ignore table not exist error
6669
log.Errorf("handle rows event error %v", err)
6770
return errors.Trace(err)
6871
}
@@ -119,21 +122,18 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
119122
return c.travelRowsEventHandler(events)
120123
}
121124

122-
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error {
123-
if timeout <= 0 {
124-
timeout = 60
125-
}
126-
127-
timer := time.NewTimer(time.Duration(timeout) * time.Second)
125+
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
126+
timer := time.NewTimer(timeout)
128127
for {
129128
select {
130129
case <-timer.C:
131-
return errors.Errorf("wait position %v err", pos)
130+
return errors.Errorf("wait position %v too long > %s", pos, timeout)
132131
default:
133-
curpos := c.master.Pos()
134-
if curpos.Compare(pos) >= 0 {
132+
curPos := c.master.Pos()
133+
if curPos.Compare(pos) >= 0 {
135134
return nil
136135
} else {
136+
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
137137
time.Sleep(100 * time.Millisecond)
138138
}
139139
}
@@ -142,7 +142,7 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error {
142142
return nil
143143
}
144144

145-
func (c *Canal) CatchMasterPos(timeout int) error {
145+
func (c *Canal) CatchMasterPos(timeout time.Duration) error {
146146
rr, err := c.Execute("SHOW MASTER STATUS")
147147
if err != nil {
148148
return errors.Trace(err)

schema/schema.go

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"github.com/siddontang/go-mysql/mysql"
1313
)
1414

15+
var ErrTableNotExist = errors.New("table is not exist")
16+
1517
const (
1618
TYPE_NUMBER = iota + 1 // tinyint, smallint, mediumint, int, bigint, year
1719
TYPE_FLOAT // float, double
@@ -144,20 +146,39 @@ func (idx *Index) FindColumn(name string) int {
144146
return -1
145147
}
146148

149+
func isTableExist(conn mysql.Executer, schema string, name string) (bool, error) {
150+
query := fmt.Sprintf("SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' and TABLE_NAME = '%s' LIMIT 1", schema, name)
151+
r, err := conn.Execute(query)
152+
if err != nil {
153+
return false, errors.Trace(err)
154+
}
155+
156+
return r.RowNumber() == 1, nil
157+
}
158+
147159
func NewTable(conn mysql.Executer, schema string, name string) (*Table, error) {
160+
ok, err := isTableExist(conn, schema, name)
161+
if err != nil {
162+
return nil, errors.Trace(err)
163+
}
164+
165+
if !ok {
166+
return nil, ErrTableNotExist
167+
}
168+
148169
ta := &Table{
149170
Schema: schema,
150171
Name: name,
151172
Columns: make([]TableColumn, 0, 16),
152173
Indexes: make([]*Index, 0, 8),
153174
}
154175

155-
if err := ta.fetchColumns(conn); err != nil {
156-
return nil, err
176+
if err = ta.fetchColumns(conn); err != nil {
177+
return nil, errors.Trace(err)
157178
}
158179

159-
if err := ta.fetchIndexes(conn); err != nil {
160-
return nil, err
180+
if err = ta.fetchIndexes(conn); err != nil {
181+
return nil, errors.Trace(err)
161182
}
162183

163184
return ta, nil

0 commit comments

Comments
 (0)