Skip to content
Prev Previous commit
Next Next commit
fix: improved points in implementation, removed dangerous places that…
… could lead to errors feat: comments explaining options, ReplicationHandler in tests
  • Loading branch information
Fizic committed Jan 10, 2023
commit 46a38cfb3a08ea8dc2e6de663670922c8d8c378e
19 changes: 15 additions & 4 deletions replication/binlogstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

var (
ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again")
ErrSyncClosed = errors.New("Sync was closed")
ErrNeedSyncAgain = errors.New("Last sync error or closed, try sync and get event again")
ErrSyncClosed = errors.New("Sync was closed")
ErrStreamerIsFull = errors.New("streamer is full")
)

// BinlogStreamer gets the streaming event.
Expand Down Expand Up @@ -93,15 +94,25 @@ func NewBinlogStreamer() *BinlogStreamer {
return s
}

// AddEventToStreamer adds a binlog event to the streamer. You can use it when you want to add an event to the streamer manually.
// can be used in replication handlers
func (s *BinlogStreamer) AddEventToStreamer(ev *BinlogEvent) error {
select {
case s.ch <- ev:
return nil
case err := <-s.ech:
return err
default:
return ErrStreamerIsFull
}
}

func (s *BinlogStreamer) AddErrorToStreamer(err error) {
s.ech <- err
// AddErrorToStreamer adds an error to the streamer.
func (s *BinlogStreamer) AddErrorToStreamer(err error) bool {
select {
case s.ech <- err:
return true
default:
return false
}
}
42 changes: 21 additions & 21 deletions server/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ type Handler interface {
}

type ReplicationHandler interface {
//handle Replication command
// handle Replication command
HandleRegisterSlave(data []byte) error
HandleBinlogDump(pos *Position, s *replication.BinlogStreamer)
HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet, s *replication.BinlogStreamer)
HandleBinlogDump(pos *Position) (*replication.BinlogStreamer, error)
HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error)
}

func (c *Conn) HandleCommand() error {
Expand Down Expand Up @@ -141,36 +141,34 @@ func (c *Conn) dispatch(data []byte) interface{} {
return eofResponse{}
case COM_REGISTER_SLAVE:
if h, ok := c.h.(ReplicationHandler); ok {
if err := h.HandleRegisterSlave(data); err != nil {
return err
}
return nil
return h.HandleRegisterSlave(data)
} else {
return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead")
return c.h.HandleOtherCommand(cmd, data)
}
case COM_BINLOG_DUMP:
if h, ok := c.h.(ReplicationHandler); ok {
pos, _ := parseBinlogDump(data)
s := replication.NewBinlogStreamer()
go h.HandleBinlogDump(pos, s)

return s
if s, err := h.HandleBinlogDump(pos); err != nil {
return s
} else {
return err
}
} else {
return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead")
return c.h.HandleOtherCommand(cmd, data)
}
case COM_BINLOG_DUMP_GTID:
if h, ok := c.h.(ReplicationHandler); ok {
gtidSet, err := parseBinlogDumpGTID(data)
if err != nil {
return err
}

s := replication.NewBinlogStreamer()
go h.HandleBinlogDumpGTID(gtidSet, s)

return s
if s, err := h.HandleBinlogDumpGTID(gtidSet); err != nil {
return s
} else {
return err
}
} else {
return fmt.Errorf("the handler does not support replication protocol, use ReplicationHandler instead")
return c.h.HandleOtherCommand(cmd, data)
}
default:
return c.h.HandleOtherCommand(cmd, data)
Expand Down Expand Up @@ -209,10 +207,12 @@ func (h EmptyReplicationHandler) HandleRegisterSlave(data []byte) error {
return fmt.Errorf("not supported now")
}

func (h EmptyReplicationHandler) HandleBinlogDump(pos *Position, r *replication.BinlogStreamer) {
func (h EmptyReplicationHandler) HandleBinlogDump(pos *Position) (*replication.BinlogStreamer, error) {
return nil, fmt.Errorf("not supported now")
}

func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet, r *replication.BinlogStreamer) {
func (h EmptyReplicationHandler) HandleBinlogDumpGTID(gtidSet *MysqlGTIDSet) (*replication.BinlogStreamer, error) {
return nil, fmt.Errorf("not supported now")
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest adding r.close() here, just to gracefully release resources.


func (h EmptyHandler) HandleOtherCommand(cmd byte, data []byte) error {
Expand Down
1 change: 1 addition & 0 deletions server/command_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ package server

// Ensure EmptyHandler implements Handler interface or cause compile time error
var _ Handler = EmptyHandler{}
var _ ReplicationHandler = EmptyReplicationHandler{}
14 changes: 5 additions & 9 deletions server/resp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package server
import (
"context"
"fmt"
"time"

. "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
)
Expand Down Expand Up @@ -200,16 +198,14 @@ func (c *Conn) writeFieldValues(fv []FieldValue) error {
return c.WritePacket(data)
}

// see: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication.html
func (c *Conn) writeBinlogEvents(s *replication.BinlogStreamer) error {
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ev, err := s.GetEvent(ctx)
cancel()

if err == context.DeadlineExceeded {
continue
ev, err := s.GetEvent(context.Background())
if err != nil {
return err
}
data := make([]byte, 4, 32)
data := make([]byte, 4, 4+len(ev.RawData))
data = append(data, OK_HEADER)

data = append(data, ev.RawData...)
Expand Down