Skip to content

Commit b7f430c

Browse files
committed
add base canal framework
1 parent 110e03a commit b7f430c

File tree

7 files changed

+699
-0
lines changed

7 files changed

+699
-0
lines changed

canal/canal.go

Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
package canal
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"os/exec"
7+
"path"
8+
"strconv"
9+
"strings"
10+
"sync"
11+
12+
"github.com/siddontang/go-mysql/client"
13+
"github.com/siddontang/go-mysql/dump"
14+
"github.com/siddontang/go-mysql/mysql"
15+
"github.com/siddontang/go-mysql/replication"
16+
"github.com/siddontang/go-mysql/schema"
17+
"github.com/siddontang/go/log"
18+
"github.com/siddontang/go/sync2"
19+
)
20+
21+
var errCanalClosed = errors.New("canal was closed")
22+
23+
// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
24+
// MySQL must open row format for binlog
25+
type Canal struct {
26+
m sync.Mutex
27+
28+
cfg *Config
29+
30+
master *masterInfo
31+
dumper *dump.Dumper
32+
dumpDoneCh chan struct{}
33+
syncer *replication.BinlogSyncer
34+
35+
rsLock sync.Mutex
36+
rsHandlers []RowsEventHandler
37+
38+
connLock sync.Mutex
39+
conn *client.Conn
40+
41+
wg sync.WaitGroup
42+
43+
tableLock sync.Mutex
44+
tableExecuter *tableExecuter
45+
tables map[string]*schema.Table
46+
47+
quit chan struct{}
48+
closed sync2.AtomicBool
49+
}
50+
51+
func NewCanal(cfg *Config) (*Canal, error) {
52+
c := new(Canal)
53+
c.cfg = cfg
54+
c.closed.Set(false)
55+
c.quit = make(chan struct{})
56+
57+
c.dumpDoneCh = make(chan struct{})
58+
c.rsHandlers = make([]RowsEventHandler, 0, 4)
59+
c.tables = make(map[string]*schema.Table)
60+
c.tableExecuter = &tableExecuter{c}
61+
62+
var err error
63+
if c.master, err = loadMasterInfo(c.masterInfoPath()); err != nil {
64+
return nil, err
65+
} else if len(c.master.Addr) != 0 && c.master.Addr != c.cfg.Addr {
66+
log.Infof("MySQL addr %s in old master.info, but new %s, reset", c.master.Addr, c.cfg.Addr)
67+
// may use another MySQL, reset
68+
c.master = &masterInfo{}
69+
}
70+
71+
c.master.Addr = c.cfg.Addr
72+
73+
if c.dumper, err = dump.NewDumper(c.cfg.Dump.ExecutionPath, c.cfg.Addr, c.cfg.User, c.cfg.Password); err != nil {
74+
if err != exec.ErrNotFound {
75+
return nil, err
76+
}
77+
//no mysqldump, use binlog only
78+
c.dumper = nil
79+
}
80+
81+
if err = c.prepareSyncer(); err != nil {
82+
return nil, err
83+
}
84+
85+
c.wg.Add(1)
86+
go c.run()
87+
88+
return c, nil
89+
}
90+
91+
func (c *Canal) run() error {
92+
defer c.wg.Done()
93+
94+
if err := c.tryDump(); err != nil {
95+
log.Errorf("canal dump mysql err: %v", err)
96+
return err
97+
}
98+
99+
close(c.dumpDoneCh)
100+
101+
if err := c.startSyncBinlog(); err != nil {
102+
log.Errorf("canal start sync binlog err: %v", err)
103+
return err
104+
}
105+
106+
return nil
107+
}
108+
109+
func (c *Canal) isClosed() bool {
110+
return c.closed.Get()
111+
}
112+
113+
func (c *Canal) Close() {
114+
log.Infof("close canal")
115+
116+
c.m.Lock()
117+
defer c.m.Unlock()
118+
119+
if c.isClosed() {
120+
return
121+
}
122+
123+
c.closed.Set(true)
124+
125+
close(c.quit)
126+
127+
c.connLock.Lock()
128+
c.conn.Close()
129+
c.conn = nil
130+
c.connLock.Unlock()
131+
132+
if c.syncer != nil {
133+
c.syncer.Close()
134+
c.syncer = nil
135+
}
136+
137+
c.master.Close()
138+
139+
c.wg.Wait()
140+
}
141+
142+
func (c *Canal) WaitDumpDone() <-chan struct{} {
143+
return c.dumpDoneCh
144+
}
145+
146+
type tableExecuter struct {
147+
c *Canal
148+
}
149+
150+
func (e *tableExecuter) Execute(query string, args ...interface{}) (*mysql.Result, error) {
151+
return e.c.executeSql(query, args...)
152+
}
153+
154+
func (c *Canal) getTable(db string, table string) (*schema.Table, error) {
155+
key := fmt.Sprintf("%s.%s", db, table)
156+
c.tableLock.Lock()
157+
t, ok := c.tables[key]
158+
c.tableLock.Unlock()
159+
160+
if ok {
161+
return t, nil
162+
}
163+
164+
t, err := schema.NewTable(c.tableExecuter, db, table)
165+
if err != nil {
166+
return nil, err
167+
}
168+
169+
c.tableLock.Lock()
170+
c.tables[key] = t
171+
c.tableLock.Unlock()
172+
173+
return t, nil
174+
}
175+
176+
func (c *Canal) checkBinlogFormat() error {
177+
res, err := c.executeSql(`SHOW GLOBAL VARIABLES LIKE "binlog_format";`)
178+
if err != nil {
179+
return err
180+
} else if f, _ := res.GetString(0, 1); f != "ROW" {
181+
return fmt.Errorf("binlog must ROW format, but %s now", f)
182+
}
183+
184+
// need to check MySQL binlog row image? full, minimal or noblob?
185+
// now only log
186+
if c.cfg.Flavor == mysql.MySQLFlavor {
187+
if res, err = c.executeSql(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err != nil {
188+
return err
189+
}
190+
191+
rowImage, _ := res.GetString(0, 1)
192+
log.Infof("MySQL use binlog row %s image", rowImage)
193+
}
194+
195+
return nil
196+
}
197+
198+
func (c *Canal) prepareSyncer() error {
199+
c.syncer = replication.NewBinlogSyncer(c.cfg.ServerID, c.cfg.Flavor)
200+
201+
seps := strings.Split(c.cfg.Addr, ":")
202+
if len(seps) != 2 {
203+
return fmt.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr)
204+
}
205+
206+
port, err := strconv.ParseUint(seps[1], 10, 16)
207+
if err != nil {
208+
return err
209+
}
210+
211+
if err = c.syncer.RegisterSlave(seps[0], uint16(port), c.cfg.User, c.cfg.Password); err != nil {
212+
return err
213+
}
214+
return nil
215+
}
216+
217+
func (c *Canal) masterInfoPath() string {
218+
return path.Join(c.cfg.DataDir, "master.info")
219+
}
220+
221+
func (c *Canal) executeSql(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
222+
c.connLock.Lock()
223+
defer c.connLock.Unlock()
224+
225+
retryNum := 3
226+
for i := 0; i < retryNum; i++ {
227+
if c.conn == nil {
228+
c.conn, err = client.Connect(c.cfg.Addr, c.cfg.User, c.cfg.Password, "")
229+
if err != nil {
230+
return nil, err
231+
}
232+
}
233+
234+
rr, err = c.conn.Execute(cmd, args...)
235+
if err != nil && err != mysql.ErrBadConn {
236+
return
237+
} else if err == mysql.ErrBadConn {
238+
c.conn.Close()
239+
c.conn = nil
240+
continue
241+
} else {
242+
return
243+
}
244+
}
245+
return
246+
}

canal/canal_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package canal
2+
3+
import (
4+
"flag"
5+
"testing"
6+
7+
. "gopkg.in/check.v1"
8+
)
9+
10+
var testHost = flag.String("host", "127.0.0.1", "MySQL host")
11+
12+
func Test(t *testing.T) {
13+
TestingT(t)
14+
}
15+
16+
type canalTestSuite struct {
17+
}
18+
19+
var _ = Suite(&canalTestSuite{})
20+
21+
func (s *canalTestSuite) SetUpSuite(c *C) {
22+
23+
}
24+
25+
func (s *canalTestSuite) TearDownSuite(c *C) {
26+
27+
}

canal/config.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package canal
2+
3+
import (
4+
"io/ioutil"
5+
"math/rand"
6+
"time"
7+
8+
"github.com/BurntSushi/toml"
9+
)
10+
11+
type DumpConfig struct {
12+
// mysqldump execution path, like mysqldump or /usr/bin/mysqldump, etc...
13+
ExecutionPath string `toml:"mysqldump"`
14+
15+
// Will override Databases, tables is in database table_db
16+
Tables []string `toml:"tables"`
17+
TableDB string `toml:"table_db"`
18+
19+
Databases []string `toml:"dbs"`
20+
21+
// Ignore table format is db.table
22+
IgnoreTables []string `toml:"ignore_tables"`
23+
}
24+
25+
type Config struct {
26+
Addr string `toml:"addr"`
27+
User string `toml:"user"`
28+
Password string `toml:"password"`
29+
30+
ServerID uint32 `toml:"server_id"`
31+
Flavor string `toml:"flavor"`
32+
DataDir string `toml:"data_dir"`
33+
34+
Dump DumpConfig `toml:"dump"`
35+
}
36+
37+
func NewConfigWithFile(name string) (*Config, error) {
38+
data, err := ioutil.ReadFile(name)
39+
if err != nil {
40+
return nil, err
41+
}
42+
43+
return NewConfig(string(data))
44+
}
45+
46+
func NewConfig(data string) (*Config, error) {
47+
var c Config
48+
49+
_, err := toml.Decode(data, &c)
50+
if err != nil {
51+
return nil, err
52+
}
53+
54+
return &c, nil
55+
}
56+
57+
func NewDefaultConfig() *Config {
58+
c := new(Config)
59+
60+
c.Addr = "127.0.0.1:3306"
61+
c.User = "root"
62+
c.Password = ""
63+
64+
rand.Seed(time.Now().Unix())
65+
c.ServerID = uint32(rand.Intn(1000)) + 1001
66+
67+
c.Flavor = "mysql"
68+
69+
c.DataDir = "./var"
70+
c.Dump.ExecutionPath = "mysqldump"
71+
72+
return c
73+
}

0 commit comments

Comments
 (0)