Skip to content

Commit 1787346

Browse files
authored
add canal execution (go-mysql-org#98)
1 parent 08d0afc commit 1787346

File tree

7 files changed

+117
-9
lines changed

7 files changed

+117
-9
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ build:
44
rm -rf vendor && ln -s _vendor/vendor vendor
55
go build -o bin/go-mysqlbinlog cmd/go-mysqlbinlog/main.go
66
go build -o bin/go-mysqldump cmd/go-mysqldump/main.go
7+
go build -o bin/go-canal cmd/go-canal/main.go
78
rm -rf vendor
89

910
test:

canal/canal.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"fmt"
55
"io/ioutil"
66
"os"
7-
"os/exec"
87
"path"
98
"strconv"
109
"strings"
@@ -97,9 +96,10 @@ func (c *Canal) prepareDumper() error {
9796

9897
if c.dumper, err = dump.NewDumper(dumpPath,
9998
c.cfg.Addr, c.cfg.User, c.cfg.Password); err != nil {
100-
if err != exec.ErrNotFound {
101-
return errors.Trace(err)
102-
}
99+
return errors.Trace(err)
100+
}
101+
102+
if c.dumper == nil {
103103
//no mysqldump, use binlog only
104104
return nil
105105
}

canal/rows.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package canal
22

33
import (
4+
"fmt"
5+
46
"github.com/juju/errors"
57
"github.com/siddontang/go-mysql/schema"
68
)
@@ -50,3 +52,8 @@ func GetPKValues(table *schema.Table, row []interface{}) ([]interface{}, error)
5052

5153
return values, nil
5254
}
55+
56+
// String implements fmt.Stringer interface.
57+
func (r *RowsEvent) String() string {
58+
return fmt.Sprintf("%s %s %v", r.Action, r.Table, r.Rows)
59+
}

cmd/go-canal/main.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"strings"
9+
"syscall"
10+
11+
"github.com/siddontang/go-mysql/canal"
12+
)
13+
14+
var host = flag.String("host", "127.0.0.1", "MySQL host")
15+
var port = flag.Int("port", 3306, "MySQL port")
16+
var user = flag.String("user", "root", "MySQL user, must have replication privilege")
17+
var password = flag.String("password", "", "MySQL password")
18+
19+
var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
20+
21+
var dataDir = flag.String("data-dir", "./var", "Path to store data, like master.info")
22+
23+
var serverID = flag.Int("server-id", 101, "Unique Server ID")
24+
var mysqldump = flag.String("mysqldump", "mysqldump", "mysqldump execution path")
25+
26+
var dbs = flag.String("dbs", "test", "dump databases, seperated by comma")
27+
var tables = flag.String("tables", "", "dump tables, seperated by comma, will overwrite dbs")
28+
var tableDB = flag.String("table_db", "test", "database for dump tables")
29+
var ignoreTables = flag.String("ignore_tables", "", "ignore tables, must be database.table format, separated by comma")
30+
31+
func main() {
32+
cfg := canal.NewDefaultConfig()
33+
cfg.Addr = fmt.Sprintf("%s:%d", *host, *port)
34+
cfg.User = *user
35+
cfg.Password = *password
36+
cfg.Flavor = *flavor
37+
cfg.DataDir = *dataDir
38+
39+
cfg.ServerID = uint32(*serverID)
40+
cfg.Dump.ExecutionPath = *mysqldump
41+
cfg.Dump.DiscardErr = false
42+
43+
c, err := canal.NewCanal(cfg)
44+
if err != nil {
45+
fmt.Printf("create canal err %v", err)
46+
os.Exit(1)
47+
}
48+
49+
if len(*ignoreTables) == 0 {
50+
subs := strings.Split(*ignoreTables, ",")
51+
for _, sub := range subs {
52+
if seps := strings.Split(sub, "."); len(seps) == 2 {
53+
c.AddDumpIgnoreTables(seps[0], seps[1])
54+
}
55+
}
56+
}
57+
58+
if len(*tables) > 0 && len(*tableDB) > 0 {
59+
subs := strings.Split(*tables, ",")
60+
c.AddDumpTables(*tableDB, subs...)
61+
} else if len(*dbs) > 0 {
62+
subs := strings.Split(*dbs, ",")
63+
c.AddDumpDatabases(subs...)
64+
}
65+
66+
c.RegRowsEventHandler(&handler{})
67+
68+
err = c.Start()
69+
if err != nil {
70+
fmt.Printf("start canal err %V", err)
71+
os.Exit(1)
72+
}
73+
74+
sc := make(chan os.Signal, 1)
75+
signal.Notify(sc,
76+
os.Kill,
77+
os.Interrupt,
78+
syscall.SIGHUP,
79+
syscall.SIGINT,
80+
syscall.SIGTERM,
81+
syscall.SIGQUIT)
82+
83+
<-sc
84+
85+
c.Close()
86+
}
87+
88+
type handler struct {
89+
}
90+
91+
func (h *handler) Do(e *canal.RowsEvent) error {
92+
fmt.Printf("%v\n", e)
93+
94+
return nil
95+
}
96+
97+
func (h *handler) String() string {
98+
return "TestHandler"
99+
}

cmd/go-mysqldump/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,15 @@ var output = flag.String("o", "", "dump output, empty for stdout")
1919
var dbs = flag.String("dbs", "", "dump databases, seperated by comma")
2020
var tables = flag.String("tables", "", "dump tables, seperated by comma, will overwrite dbs")
2121
var tableDB = flag.String("table_db", "", "database for dump tables")
22-
var ignoreTables = flag.String("ignore_tables", "", "ignore tables, must be database.table format, seperated by comma")
22+
var ignoreTables = flag.String("ignore_tables", "", "ignore tables, must be database.table format, separated by comma")
2323

2424
func main() {
2525
flag.Parse()
2626

2727
d, err := dump.NewDumper(*execution, *addr, *user, *password)
2828
if err != nil {
2929
fmt.Printf("Create Dumper error %v\n", errors.ErrorStack(err))
30-
return
30+
os.Exit(1)
3131
}
3232

3333
if len(*ignoreTables) == 0 {
@@ -53,14 +53,14 @@ func main() {
5353
f, err = os.OpenFile(*output, os.O_CREATE|os.O_WRONLY, 0644)
5454
if err != nil {
5555
fmt.Printf("Open file error %v\n", errors.ErrorStack(err))
56-
return
56+
os.Exit(1)
5757
}
5858
}
5959

6060
defer f.Close()
6161

6262
if err = d.Dump(f); err != nil {
6363
fmt.Printf("Dump MySQL error %v\n", errors.ErrorStack(err))
64-
return
64+
os.Exit(1)
6565
}
6666
}

dump/dump.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Dumper struct {
3232

3333
func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
3434
if len(executionPath) == 0 {
35-
executionPath = "mysqldump"
35+
return nil, nil
3636
}
3737

3838
path, err := exec.LookPath(executionPath)

dump/dump_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func (s *schemaTestSuite) SetUpSuite(c *C) {
3636

3737
s.d, err = NewDumper(*execution, fmt.Sprintf("%s:%d", *host, *port), "root", "")
3838
c.Assert(err, IsNil)
39+
c.Assert(s.d, NotNil)
3940

4041
s.d.SetErrOut(os.Stderr)
4142

0 commit comments

Comments
 (0)