Skip to content
This repository was archived by the owner on Sep 24, 2024. It is now read-only.

Commit 74b682c

Browse files
author
Doug Weber
committed
Add basic column count mismatch to trigger table schema reload.
1 parent 9f867b4 commit 74b682c

File tree

2 files changed

+24
-4
lines changed

2 files changed

+24
-4
lines changed

canal/canal.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,29 @@ func (c *Canal) WaitDumpDone() <-chan struct{} {
193193
return c.dumpDoneCh
194194
}
195195

196+
func (c *Canal) GetTableForEvent(te *replication.TableMapEvent) (*schema.Table, error) {
197+
key := fmt.Sprintf("%s.%s", te.Schema, te.Table)
198+
c.tableLock.Lock()
199+
t, ok := c.tables[key]
200+
c.tableLock.Unlock()
201+
202+
// return if we already have the table info and the columns count matches
203+
if ok && (te == nil || te.ColumnCount == uint64(len(t.Columns))) {
204+
return t, nil
205+
}
206+
207+
t, err := schema.NewTable(c, string(te.Schema), string(te.Table))
208+
if err != nil {
209+
return nil, errors.Trace(err)
210+
}
211+
212+
c.tableLock.Lock()
213+
c.tables[key] = t
214+
c.tableLock.Unlock()
215+
216+
return t, nil
217+
}
218+
196219
func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
197220
key := fmt.Sprintf("%s.%s", db, table)
198221
c.tableLock.Lock()

canal/sync.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,10 +79,7 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
7979
ev := e.Event.(*replication.RowsEvent)
8080

8181
// Caveat: table may be altered at runtime.
82-
schema := string(ev.Table.Schema)
83-
table := string(ev.Table.Table)
84-
85-
t, err := c.GetTable(schema, table)
82+
t, err := c.GetTableForEvent(ev.Table)
8683
if err != nil {
8784
return errors.Trace(err)
8885
}

0 commit comments

Comments
 (0)