Skip to content

Commit 3f9fa3b

Browse files
committed
Merge pull request go-mysql-org#11 from jasonzzz/master
add support for BeginLoadQueryEvent and ExecuteLoadQueryEvent
2 parents 04766b6 + d7558ae commit 3f9fa3b

File tree

2 files changed

+84
-1
lines changed

2 files changed

+84
-1
lines changed

replication/event.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (e *QueryEvent) Decode(data []byte) error {
268268
}
269269

270270
func (e *QueryEvent) Dump(w io.Writer) {
271-
fmt.Fprintf(w, "Salve proxy ID: %d\n", e.SlaveProxyID)
271+
fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
272272
fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
273273
fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
274274
//fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars))
@@ -299,6 +299,85 @@ func (e *GTIDEvent) Dump(w io.Writer) {
299299
fmt.Fprintln(w)
300300
}
301301

302+
type BeginLoadQueryEvent struct {
303+
FileID uint32
304+
BlockData []byte
305+
}
306+
307+
func (e *BeginLoadQueryEvent) Decode(data []byte) error {
308+
pos := 0
309+
310+
e.FileID = binary.LittleEndian.Uint32(data[pos:])
311+
pos += 4
312+
313+
e.BlockData = data[pos:]
314+
315+
return nil
316+
}
317+
318+
func (e *BeginLoadQueryEvent) Dump(w io.Writer) {
319+
fmt.Fprintf(w, "File ID: %d\n", e.FileID)
320+
fmt.Fprintf(w, "Block data: %s\n", e.BlockData)
321+
fmt.Fprintln(w)
322+
}
323+
324+
type ExecuteLoadQueryEvent struct {
325+
SlaveProxyID uint32
326+
ExecutionTime uint32
327+
SchemaLength uint8
328+
ErrorCode uint16
329+
StatusVars uint16
330+
FileID uint32
331+
StartPos uint32
332+
EndPos uint32
333+
DupHandlingFlags uint8
334+
}
335+
336+
func (e *ExecuteLoadQueryEvent) Decode(data []byte) error {
337+
pos := 0
338+
339+
e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:])
340+
pos += 4
341+
342+
e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:])
343+
pos += 4
344+
345+
e.SchemaLength = uint8(data[pos])
346+
pos++
347+
348+
e.ErrorCode = binary.LittleEndian.Uint16(data[pos:])
349+
pos += 2
350+
351+
e.StatusVars = binary.LittleEndian.Uint16(data[pos:])
352+
pos += 2
353+
354+
e.FileID = binary.LittleEndian.Uint32(data[pos:])
355+
pos += 4
356+
357+
e.StartPos = binary.LittleEndian.Uint32(data[pos:])
358+
pos += 4
359+
360+
e.EndPos = binary.LittleEndian.Uint32(data[pos:])
361+
pos += 4
362+
363+
e.DupHandlingFlags = uint8(data[pos])
364+
365+
return nil
366+
}
367+
368+
func (e *ExecuteLoadQueryEvent) Dump(w io.Writer) {
369+
fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID)
370+
fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime)
371+
fmt.Fprintf(w, "Schame length: %d\n", e.SchemaLength)
372+
fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode)
373+
fmt.Fprintf(w, "Status vars length: %d\n", e.StatusVars)
374+
fmt.Fprintf(w, "File ID: %d\n", e.FileID)
375+
fmt.Fprintf(w, "Start pos: %d\n", e.StartPos)
376+
fmt.Fprintf(w, "End pos: %d\n", e.EndPos)
377+
fmt.Fprintf(w, "Dup handling flags: %d\n", e.DupHandlingFlags)
378+
fmt.Fprintln(w)
379+
}
380+
302381
// case MARIADB_ANNOTATE_ROWS_EVENT:
303382
// return "MariadbAnnotateRowsEvent"
304383

replication/parser.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,10 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte) (Event, error) {
163163
e = &RowsQueryEvent{}
164164
case GTID_EVENT:
165165
e = &GTIDEvent{}
166+
case BEGIN_LOAD_QUERY_EVENT:
167+
e = &BeginLoadQueryEvent{}
168+
case EXECUTE_LOAD_QUERY_EVENT:
169+
e = &ExecuteLoadQueryEvent{}
166170
case MARIADB_ANNOTATE_ROWS_EVENT:
167171
e = &MariadbAnnotaeRowsEvent{}
168172
case MARIADB_BINLOG_CHECKPOINT_EVENT:

0 commit comments

Comments
 (0)