Skip to content

Commit 92973d3

Browse files
authored
Merge pull request #3 from instructure/MCE-18274_port_fs_loader
MCE-18274 port fs loader
2 parents 16cd7d0 + 1b646a6 commit 92973d3

File tree

4 files changed

+94
-0
lines changed

4 files changed

+94
-0
lines changed

canal/canal.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"net"
88
"os"
9+
"path"
910
"regexp"
1011
"strconv"
1112
"strings"
@@ -83,6 +84,12 @@ func NewCanal(cfg *Config) (*Canal, error) {
8384
}
8485
c.master = &masterInfo{logger: c.cfg.Logger}
8586

87+
// allow the master info to be loaded from a configured loader
88+
c.master.infoLoader = cfg.InfoLoader
89+
if c.master.infoLoader == nil {
90+
c.master.infoLoader = NewFsInfoLoader(c.masterInfoPath())
91+
}
92+
8693
c.delay = new(uint32)
8794

8895
var err error
@@ -506,6 +513,10 @@ func (c *Canal) connect(options ...func(*client.Conn)) (*client.Conn, error) {
506513
c.cfg.User, c.cfg.Password, "", c.cfg.Dialer, options...)
507514
}
508515

516+
func (c *Canal) masterInfoPath() string {
517+
return path.Join(c.cfg.DataDir, "master.info")
518+
}
519+
509520
// Execute a SQL
510521
func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
511522
c.connLock.Lock()

canal/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type Config struct {
5757
Charset string `toml:"charset"`
5858
ServerID uint32 `toml:"server_id"`
5959
Flavor string `toml:"flavor"`
60+
DataDir string `toml:"data_dir"`
6061
HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
6162
ReadTimeout time.Duration `toml:"read_timeout"`
6263

@@ -99,6 +100,9 @@ type Config struct {
99100

100101
//Set Localhost
101102
Localhost string
103+
104+
// allow the master info loading to be configured
105+
InfoLoader MasterInfoLoader
102106
}
103107

104108
func NewConfigWithFile(name string) (*Config, error) {

canal/fsloader.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package canal
2+
3+
import (
4+
"bytes"
5+
"os"
6+
7+
"github.com/BurntSushi/toml"
8+
"github.com/instructure/mc-go-mysql/mysql"
9+
"github.com/pingcap/errors"
10+
"github.com/siddontang/go-log/log"
11+
"github.com/siddontang/go/ioutil2"
12+
)
13+
14+
type fsInfoLoader struct {
15+
path string
16+
}
17+
18+
func NewFsInfoLoader(path string) MasterInfoLoader {
19+
return &fsInfoLoader{path: path}
20+
}
21+
22+
func (l *fsInfoLoader) Load(setValues MasterInfoSetter) error {
23+
f, err := os.Open(l.path)
24+
if err != nil && !os.IsNotExist(errors.Cause(err)) {
25+
return errors.Trace(err)
26+
} else if os.IsNotExist(errors.Cause(err)) {
27+
return nil
28+
}
29+
defer f.Close()
30+
31+
var m masterInfo
32+
_, err = toml.DecodeReader(f, &m)
33+
34+
if err != nil {
35+
return err
36+
}
37+
38+
return setValues(m.Addr, m.pos.Name, m.pos.Pos)
39+
}
40+
41+
func (l *fsInfoLoader) Save(addr, name string, position uint32, force bool) error {
42+
var buf bytes.Buffer
43+
e := toml.NewEncoder(&buf)
44+
pos := mysql.Position{Name: name, Pos: position}
45+
46+
m := &masterInfo{
47+
Addr: addr,
48+
pos: pos,
49+
}
50+
51+
enc_err := e.Encode(m)
52+
if enc_err != nil {
53+
log.Errorf("canal save master info to file %s err %v", l.path, enc_err)
54+
}
55+
56+
var err error
57+
if err = ioutil2.WriteFileAtomic(l.path, buf.Bytes(), 0644); err != nil {
58+
log.Errorf("canal save master info to file %s err %v", l.path, err)
59+
}
60+
61+
return errors.Trace(err)
62+
}

canal/master.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,30 @@ import (
1010
type masterInfo struct {
1111
sync.RWMutex
1212

13+
Addr string
14+
1315
pos mysql.Position
1416

1517
gset mysql.GTIDSet
1618

1719
timestamp uint32
1820

1921
logger loggers.Advanced
22+
23+
infoLoader MasterInfoLoader
24+
}
25+
26+
// abstract the way in which the master info is loaded and saved
27+
type MasterInfoSetter func(addr, name string, position uint32) error
28+
type MasterInfoLoader interface {
29+
Load(setValues MasterInfoSetter) error
30+
Save(addr, name string, position uint32, force bool) error
31+
}
32+
33+
func (m *masterInfo) Setter(addr, name string, position uint32) error {
34+
m.Addr = addr
35+
m.pos = mysql.Position{Name: name, Pos: position}
36+
return nil
2037
}
2138

2239
func (m *masterInfo) Update(pos mysql.Position) {

0 commit comments

Comments
 (0)