11# -*- coding: utf-8 -*-
22
33import pymysql
4- import pymysql .cursors
54import struct
65
76from pymysql .constants .COMMAND import COM_BINLOG_DUMP
7+ from pymysql .cursors import DictCursor
88from pymysql .util import int2byte
99
1010from .packet import BinLogPacketWrapper
1111from .constants .BINLOG import TABLE_MAP_EVENT , ROTATE_EVENT
1212from .gtid import GtidSet
13- from .event import QueryEvent , RotateEvent , FormatDescriptionEvent , XidEvent , GtidEvent , NotImplementedEvent
14- from .row_event import UpdateRowsEvent , WriteRowsEvent , DeleteRowsEvent , TableMapEvent
13+ from .event import (
14+ QueryEvent , RotateEvent , FormatDescriptionEvent ,
15+ XidEvent , GtidEvent , NotImplementedEvent )
16+ from .row_event import (
17+ UpdateRowsEvent , WriteRowsEvent , DeleteRowsEvent , TableMapEvent )
1518
1619try :
1720 from pymysql .constants .COMMAND import COM_BINLOG_DUMP_GTID
2023 # See: https://github.com/PyMySQL/PyMySQL/pull/261
2124 COM_BINLOG_DUMP_GTID = 0x1e
2225
23- MYSQL_EXPECTED_ERROR_CODES = [2013 , 2006 ] #2013 Connection Lost
24- #2006 MySQL server has gone away
26+ # 2013 Connection Lost
27+ # 2006 MySQL server has gone away
28+ MYSQL_EXPECTED_ERROR_CODES = [2013 , 2006 ]
29+
2530
2631class BinLogStreamReader (object ):
2732 """Connect to replication stream and read event
@@ -31,21 +36,21 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
3136 blocking = False , only_events = None , log_file = None , log_pos = None ,
3237 filter_non_implemented_events = True ,
3338 ignored_events = None , auto_position = None ,
34- only_tables = None , only_schemas = None ,
35- freeze_schema = False ):
39+ only_tables = None , only_schemas = None ,
40+ freeze_schema = False ):
3641 """
3742 Attributes:
3843 resume_stream: Start for event from position or the latest event of
3944 binlog or from older available event
4045 blocking: Read on stream is blocking
4146 only_events: Array of allowed events
42- ignored_events: Array of ignoreded events
47+ ignored_events: Array of ignored events
4348 log_file: Set replication start log file
4449 log_pos: Set replication start log pos
4550 auto_position: Use master_auto_position gtid to set position
4651 only_tables: An array with the tables you want to watch
4752 only_schemas: An array with the schemas you want to watch
48- freeze_schema: If true do not support ALTER TABLE. It's faster. (default False)
53+ freeze_schema: If true do not support ALTER TABLE. It's faster.
4954 """
5055 self .__connection_settings = connection_settings
5156 self .__connection_settings ["charset" ] = "utf8"
@@ -58,16 +63,18 @@ def __init__(self, connection_settings, server_id, resume_stream=False,
5863 self .__only_tables = only_tables
5964 self .__only_schemas = only_schemas
6065 self .__freeze_schema = freeze_schema
61- self .__allowed_events = self ._allowed_event_list (only_events , ignored_events , filter_non_implemented_events )
66+ self .__allowed_events = self ._allowed_event_list (
67+ only_events , ignored_events , filter_non_implemented_events )
6268
63- # We can't filter on packet level TABLE_MAP and rotate event because we need
64- # them for handling other operations
65- self .__allowed_events_in_packet = frozenset ([TableMapEvent , RotateEvent ]).union (self .__allowed_events )
69+ # We can't filter on packet level TABLE_MAP and rotate event because
70+ # we need them for handling other operations
71+ self .__allowed_events_in_packet = frozenset (
72+ [TableMapEvent , RotateEvent ]).union (self .__allowed_events )
6673
6774 self .__server_id = server_id
6875 self .__use_checksum = False
6976
70- #Store table meta information
77+ # Store table meta information
7178 self .table_map = {}
7279 self .log_pos = log_pos
7380 self .log_file = log_file
@@ -84,14 +91,13 @@ def close(self):
8491 def __connect_to_ctl (self ):
8592 self ._ctl_connection_settings = dict (self .__connection_settings )
8693 self ._ctl_connection_settings ["db" ] = "information_schema"
87- self ._ctl_connection_settings ["cursorclass" ] = \
88- pymysql .cursors .DictCursor
94+ self ._ctl_connection_settings ["cursorclass" ] = DictCursor
8995 self ._ctl_connection = pymysql .connect (** self ._ctl_connection_settings )
9096 self ._ctl_connection ._get_table_information = self .__get_table_information
9197 self .__connected_ctl = True
9298
9399 def __checksum_enabled (self ):
94- ''' Return True if binlog-checksum = CRC32. Only for MySQL > 5.6 '''
100+ """ Return True if binlog-checksum = CRC32. Only for MySQL > 5.6"""
95101 cur = self ._stream_connection .cursor ()
96102 cur .execute ("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'" )
97103 result = cur .fetchone ()
@@ -113,7 +119,8 @@ def __connect_to_stream(self):
113119
114120 self .__use_checksum = self .__checksum_enabled ()
115121
116- #If cheksum is enabled we need to inform the server about the that we support it
122+ # If checksum is enabled we need to inform the server about the that
123+ # we support it
117124 if self .__use_checksum :
118125 cur = self ._stream_connection .cursor ()
119126 cur .execute ("set @master_binlog_checksum= @@global.binlog_checksum" )
@@ -158,8 +165,9 @@ def __connect_to_stream(self):
158165 # Zeroified
159166 # binlog position uint 4bytes == 4
160167 # payload_size uint 4bytes
161- ## What come next, is the payload, where the slave gtid_executed
162- ## is sent to the master
168+
169+ # What come next, is the payload, where the slave gtid_executed
170+ # is sent to the master
163171 # n_sid ulong 8bytes == which size is the gtid_set
164172 # | sid uuid 16bytes UUID as a binary
165173 # | n_intervals ulong 8bytes == how many intervals are sent for this gtid
@@ -179,17 +187,16 @@ def __connect_to_stream(self):
179187 gtid_set = GtidSet (self .auto_position )
180188 encoded_data_size = gtid_set .encoded_length
181189
182- header_size = (2 + # binlog_flags
183- 4 + # server_id
184- 4 + # binlog_name_info_size
185- 4 + # empty binlog name
186- 8 + # binlog_pos_info_size
187- 4 ) # encoded_data_size
190+ header_size = (2 + # binlog_flags
191+ 4 + # server_id
192+ 4 + # binlog_name_info_size
193+ 4 + # empty binlog name
194+ 8 + # binlog_pos_info_size
195+ 4 ) # encoded_data_size
188196
189197 prelude = b'' + struct .pack ('<i' , header_size + encoded_data_size ) \
190198 + int2byte (COM_BINLOG_DUMP_GTID )
191199
192-
193200 # binlog_flags = 0 (2 bytes)
194201 prelude += struct .pack ('<H' , 0 )
195202 # server_id (4 bytes)
@@ -206,7 +213,6 @@ def __connect_to_stream(self):
206213 # encoded_data
207214 prelude += gtid_set .encoded ()
208215
209-
210216 if pymysql .__version__ < "0.6" :
211217 self ._stream_connection .wfile .write (prelude )
212218 self ._stream_connection .wfile .flush ()
@@ -247,7 +253,8 @@ def fetchone(self):
247253 self .__only_schemas ,
248254 self .__freeze_schema )
249255
250- if binlog_event .event_type == TABLE_MAP_EVENT and binlog_event .event is not None :
256+ if binlog_event .event_type == TABLE_MAP_EVENT and \
257+ binlog_event .event is not None :
251258 self .table_map [binlog_event .event .table_id ] = \
252259 binlog_event .event .get_table ()
253260
@@ -274,7 +281,8 @@ def fetchone(self):
274281
275282 return binlog_event .event
276283
277- def _allowed_event_list (self , only_events , ignored_events , filter_non_implemented_events ):
284+ def _allowed_event_list (self , only_events , ignored_events ,
285+ filter_non_implemented_events ):
278286 if only_events is not None :
279287 events = set (only_events )
280288 else :
0 commit comments