Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions canal/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
return t, nil
}

// ClearTableCache clear table cache
func (c *Canal) ClearTableCache(db []byte, table []byte) {
key := fmt.Sprintf("%s.%s", db, table)
c.tableLock.Lock()
delete(c.tables, key)
c.tableLock.Unlock()
}

// Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
func (c *Canal) CheckBinlogRowImage(image string) error {
// need to check MySQL binlog row image? full, minimal or noblob?
Expand Down
24 changes: 23 additions & 1 deletion canal/canal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package canal

import (
"bytes"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -36,7 +37,7 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
var err error
s.c, err = NewCanal(cfg)
c.Assert(err, IsNil)

s.execute(c, "DROP TABLE IF EXISTS test.canal_test")
sql := `
CREATE TABLE IF NOT EXISTS test.canal_test (
id int AUTO_INCREMENT,
Expand Down Expand Up @@ -88,7 +89,28 @@ func (s *canalTestSuite) TestCanal(c *C) {
for i := 1; i < 10; i++ {
s.execute(c, "INSERT INTO test.canal_test (name) VALUES (?)", fmt.Sprintf("%d", i))
}
s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`")
s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18")

err := s.c.CatchMasterPos(100)
c.Assert(err, IsNil)
}

func TestAlterTableExp(t *testing.T) {
cases := []string{
"ALTER TABLE `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE `mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER table mytable ADD `field2` DATE NULL AFTER `field1`;",
"alter TABLE mydb.mytable ADD field2 DATE NULL AFTER `field1`;",
}

table := []byte("mytable")
db := []byte("mydb")
for _, s := range cases {
m := expAlterTable.FindSubmatch([]byte(s))
if m == nil || !bytes.Equal(m[2], table) || (len(m[1]) > 0 && !bytes.Equal(m[1], db)) {
t.Fatalf("TestAlterTableExp: case %s failed\n", s)
}
}
}
18 changes: 18 additions & 0 deletions canal/sync.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package canal

import (
"regexp"
"time"

"golang.org/x/net/context"
Expand All @@ -11,6 +12,10 @@ import (
"github.com/siddontang/go-mysql/replication"
)

var (
expAlterTable = regexp.MustCompile("(?i)^ALTER\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
)

func (c *Canal) startSyncBinlog() error {
pos := mysql.Position{c.master.Name, c.master.Position}

Expand Down Expand Up @@ -64,6 +69,19 @@ func (c *Canal) startSyncBinlog() error {
continue
case *replication.XIDEvent:
// try to save the position later
case *replication.QueryEvent:
// handle alert table query
if mb := expAlterTable.FindSubmatch(e.Query); mb != nil {
if len(mb[1]) == 0 {
mb[1] = e.Schema
}
c.ClearTableCache(mb[1], mb[2])
log.Infof("table structure changed, clear table cache: %s.%s\n", mb[1], mb[2])
forceSavePos = true
} else {
// skip others
continue
}
default:
continue
}
Expand Down