|
14 | 14 | QueryEvent, RotateEvent, FormatDescriptionEvent, |
15 | 15 | XidEvent, GtidEvent, StopEvent, |
16 | 16 | BeginLoadQueryEvent, ExecuteLoadQueryEvent, |
17 | | - HeartbeatLogEvent, NotImplementedEvent) |
| 17 | + HeartbeatLogEvent, NotImplementedEvent, MariadbGtidEvent) |
18 | 18 | from .exceptions import BinLogNotEnabled |
19 | 19 | from .row_event import ( |
20 | 20 | UpdateRowsEvent, WriteRowsEvent, DeleteRowsEvent, TableMapEvent) |
@@ -139,7 +139,8 @@ def __init__(self, connection_settings, server_id, |
139 | 139 | report_slave=None, slave_uuid=None, |
140 | 140 | pymysql_wrapper=None, |
141 | 141 | fail_on_table_metadata_unavailable=False, |
142 | | - slave_heartbeat=None): |
| 142 | + slave_heartbeat=None, |
| 143 | + is_mariadb=False): |
143 | 144 | """ |
144 | 145 | Attributes: |
145 | 146 | ctl_connection_settings: Connection settings for cluster holding |
@@ -174,6 +175,8 @@ def __init__(self, connection_settings, server_id, |
174 | 175 | many event to skip in binlog). See |
175 | 176 | MASTER_HEARTBEAT_PERIOD in mysql documentation |
176 | 177 | for semantics |
| 178 | + is_mariadb: Flag to indicate it's a MariaDB server, used with auto_position |
| 179 | + to point to Mariadb specific GTID. |
177 | 180 | """ |
178 | 181 |
|
179 | 182 | self.__connection_settings = connection_settings |
@@ -211,6 +214,7 @@ def __init__(self, connection_settings, server_id, |
211 | 214 | self.log_file = log_file |
212 | 215 | self.auto_position = auto_position |
213 | 216 | self.skip_to_timestamp = skip_to_timestamp |
| 217 | + self.is_mariadb = is_mariadb |
214 | 218 |
|
215 | 219 | if end_log_pos: |
216 | 220 | self.is_past_end_log_pos = False |
@@ -341,77 +345,114 @@ def __connect_to_stream(self): |
341 | 345 | prelude += struct.pack('<I', self.__server_id) |
342 | 346 | prelude += self.log_file.encode() |
343 | 347 | else: |
344 | | - # Format for mysql packet master_auto_position |
345 | | - # |
346 | | - # All fields are little endian |
347 | | - # All fields are unsigned |
348 | | - |
349 | | - # Packet length uint 4bytes |
350 | | - # Packet type byte 1byte == 0x1e |
351 | | - # Binlog flags ushort 2bytes == 0 (for retrocompatibilty) |
352 | | - # Server id uint 4bytes |
353 | | - # binlognamesize uint 4bytes |
354 | | - # binlogname str Nbytes N = binlognamesize |
355 | | - # Zeroified |
356 | | - # binlog position uint 4bytes == 4 |
357 | | - # payload_size uint 4bytes |
358 | | - |
359 | | - # What come next, is the payload, where the slave gtid_executed |
360 | | - # is sent to the master |
361 | | - # n_sid ulong 8bytes == which size is the gtid_set |
362 | | - # | sid uuid 16bytes UUID as a binary |
363 | | - # | n_intervals ulong 8bytes == how many intervals are sent |
364 | | - # | for this gtid |
365 | | - # | | start ulong 8bytes Start position of this interval |
366 | | - # | | stop ulong 8bytes Stop position of this interval |
367 | | - |
368 | | - # A gtid set looks like: |
369 | | - # 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10, |
370 | | - # 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140 |
371 | | - # |
372 | | - # In this particular gtid set, |
373 | | - # 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10 |
374 | | - # is the first member of the set, it is called a gtid. |
375 | | - # In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid |
376 | | - # and have two intervals, 1-3 and 8-10, 1 is the start position of |
377 | | - # the first interval 3 is the stop position of the first interval. |
378 | | - |
379 | | - gtid_set = GtidSet(self.auto_position) |
380 | | - encoded_data_size = gtid_set.encoded_length |
381 | | - |
382 | | - header_size = (2 + # binlog_flags |
383 | | - 4 + # server_id |
384 | | - 4 + # binlog_name_info_size |
385 | | - 4 + # empty binlog name |
386 | | - 8 + # binlog_pos_info_size |
387 | | - 4) # encoded_data_size |
388 | | - |
389 | | - prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\ |
390 | | - + bytes(bytearray([COM_BINLOG_DUMP_GTID])) |
| 348 | + if self.is_mariadb: |
| 349 | + # https://mariadb.com/kb/en/5-slave-registration/ |
| 350 | + cur = self._stream_connection.cursor() |
391 | 351 |
|
392 | | - flags = 0 |
393 | | - if not self.__blocking: |
394 | | - flags |= 0x01 # BINLOG_DUMP_NON_BLOCK |
395 | | - flags |= 0x04 # BINLOG_THROUGH_GTID |
| 352 | + cur.execute("SET @mariadb_slave_capability=4") |
| 353 | + cur.execute("SET @slave_connect_state='%s'" % self.auto_position) |
| 354 | + cur.execute("SET @slave_gtid_strict_mode=1") |
| 355 | + cur.execute("SET @slave_gtid_ignore_duplicates=0") |
| 356 | + cur.close() |
396 | 357 |
|
397 | | - # binlog_flags (2 bytes) |
398 | | - # see: |
399 | | - # https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html |
400 | | - prelude += struct.pack('<H', flags) |
| 358 | + # https://mariadb.com/kb/en/com_binlog_dump/ |
| 359 | + header_size = ( |
| 360 | + 4 + # binlog pos |
| 361 | + 2 + # binlog flags |
| 362 | + 4 + # slave server_id, |
| 363 | + 4 # requested binlog file name , set it to empty |
| 364 | + ) |
401 | 365 |
|
402 | | - # server_id (4 bytes) |
403 | | - prelude += struct.pack('<I', self.__server_id) |
404 | | - # binlog_name_info_size (4 bytes) |
405 | | - prelude += struct.pack('<I', 3) |
406 | | - # empty_binlog_name (4 bytes) |
407 | | - prelude += b'\0\0\0' |
408 | | - # binlog_pos_info (8 bytes) |
409 | | - prelude += struct.pack('<Q', 4) |
410 | | - |
411 | | - # encoded_data_size (4 bytes) |
412 | | - prelude += struct.pack('<I', gtid_set.encoded_length) |
413 | | - # encoded_data |
414 | | - prelude += gtid_set.encoded() |
| 366 | + prelude = struct.pack('<i', header_size) + bytes(bytearray([COM_BINLOG_DUMP])) |
| 367 | + |
| 368 | + # binlog pos |
| 369 | + prelude += struct.pack('<i', 4) |
| 370 | + |
| 371 | + flags = 0 |
| 372 | + if not self.__blocking: |
| 373 | + flags |= 0x01 # BINLOG_DUMP_NON_BLOCK |
| 374 | + |
| 375 | + # binlog flags |
| 376 | + prelude += struct.pack('<H', flags) |
| 377 | + |
| 378 | + # server id (4 bytes) |
| 379 | + prelude += struct.pack('<I', self.__server_id) |
| 380 | + |
| 381 | + # empty_binlog_name (4 bytes) |
| 382 | + prelude += b'\0\0\0\0' |
| 383 | + |
| 384 | + else: |
| 385 | + # Format for mysql packet master_auto_position |
| 386 | + # |
| 387 | + # All fields are little endian |
| 388 | + # All fields are unsigned |
| 389 | + |
| 390 | + # Packet length uint 4bytes |
| 391 | + # Packet type byte 1byte == 0x1e |
| 392 | + # Binlog flags ushort 2bytes == 0 (for retrocompatibilty) |
| 393 | + # Server id uint 4bytes |
| 394 | + # binlognamesize uint 4bytes |
| 395 | + # binlogname str Nbytes N = binlognamesize |
| 396 | + # Zeroified |
| 397 | + # binlog position uint 4bytes == 4 |
| 398 | + # payload_size uint 4bytes |
| 399 | + |
| 400 | + # What come next, is the payload, where the slave gtid_executed |
| 401 | + # is sent to the master |
| 402 | + # n_sid ulong 8bytes == which size is the gtid_set |
| 403 | + # | sid uuid 16bytes UUID as a binary |
| 404 | + # | n_intervals ulong 8bytes == how many intervals are sent |
| 405 | + # | for this gtid |
| 406 | + # | | start ulong 8bytes Start position of this interval |
| 407 | + # | | stop ulong 8bytes Stop position of this interval |
| 408 | + |
| 409 | + # A gtid set looks like: |
| 410 | + # 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10, |
| 411 | + # 1c2aad49-ae92-409a-b4df-d05a03e4702e:42-47:80-100:130-140 |
| 412 | + # |
| 413 | + # In this particular gtid set, |
| 414 | + # 19d69c1e-ae97-4b8c-a1ef-9e12ba966457:1-3:8-10 |
| 415 | + # is the first member of the set, it is called a gtid. |
| 416 | + # In this gtid, 19d69c1e-ae97-4b8c-a1ef-9e12ba966457 is the sid |
| 417 | + # and have two intervals, 1-3 and 8-10, 1 is the start position of |
| 418 | + # the first interval 3 is the stop position of the first interval. |
| 419 | + |
| 420 | + gtid_set = GtidSet(self.auto_position) |
| 421 | + encoded_data_size = gtid_set.encoded_length |
| 422 | + |
| 423 | + header_size = (2 + # binlog_flags |
| 424 | + 4 + # server_id |
| 425 | + 4 + # binlog_name_info_size |
| 426 | + 4 + # empty binlog name |
| 427 | + 8 + # binlog_pos_info_size |
| 428 | + 4) # encoded_data_size |
| 429 | + |
| 430 | + prelude = b'' + struct.pack('<i', header_size + encoded_data_size)\ |
| 431 | + + bytes(bytearray([COM_BINLOG_DUMP_GTID])) |
| 432 | + |
| 433 | + flags = 0 |
| 434 | + if not self.__blocking: |
| 435 | + flags |= 0x01 # BINLOG_DUMP_NON_BLOCK |
| 436 | + flags |= 0x04 # BINLOG_THROUGH_GTID |
| 437 | + |
| 438 | + # binlog_flags (2 bytes) |
| 439 | + # see: |
| 440 | + # https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html |
| 441 | + prelude += struct.pack('<H', flags) |
| 442 | + |
| 443 | + # server_id (4 bytes) |
| 444 | + prelude += struct.pack('<I', self.__server_id) |
| 445 | + # binlog_name_info_size (4 bytes) |
| 446 | + prelude += struct.pack('<I', 3) |
| 447 | + # empty_binlog_namapprovale (4 bytes) |
| 448 | + prelude += b'\0\0\0' |
| 449 | + # binlog_pos_info (8 bytes) |
| 450 | + prelude += struct.pack('<Q', 4) |
| 451 | + |
| 452 | + # encoded_data_size (4 bytes) |
| 453 | + prelude += struct.pack('<I', gtid_set.encoded_length) |
| 454 | + # encoded_data |
| 455 | + prelude += gtid_set.encoded() |
415 | 456 |
|
416 | 457 | if pymysql.__version__ < LooseVersion("0.6"): |
417 | 458 | self._stream_connection.wfile.write(prelude) |
@@ -542,6 +583,7 @@ def _allowed_event_list(self, only_events, ignored_events, |
542 | 583 | TableMapEvent, |
543 | 584 | HeartbeatLogEvent, |
544 | 585 | NotImplementedEvent, |
| 586 | + MariadbGtidEvent |
545 | 587 | )) |
546 | 588 | if ignored_events is not None: |
547 | 589 | for e in ignored_events: |
|
0 commit comments