Skip to content

Commit c24032b

Browse files
Merge pull request julien-duponchelle#355 from dongwook-chan/extra-data
Fix parsing of row events for MySQL8 partitioned table
2 parents 8b67828 + 6e9be94 commit c24032b

File tree

3 files changed

+36
-1
lines changed

3 files changed

+36
-1
lines changed

pymysqlreplication/row_event.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,22 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
5757
self.event_type == BINLOG.DELETE_ROWS_EVENT_V2 or \
5858
self.event_type == BINLOG.UPDATE_ROWS_EVENT_V2:
5959
self.flags, self.extra_data_length = struct.unpack('<HH', self.packet.read(4))
60-
self.extra_data = self.packet.read(self.extra_data_length / 8)
60+
if self.extra_data_length > 2:
61+
self.extra_data_type = struct.unpack('<B', self.packet.read(1))[0]
62+
63+
# ndb information
64+
if self.extra_data_type == 0:
65+
self.nbd_info_length, self.nbd_info_format = struct.unpack('<BB', self.packet.read(1))
66+
self.nbd_info = self.packet.read(self.nbd_info_length - 2)
67+
# partition information
68+
elif self.extra_data_type == 1:
69+
if self.event_type == BINLOG.UPDATE_ROWS_EVENT_V2:
70+
self.partition_id, self.source_partition_id = struct.unpack('<HH', self.packet.read(4))
71+
else:
72+
self.partition_id = struct.unpack('<H', self.packet.read(2))[0]
73+
# etc
74+
else:
75+
self.extra_data = self.packet.read(self.extra_info_length - 3)
6176
else:
6277
self.flags = struct.unpack('<H', self.packet.read(2))[0]
6378

pymysqlreplication/tests/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ def isMySQL57(self):
6060
version = float(self.getMySQLVersion().rsplit('.', 1)[0])
6161
return version == 5.7
6262

63+
def isMySQL80AndMore(self):
64+
version = float(self.getMySQLVersion().rsplit('.', 1)[0])
65+
return version >= 8.0
66+
6367
@property
6468
def supportsGTID(self):
6569
if not self.isMySQL56AndMore():

pymysqlreplication/tests/test_data_type.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,5 +625,21 @@ def test_zerofill(self):
625625
self.assertEqual(event.rows[0]["values"]["test4"], '0000000001')
626626
self.assertEqual(event.rows[0]["values"]["test5"], '00000000000000000001')
627627

628+
def test_partition_id(self):
629+
if not self.isMySQL80AndMore():
630+
self.skipTest("Not supported in this version of MySQL")
631+
create_query = "CREATE TABLE test (id INTEGER) \
632+
PARTITION BY RANGE (id) ( \
633+
PARTITION p0 VALUES LESS THAN (1), \
634+
PARTITION p1 VALUES LESS THAN (2), \
635+
PARTITION p2 VALUES LESS THAN (3), \
636+
PARTITION p3 VALUES LESS THAN (4), \
637+
PARTITION p4 VALUES LESS THAN (5) \
638+
)"
639+
insert_query = "INSERT INTO test (id) VALUES(3)"
640+
event = self.create_and_insert_value(create_query, insert_query)
641+
self.assertEqual(event.extra_data_type, 1)
642+
self.assertEqual(event.partition_id, 3)
643+
628644
if __name__ == "__main__":
629645
unittest.main()

0 commit comments

Comments
 (0)