Skip to content

Commit 9ee4383

Browse files
authored
schema pg replication (#604)
* fixes references, format files * adds explicit fallback for unknown pgout types * switches to uv, bumps Python and dlt dependencies * fixes dependencies * skips linting on Python 3.13 for now * enables python 3.12, fixes linter errors * fixes mongo config and secrets * fixes bigquery test for mongo * allows to replicate from schema publication
1 parent ca23fa2 commit 9ee4383

File tree

6 files changed

+333
-156
lines changed

6 files changed

+333
-156
lines changed

sources/pg_replication/__init__.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import dlt
66

7+
from dlt.common import logger
78
from dlt.common.typing import TDataItem
89
from dlt.common.schema.typing import TTableSchemaColumns
910
from dlt.extract.items import DataItemWithMeta
@@ -13,7 +14,7 @@
1314

1415

1516
@dlt.resource(
16-
name=lambda args: args["slot_name"] + "_" + args["pub_name"],
17+
name=lambda args: args["slot_name"],
1718
standalone=True,
1819
)
1920
def replication_resource(
@@ -75,15 +76,17 @@ def replication_resource(
7576
"""
7677
# start where we left off in previous run
7778
start_lsn = dlt.current.resource_state().get("last_commit_lsn", 0)
78-
if flush_slot:
79+
if flush_slot and start_lsn:
7980
advance_slot(start_lsn, slot_name, credentials)
8081

8182
# continue until last message in replication slot
8283
options = {"publication_names": pub_name, "proto_version": "1"}
8384
upto_lsn = get_max_lsn(slot_name, options, credentials)
8485
if upto_lsn is None:
8586
return
86-
87+
logger.info(
88+
f"Replicating slot {slot_name} publication {pub_name} from {start_lsn} to {upto_lsn}"
89+
)
8790
# generate items in batches
8891
while True:
8992
gen = ItemGenerator(

sources/pg_replication/decoders.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from abc import ABC, abstractmethod
77
from dataclasses import dataclass
88
from datetime import datetime, timedelta, timezone
9-
from typing import List, Optional, Union
9+
from typing import List, NamedTuple, Optional, Union
1010

1111
# integer byte lengths
1212
INT8 = 1
@@ -28,8 +28,7 @@ def convert_bytes_to_utf8(_in_bytes: Union[bytes, bytearray]) -> str:
2828
return (_in_bytes).decode("utf-8")
2929

3030

31-
@dataclass(frozen=True)
32-
class ColumnData:
31+
class ColumnData(NamedTuple):
3332
# col_data_category is NOT the type. it means null value/toasted(not sent)/text formatted
3433
col_data_category: Optional[str]
3534
col_data_length: Optional[int] = None
@@ -39,8 +38,7 @@ def __repr__(self) -> str:
3938
return f"[col_data_category='{self.col_data_category}', col_data_length={self.col_data_length}, col_data='{self.col_data}']"
4039

4140

42-
@dataclass(frozen=True)
43-
class ColumnType:
41+
class ColumnType(NamedTuple):
4442
"""https://www.postgresql.org/docs/12/catalog-pg-attribute.html"""
4543

4644
part_of_pkey: int
@@ -49,15 +47,18 @@ class ColumnType:
4947
atttypmod: int
5048

5149

52-
@dataclass(frozen=True)
53-
class TupleData:
50+
class TupleData(NamedTuple):
5451
n_columns: int
5552
column_data: List[ColumnData]
5653

5754
def __repr__(self) -> str:
5855
return f"n_columns: {self.n_columns}, data: {self.column_data}"
5956

6057

58+
# TODO: you can make decoding way faster by
59+
# - moving all the decoding core to PgoutputMessage
60+
# - use struct unpack and increase offset manually to reduce calls
61+
# - use tuples to represent data, separate data from decoding!
6162
class PgoutputMessage(ABC):
6263
def __init__(self, buffer: bytes):
6364
self.buffer: io.BytesIO = io.BytesIO(buffer)

0 commit comments

Comments
 (0)