Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
cbade51
add function of get term field's value by field name
Apr 8, 2017
1a3b997
change field to column
WangXiangUSTC Apr 9, 2017
06a1322
Update rows.go
WangXiangUSTC Apr 9, 2017
bbed0d6
Merge remote-tracking branch 'remotesrc/master'
Apr 19, 2017
525eb21
add function to set dump charset
Apr 19, 2017
fddc322
add set of charcter
Apr 26, 2017
10f2f26
add config of mysql charset
Apr 30, 2017
8f5f42f
Update canal.go
WangXiangUSTC Apr 30, 2017
568899f
Update config.go
WangXiangUSTC Apr 30, 2017
1c2b0a6
Update config.go
WangXiangUSTC May 1, 2017
d41eb60
remove comment
WangXiangUSTC May 4, 2017
c342f9c
Update dump_test.go
WangXiangUSTC May 4, 2017
acf0521
Update config.go
WangXiangUSTC May 4, 2017
1652936
remove unused code
WangXiangUSTC May 4, 2017
c7792f5
update go-mysql version
WangXiangUSTC May 5, 2017
f444682
merge from remote master
Jul 22, 2017
09168b9
fix a bug
WangXiangUSTC Jul 22, 2017
d90ee86
if use gtid sync, use gtid when retry
Jul 22, 2017
6582b7c
modify code
Jul 22, 2017
1e4c520
fix bug
Jul 23, 2017
011e9d0
fix bug
Jul 23, 2017
7a894fe
Update binlogsyncer.go
WangXiangUSTC Jul 23, 2017
489a5e9
Update config.go
WangXiangUSTC Jul 23, 2017
b646916
Update config.go
WangXiangUSTC Jul 23, 2017
ab8fee0
Update dump.go
WangXiangUSTC Jul 23, 2017
e5b7164
Update glide.lock
WangXiangUSTC Jul 23, 2017
89d8714
Update mariadb_gtid.go
WangXiangUSTC Jul 24, 2017
9d183be
change if else to switch
Jul 24, 2017
55f10a0
change updategtidset to update
Jul 27, 2017
eb4c160
add gtid set information in xid event
Jul 30, 2017
bddcc7f
merge remote src
Jul 30, 2017
ddd4ee6
Update binlogsyncer.go
WangXiangUSTC Jul 31, 2017
b3f2817
Update event.go
WangXiangUSTC Jul 31, 2017
b72a2bc
Merge remote-tracking branch 'remote_src/master'
Aug 1, 2017
c5d3464
add gtid set information in query event
Aug 1, 2017
5c0846d
add gtid set information in query event
Aug 1, 2017
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix bug
  • Loading branch information
WangXiangUSTC committed Jul 23, 2017
commit 1e4c52064f8a658b68a8ea6bdf3635c4d916b5e7
2 changes: 2 additions & 0 deletions mysql/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ type GTIDSet interface {
Equal(o GTIDSet) bool

Contain(o GTIDSet) bool

UpdateGTIDSet(GTIDStr string) error
}

func ParseGTIDSet(flavor string, s string) (GTIDSet, error) {
Expand Down
11 changes: 11 additions & 0 deletions mysql/mariadb_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,14 @@ func (gtid MariadbGTID) Contain(o GTIDSet) bool {

return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber
}

func (gtid MariadbGTID) UpdateGTIDSet(GTIDStr string) error {
newGTID, err := ParseMariadbGTIDSet(GTIDStr)
if err != nil {
return err
} else {
gtid = newGTID
}

return nil
}
11 changes: 11 additions & 0 deletions mysql/mysql_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,17 @@ func (s *MysqlGTIDSet) AddSet(set *UUIDSet) {
}
}

func (s *MysqlGTIDSet) UpdateGTIDSet(GTIDStr string) error {
uuidSet, err := ParseUUIDSet(GTIDStr)
if err != nil {
return err
}

s.AddSet(uuidSet)

return nil
}

func (s *MysqlGTIDSet) Contain(o GTIDSet) bool {
sub, ok := o.(*MysqlGTIDSet)
if !ok {
Expand Down
37 changes: 21 additions & 16 deletions replication/binlogsyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type BinlogSyncer struct {
nextPos Position

useGTID bool
nextGTIDStr string

gset *GTIDSet

running bool

Expand Down Expand Up @@ -483,16 +484,16 @@ func (b *BinlogSyncer) retrySync() error {
b.m.Lock()
defer b.m.Unlock()

log.Infof("begin to re-sync from %s", b.nextPos)

b.parser.Reset()

if b.useGTID {
if err := b.prepareSyncGTID(b.nextGTIDStr); err != nil {
log.Infof("begin to re-sync from %s", b.gset.String())
if err := b.prepareSyncGTID(b.gset); err != nil {
return errors.Trace(err)
}

} else {
log.Infof("begin to re-sync from %s", b.nextPos)
if err := b.prepareSyncPos(b.nextPos); err != nil {
return errors.Trace(err)
}
Expand All @@ -518,16 +519,11 @@ func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
return nil
}

func (b *BinlogSyncer) prepareSyncGTID(GTIDStr string) error {
func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error {
if err := b.prepare(); err != nil {
return errors.Trace(err)
}

gset, err := ParseGTIDSet(b.cfg.Flavor, GTIDStr)
if err != nil {
return err
}

if b.cfg.Flavor != MariaDBFlavor {
// default use MySQL
err = b.writeBinlogDumpMysqlGTIDCommand(gset)
Expand All @@ -538,7 +534,7 @@ func (b *BinlogSyncer) prepareSyncGTID(GTIDStr string) error {
if err != nil {
return err
}
return nil
return nil
}

func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
Expand All @@ -556,7 +552,7 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {

// we meet connection error, should re-connect again with
// last nextPos or nextGTID we got.
if len(b.nextPos.Name) == 0 && len(b.nextGTIDStr) == 0 {
if len(b.nextPos.Name) == 0 && b.gset == nil {
// we can't get the correct position, close.
s.closeWithError(err)
return
Expand Down Expand Up @@ -631,14 +627,23 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
b.nextPos.Name = string(re.NextLogName)
b.nextPos.Pos = uint32(re.Position)
log.Infof("rotate to %s", b.nextPos)
} else if ge, ok := e.Event.(*GTIDEvent); b.useGTID && ok {
} else if ge, ok := e.Event.(*GTIDEvent); b.useGTID && ok {
u, _ := uuid.FromBytes(ge.SID)
SID := u.String()
GNO := ge.GNO
b.nextGTIDStr = fmt.Sprintf("%s:%d", SID, GNO)
} else if mge, ok := e.Event.(*MariadbGTIDEvent); b.useGTID && ok {
err := b.gset.UpdateGTIDSet(fmt.Sprintf("%s:%d", SID, GNO))
if err != nil {
return errors.Trace(err)
}
log.Infof("update gtidset %s", b.gset.String())
} else if mge, ok := e.Event.(*MariadbGTIDEvent); b.useGTID && ok {
GTID := mge.GTID
b.nextGTIDStr = fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)
err := b.gset.UpdateGTIDSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)
log.Infof("update gtidset %s", b.gset.String())
if err != nil {
return errors.Trace(err)
}
log.Infof("update gtidset %s", b.gset.String())
}
needStop := false
select {
Expand Down