Skip to content

Commit cb167c0

Browse files
IANTHEREALsiddontang
authored andcommitted
*: support multiple sources replication for mariadb (go-mysql-org#272)
1 parent ef96922 commit cb167c0

File tree

4 files changed

+351
-41
lines changed

4 files changed

+351
-41
lines changed

canal/sync.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,11 @@ func (c *Canal) runSyncBinlog() error {
9898
}
9999
case *replication.MariadbGTIDEvent:
100100
// try to save the GTID later
101-
gtid := &e.GTID
101+
gtid, err := mysql.ParseMariadbGTIDSet(e.GTID.String())
102+
if err != nil {
103+
return errors.Trace(err)
104+
}
105+
102106
c.master.UpdateGTID(gtid)
103107
if err := c.eventHandler.OnGTID(gtid); err != nil {
104108
return errors.Trace(err)

failover/mariadb_gtid_handler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ func (h *MariadbGTIDHandler) FindBestSlaves(slaves []*Server) ([]*Server, error)
4646
if len(str) == 0 {
4747
seq = 0
4848
} else {
49-
g, err := ParseMariadbGTIDSet(str)
49+
g, err := ParseMariadbGTID(str)
5050
if err != nil {
5151
return nil, errors.Trace(err)
5252
}
5353

54-
seq = g.(*MariadbGTID).SequenceNumber
54+
seq = g.SequenceNumber
5555
}
5656

5757
ps[i] = seq

mysql/mariadb_gtid.go

Lines changed: 141 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
package mysql
22

33
import (
4+
"bytes"
45
"fmt"
56
"strconv"
67
"strings"
78

89
"github.com/juju/errors"
10+
"github.com/siddontang/go/hack"
911
)
1012

13+
// MariadbGTID represent mariadb gtid, [domain ID]-[server-id]-[sequence]
1114
type MariadbGTID struct {
1215
DomainID uint32
1316
ServerID uint32
1417
SequenceNumber uint64
1518
}
1619

17-
// We don't support multi source replication, so the mariadb gtid set may have only domain-server-sequence
18-
func ParseMariadbGTIDSet(str string) (GTIDSet, error) {
20+
// ParseMariadbGTID parses mariadb gtid, [domain ID]-[server-id]-[sequence]
21+
func ParseMariadbGTID(str string) (*MariadbGTID, error) {
1922
if len(str) == 0 {
2023
return &MariadbGTID{0, 0, 0}, nil
2124
}
@@ -57,41 +60,158 @@ func (gtid *MariadbGTID) String() string {
5760
return fmt.Sprintf("%d-%d-%d", gtid.DomainID, gtid.ServerID, gtid.SequenceNumber)
5861
}
5962

60-
func (gtid *MariadbGTID) Encode() []byte {
61-
return []byte(gtid.String())
63+
// Contain return whether one mariadb gtid covers another mariadb gtid
64+
func (gtid *MariadbGTID) Contain(other *MariadbGTID) bool {
65+
return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber
6266
}
6367

64-
func (gtid *MariadbGTID) Equal(o GTIDSet) bool {
65-
other, ok := o.(*MariadbGTID)
66-
if !ok {
67-
return false
68+
// Clone clones a mariadb gtid
69+
func (gtid *MariadbGTID) Clone() *MariadbGTID {
70+
o := new(MariadbGTID)
71+
*o = *gtid
72+
return o
73+
}
74+
75+
func (gtid *MariadbGTID) forward(newer *MariadbGTID) error {
76+
if newer.DomainID != gtid.DomainID {
77+
return errors.Errorf("%s is not same with doamin of %s", newer, gtid)
78+
}
79+
80+
if newer.SequenceNumber <= gtid.SequenceNumber {
81+
return errors.Errorf("out of order binlog appears with gtid %s vs current position gtid %s", newer, gtid)
6882
}
6983

70-
return *gtid == *other
84+
gtid.ServerID = newer.ServerID
85+
gtid.SequenceNumber = newer.SequenceNumber
86+
return nil
7187
}
7288

73-
func (gtid *MariadbGTID) Contain(o GTIDSet) bool {
74-
other, ok := o.(*MariadbGTID)
75-
if !ok {
76-
return false
89+
// MariadbGTIDSet is a set of mariadb gtid
90+
type MariadbGTIDSet struct {
91+
Sets map[uint32]*MariadbGTID
92+
}
93+
94+
// ParseMariadbGTIDSet parses str into mariadb gtid sets
95+
func ParseMariadbGTIDSet(str string) (GTIDSet, error) {
96+
s := new(MariadbGTIDSet)
97+
s.Sets = make(map[uint32]*MariadbGTID)
98+
if str == "" {
99+
return s, nil
77100
}
78101

79-
return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber
102+
sp := strings.Split(str, ",")
103+
104+
//todo, handle redundant same uuid
105+
for i := 0; i < len(sp); i++ {
106+
err := s.Update(sp[i])
107+
if err != nil {
108+
return nil, errors.Trace(err)
109+
}
110+
}
111+
return s, nil
80112
}
113+
// AddSet adds mariadb gtid into mariadb gtid set
114+
func (s *MariadbGTIDSet) AddSet(gtid *MariadbGTID) error {
115+
if gtid == nil {
116+
return nil
117+
}
81118

82-
func (gtid *MariadbGTID) Update(GTIDStr string) error {
83-
newGTID, err := ParseMariadbGTIDSet(GTIDStr)
119+
o, ok := s.Sets[gtid.DomainID]
120+
if ok {
121+
err := o.forward(gtid)
122+
if err != nil {
123+
return errors.Trace(err)
124+
}
125+
} else {
126+
s.Sets[gtid.DomainID] = gtid
127+
}
128+
129+
return nil
130+
}
131+
132+
// Update updates mariadb gtid set
133+
func (s *MariadbGTIDSet) Update(GTIDStr string) error {
134+
gtid, err := ParseMariadbGTID(GTIDStr)
84135
if err != nil {
85136
return err
86137
}
87138

88-
*gtid = *(newGTID.(*MariadbGTID))
139+
err = s.AddSet(gtid)
140+
return errors.Trace(err)
141+
}
142+
89143

90-
return nil
144+
func (s *MariadbGTIDSet) String() string {
145+
return hack.String(s.Encode())
91146
}
92147

93-
func (gtid *MariadbGTID) Clone() GTIDSet {
94-
clone := new(MariadbGTID)
95-
*clone = *gtid
148+
// Encode encodes mariadb gtid set
149+
func (s *MariadbGTIDSet) Encode() []byte {
150+
var buf bytes.Buffer
151+
sep := ""
152+
for _, gtid := range s.Sets {
153+
buf.WriteString(sep)
154+
buf.WriteString(gtid.String())
155+
sep = ","
156+
}
157+
158+
return buf.Bytes()
159+
}
160+
161+
// Clone clones a mariadb gtid set
162+
func (s *MariadbGTIDSet) Clone() GTIDSet {
163+
clone := &MariadbGTIDSet{
164+
Sets: make(map[uint32]*MariadbGTID),
165+
}
166+
for domainID, gtid := range s.Sets {
167+
clone.Sets[domainID] = gtid.Clone()
168+
}
169+
96170
return clone
97171
}
172+
173+
// Equal returns true if two mariadb gtid set is same, otherwise return false
174+
func (s *MariadbGTIDSet) Equal(o GTIDSet) bool {
175+
other, ok := o.(*MariadbGTIDSet)
176+
if !ok {
177+
return false
178+
}
179+
180+
if len(other.Sets) != len(s.Sets) {
181+
return false
182+
}
183+
184+
for domainID, gtid := range other.Sets {
185+
o, ok := s.Sets[domainID]
186+
if !ok {
187+
return false
188+
}
189+
190+
if *gtid != *o {
191+
return false
192+
}
193+
}
194+
195+
return true
196+
}
197+
198+
// Contain return whether one mariadb gtid set covers another mariadb gtid set
199+
func (s *MariadbGTIDSet) Contain(o GTIDSet) bool {
200+
other, ok := o.(*MariadbGTIDSet)
201+
if !ok {
202+
return false
203+
}
204+
205+
for doaminID, gtid := range other.Sets {
206+
o, ok := s.Sets[doaminID]
207+
if !ok {
208+
return false
209+
}
210+
211+
if !o.Contain(gtid) {
212+
return false
213+
}
214+
}
215+
216+
return true
217+
}

0 commit comments

Comments
 (0)