Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit f92e75e

Browse files
committed
resolve merge conflicts
2 parents 1e0795a + fb1c323 commit f92e75e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2860
-891
lines changed

README.md

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ Let's break this down. Assume there are two tables stored in two databases, and
132132
| PostgreSQL >=10 | `postgresql://<user>:<password>@<host>:5432/<database>` | 💚 |
133133
| MySQL | `mysql://<user>:<password>@<hostname>:3306/<database>` | 💚 |
134134
| Snowflake | `"snowflake://<user>[:<password>]@<account>/<database>/<SCHEMA>?warehouse=<WAREHOUSE>&role=<role>[&authenticator=externalbrowser]"` | 💚 |
135+
| BigQuery | `bigquery://<project>/<dataset>` | 💚 |
136+
| Redshift | `redshift://<username>:<password>@<hostname>:5439/<database>` | 💚 |
135137
| Oracle | `oracle://<username>:<password>@<hostname>/<database>` | 💛 |
136-
| BigQuery | `bigquery://<project>/<dataset>` | 💛 |
137-
| Redshift | `redshift://<username>:<password>@<hostname>:5439/<database>` | 💛 |
138138
| Presto | `presto://<username>:<password>@<hostname>:8080/<database>` | 💛 |
139139
| Databricks | `databricks://<http_path>:<access_token>@<server_hostname>/<catalog>/<schema>` | 💛 |
140140
| Trino | `trino://<username>:<password>@<hostname>:8080/<database>` | 💛 |
@@ -145,6 +145,8 @@ Let's break this down. Assume there are two tables stored in two databases, and
145145
| Pinot | | 📝 |
146146
| Druid | | 📝 |
147147
| Kafka | | 📝 |
148+
| DuckDB | | 📝 |
149+
| SQLite | | 📝 |
148150

149151
* 💚: Implemented and thoroughly tested.
150152
* 💛: Implemented, but not thoroughly tested yet.
@@ -163,7 +165,7 @@ may be case-sensitive. This is the case for the Snowflake schema and table names
163165
## Options:
164166

165167
- `--help` - Show help message and exit.
166-
- `-k` or `--key-column` - Name of the primary key column
168+
- `-k` or `--key-columns` - Name(s) of the primary key column(s). If none provided, default is 'id'.
167169
- `-t` or `--update-column` - Name of updated_at/last_updated column
168170
- `-c` or `--columns` - Names of extra columns to compare. Can be used more than once in the same command.
169171
Accepts a name or a pattern like in SQL.
@@ -178,12 +180,24 @@ may be case-sensitive. This is the case for the Snowflake schema and table names
178180
Example: `--min-age=5min` ignores rows from the last 5 minutes.
179181
Valid units: `d, days, h, hours, min, minutes, mon, months, s, seconds, w, weeks, y, years`
180182
- `--max-age` - Considers only rows younger than specified. See `--min-age`.
181-
- `--bisection-factor` - Segments per iteration. When set to 2, it performs binary search.
182-
- `--bisection-threshold` - Minimal bisection threshold. i.e. maximum size of pages to diff locally.
183183
- `-j` or `--threads` - Number of worker threads to use per database. Default=1.
184184
- `-w`, `--where` - An additional 'where' expression to restrict the search space.
185185
- `--conf`, `--run` - Specify the run and configuration from a TOML file. (see below)
186186
- `--no-tracking` - data-diff sends home anonymous usage data. Use this to disable it.
187+
- `-a`, `--algorithm` `[auto|joindiff|hashdiff]` - Force algorithm choice
188+
189+
Same-DB diff only:
190+
- `-m`, `--materialize` - Materialize the diff results into a new table in the database.
191+
If a table exists by that name, it will be replaced.
192+
Use `%t` in the name to place a timestamp.
193+
Example: `-m test_mat_%t`
194+
- `--assume-unique-key` - Skip validating the uniqueness of the key column during joindiff, which is costly in non-cloud dbs.
195+
- `--sample-exclusive-rows` - Sample several rows that only appear in one of the tables, but not the other. Use with `-s`.
196+
197+
Cross-DB diff only:
198+
- `--bisection-threshold` - Minimal size of segment to be split. Smaller segments will be downloaded and compared locally.
199+
- `--bisection-factor` - Segments per iteration. When set to 2, it performs binary search.
200+
187201

188202

189203
### How to use with a configuration file

data_diff/__init__.py

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,49 @@
1-
from typing import Tuple, Iterator, Optional, Union
1+
from typing import Sequence, Tuple, Iterator, Optional, Union
22

33
from .tracking import disable_tracking
44
from .databases.connect import connect
55
from .databases.database_types import DbKey, DbTime, DbPath
6-
from .diff_tables import TableSegment, TableDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
6+
from .diff_tables import Algorithm
7+
from .hashdiff_tables import HashDiffer, DEFAULT_BISECTION_THRESHOLD, DEFAULT_BISECTION_FACTOR
8+
from .joindiff_tables import JoinDiffer
9+
from .table_segment import TableSegment
710

811

912
def connect_to_table(
1013
db_info: Union[str, dict],
1114
table_name: Union[DbPath, str],
12-
key_column: str = "id",
15+
key_columns: str = ("id",),
1316
thread_count: Optional[int] = 1,
1417
**kwargs,
15-
):
18+
) -> TableSegment:
1619
"""Connects to the given database, and creates a TableSegment instance
1720
1821
Parameters:
1922
db_info: Either a URI string, or a dict of connection options.
2023
table_name: Name of the table as a string, or a tuple that signifies the path.
21-
key_column: Name of the key column
22-
thread_count: Number of threads for this connection (only if using a threadpooled implementation)
24+
key_columns: Names of the key columns
25+
thread_count: Number of threads for this connection (only if using a threadpooled db implementation)
26+
27+
See Also:
28+
:meth:`connect`
2329
"""
30+
if isinstance(key_columns, str):
31+
key_columns = (key_columns,)
2432

2533
db = connect(db_info, thread_count=thread_count)
2634

2735
if isinstance(table_name, str):
2836
table_name = db.parse_table_name(table_name)
2937

30-
return TableSegment(db, table_name, key_column, **kwargs)
38+
return TableSegment(db, table_name, key_columns, **kwargs)
3139

3240

3341
def diff_tables(
3442
table1: TableSegment,
3543
table2: TableSegment,
3644
*,
3745
# Names of the key columns, which together uniquely identify each row (usually id)
38-
key_column: str = None,
46+
key_columns: Sequence[str] = None,
3947
# Name of updated column, which signals that rows changed (usually updated_at or last_update)
4048
update_column: str = None,
4149
# Extra columns to compare
@@ -46,31 +54,63 @@ def diff_tables(
4654
# Start/end update_column values, used to restrict the segment
4755
min_update: DbTime = None,
4856
max_update: DbTime = None,
49-
# Into how many segments to bisect per iteration
57+
# Algorithm
58+
algorithm: Algorithm = Algorithm.HASHDIFF,
59+
# Into how many segments to bisect per iteration (hashdiff only)
5060
bisection_factor: int = DEFAULT_BISECTION_FACTOR,
51-
# When should we stop bisecting and compare locally (in row count)
61+
# When should we stop bisecting and compare locally (in row count; hashdiff only)
5262
bisection_threshold: int = DEFAULT_BISECTION_THRESHOLD,
5363
# Enable/disable threaded diffing. Needed to take advantage of database threads.
5464
threaded: bool = True,
5565
# Maximum size of each threadpool. None = auto. Only relevant when threaded is True.
5666
# There may be many pools, so number of actual threads can be a lot higher.
5767
max_threadpool_size: Optional[int] = 1,
58-
# Enable/disable debug prints
59-
debug: bool = False,
6068
) -> Iterator:
61-
"""Efficiently finds the diff between table1 and table2.
69+
"""Finds the diff between table1 and table2.
70+
71+
Parameters:
72+
key_columns (Tuple[str, ...]): Names of the key columns, which together uniquely identify each row (usually just id)
73+
update_column (str, optional): Name of updated column, which signals that rows changed.
74+
Usually updated_at or last_update. Used by `min_update` and `max_update`.
75+
extra_columns (Tuple[str, ...], optional): Extra columns to compare
76+
min_key (:data:`DbKey`, optional): Lowest key value, used to restrict the segment
77+
max_key (:data:`DbKey`, optional): Highest key value, used to restrict the segment
78+
min_update (:data:`DbTime`, optional): Lowest update_column value, used to restrict the segment
79+
max_update (:data:`DbTime`, optional): Highest update_column value, used to restrict the segment
80+
algorithm (:class:`Algorithm`): Which diffing algorithm to use (`HASHDIFF` or `JOINDIFF`)
81+
bisection_factor (int): Into how many segments to bisect per iteration. (Used when algorithm is `HASHDIFF`)
82+
bisection_threshold (Number): Minimal row count of segment to bisect, otherwise download
83+
and compare locally. (Used when algorithm is `HASHDIFF`).
84+
threaded (bool): Enable/disable threaded diffing. Needed to take advantage of database threads.
85+
max_threadpool_size (int): Maximum size of each threadpool. ``None`` means auto.
86+
Only relevant when `threaded` is ``True``.
87+
There may be many pools, so number of actual threads can be a lot higher.
88+
89+
Note:
90+
The following parameters are used to override the corresponding attributes of the given :class:`TableSegment` instances:
91+
`key_columns`, `update_column`, `extra_columns`, `min_key`, `max_key`.
92+
If different values are needed per table, it's possible to omit them here, and instead set
93+
them directly when creating each :class:`TableSegment`.
6294
6395
Example:
6496
>>> table1 = connect_to_table('postgresql:///', 'Rating', 'id')
6597
>>> list(diff_tables(table1, table1))
6698
[]
6799
100+
See Also:
101+
:class:`TableSegment`
102+
:class:`HashDiffer`
103+
:class:`JoinDiffer`
104+
68105
"""
106+
if isinstance(key_columns, str):
107+
key_columns = (key_columns,)
108+
69109
tables = [table1, table2]
70110
override_attrs = {
71111
k: v
72112
for k, v in dict(
73-
key_column=key_column,
113+
key_columns=key_columns,
74114
update_column=update_column,
75115
extra_columns=extra_columns,
76116
min_key=min_key,
@@ -83,11 +123,20 @@ def diff_tables(
83123

84124
segments = [t.new(**override_attrs) for t in tables] if override_attrs else tables
85125

86-
differ = TableDiffer(
87-
bisection_factor=bisection_factor,
88-
bisection_threshold=bisection_threshold,
89-
debug=debug,
90-
threaded=threaded,
91-
max_threadpool_size=max_threadpool_size,
92-
)
126+
algorithm = Algorithm(algorithm)
127+
if algorithm == Algorithm.HASHDIFF:
128+
differ = HashDiffer(
129+
bisection_factor=bisection_factor,
130+
bisection_threshold=bisection_threshold,
131+
threaded=threaded,
132+
max_threadpool_size=max_threadpool_size,
133+
)
134+
elif algorithm == Algorithm.JOINDIFF:
135+
differ = JoinDiffer(
136+
threaded=threaded,
137+
max_threadpool_size=max_threadpool_size,
138+
)
139+
else:
140+
raise ValueError(f"Unknown algorithm: {algorithm}")
141+
93142
return differ.diff_tables(*segments)

0 commit comments

Comments
 (0)