Skip to content

Commit 8804177

Browse files
authored
Merge pull request #1 from instructure/changes-from-masteryconnect-go-mysql-fork
Still-relevant changes from MasteryConnect/go-mysql fork
2 parents 29bc749 + c5fc32f commit 8804177

File tree

3 files changed

+30
-4
lines changed

3 files changed

+30
-4
lines changed

canal/canal.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,29 @@ func (c *Canal) checkTableMatch(key string) bool {
307307
return matchFlag
308308
}
309309

310+
func (c *Canal) GetTableForEvent(te *replication.TableMapEvent) (*schema.Table, error) {
311+
key := fmt.Sprintf("%s.%s", te.Schema, te.Table)
312+
c.tableLock.Lock()
313+
t, ok := c.tables[key]
314+
c.tableLock.Unlock()
315+
316+
// return if we already have the table info and the columns count matches
317+
if ok && (te == nil || te.ColumnCount == uint64(len(t.Columns))) {
318+
return t, nil
319+
}
320+
321+
t, err := schema.NewTable(c, string(te.Schema), string(te.Table))
322+
if err != nil {
323+
return nil, errors.Trace(err)
324+
}
325+
326+
c.tableLock.Lock()
327+
c.tables[key] = t
328+
c.tableLock.Unlock()
329+
330+
return t, nil
331+
}
332+
310333
func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
311334
key := fmt.Sprintf("%s.%s", db, table)
312335
// if table is excluded, return error and skip parsing event or dump

canal/sync.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,10 +244,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
244244
ev := e.Event.(*replication.RowsEvent)
245245

246246
// Caveat: table may be altered at runtime.
247-
schema := string(ev.Table.Schema)
248-
table := string(ev.Table.Table)
249-
250-
t, err := c.GetTable(schema, table)
247+
t, err := c.GetTableForEvent(ev.Table)
251248
if err != nil {
252249
return err
253250
}

schema/schema.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ const (
3636
TYPE_MEDIUM_INT
3737
TYPE_BINARY // binary, varbinary
3838
TYPE_POINT // coordinates
39+
TYPE_TEXT // text (base64 encoded)
40+
TYPE_BLOB // blob (base64 encoded)
3941
)
4042

4143
type TableColumn struct {
@@ -123,6 +125,10 @@ func (ta *Table) AddColumn(name string, columnType string, collation string, ext
123125
ta.Columns[index].Type = TYPE_BIT
124126
} else if strings.HasPrefix(columnType, "json") {
125127
ta.Columns[index].Type = TYPE_JSON
128+
} else if strings.HasPrefix(columnType, "text") {
129+
ta.Columns[index].Type = TYPE_TEXT
130+
} else if strings.HasPrefix(columnType, "blob") {
131+
ta.Columns[index].Type = TYPE_BLOB
126132
} else if strings.Contains(columnType, "point") {
127133
ta.Columns[index].Type = TYPE_POINT
128134
} else if strings.Contains(columnType, "mediumint") {

0 commit comments

Comments
 (0)