- Notifications
You must be signed in to change notification settings - Fork 689
Open
Description
import base64
from datetime import date, datetime
import json
import traceback
import pymysql
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
TableMapEvent
)
def default(obj):
if isinstance(obj, datetime):
return obj.isoformat()
elif isinstance(obj, date):
return obj.isoformat()
elif isinstance(obj, bytes):
return base64.b64encode(obj).decode('ascii')
raise TypeError(f"Object of type {obj.class.name} is not JSON serializable")
class BinlogListener:
def init(self, mysql_settings):
self.mysql_settings = mysql_settings
self.stream = None
self.table_map = {}
def start_stream(self): events = [DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent] self.stream = BinLogStreamReader( connection_settings=self.mysql_settings, server_id=101, only_events=events, resume_stream=True, blocking=True, ) def process_events(self): if self.stream is None: self.start_stream() for binlogevent in self.stream: try: binlogevent.dump() for row in binlogevent.rows: event = {"schema": binlogevent.schema, "table": binlogevent.table} print(event) if isinstance(binlogevent, DeleteRowsEvent): event["action"] = "delete" event["data"] = row["values"] elif isinstance(binlogevent, UpdateRowsEvent): event["action"] = "update" event["data"] = row["after_values"] elif isinstance(binlogevent, WriteRowsEvent): event["action"] = "insert" event["data"] = row["values"] print(json.dumps(event, default=default)) except Exception as e: traceback.print_exc() def stop_stream(self): if self.stream is not None: self.stream.close() self.stream = None
laminko
Metadata
Metadata
Assignees
Labels
No labels