Skip to content

I can't retrieve the updated or written field names; all I get are things like UNKNOWN_COL0. #612

@xiaoyue9527

Description

@xiaoyue9527

import base64
from datetime import date, datetime
import json
import traceback
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
TableMapEvent
)

def default(obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, date):
return obj.isoformat()
elif isinstance(obj, bytes):
return base64.b64encode(obj).decode('ascii')
raise TypeError(f"Object of type {obj.class.name} is not JSON serializable")

class BinlogListener:
def init(self, mysql_settings):
self.mysql_settings = mysql_settings
self.stream = None
self.table_map = {}

def start_stream(self): events = [DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent] self.stream = BinLogStreamReader( connection_settings=self.mysql_settings, server_id=101, only_events=events, resume_stream=True, blocking=True, ) def process_events(self): if self.stream is None: self.start_stream() for binlogevent in self.stream: try: binlogevent.dump() for row in binlogevent.rows: event = {"schema": binlogevent.schema, "table": binlogevent.table} print(event) if isinstance(binlogevent, DeleteRowsEvent): event["action"] = "delete" event["data"] = row["values"] elif isinstance(binlogevent, UpdateRowsEvent): event["action"] = "update" event["data"] = row["after_values"] elif isinstance(binlogevent, WriteRowsEvent): event["action"] = "insert" event["data"] = row["values"] print(json.dumps(event, default=default)) except Exception as e: traceback.print_exc() def stop_stream(self): if self.stream is not None: self.stream.close() self.stream = None 

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions