Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.

Commit f02e9e6

Browse files
Leo Folsom
authored and committed
Merge branch 'master' into readme-ideas
2 parents aba16b1 + ba223d1 commit f02e9e6

File tree

13 files changed

+337
-108
lines changed

13 files changed

+337
-108
lines changed

CONTRIBUTING.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ Contributions are very welcome! We'll be happy to help you in the process.
44

55
## What should I know before I get started?
66

7-
Go through the README and the documentation, and make sure that you understand how data-diff works.
7+
Go through the README and the documentation, and make sure that you understand how data-diff works.
88

99
## How to contribute?
1010

@@ -13,12 +13,12 @@ Go through the README and the documentation, and make sure that you understand h
1313
Please report the bug with as many details as you can.
1414

1515
1. Include the exact command that you used. Make sure to run data-diff with the `-d` flag for debug output.
16-
2. Provide the entire output of the command. (stdout, logs, exception)
16+
2. Provide the entire output of the command. (stdout, logs, exception)
1717
3. If possible, show us how we could reproduce the bug. i.e. how to set up an environment in which it occurs.
1818

1919
(When pasting, always make sure to redact sensitive information, like passwords.)
2020

21-
If data-diff returns incorrect results, i.e. false-positive or false-negative, please also include the original values.
21+
If data-diff returns incorrect results, i.e. false-positive or false-negative, please also include the original values.
2222

2323
Before you report a bug, make sure it doesn't already exist.
2424

@@ -66,7 +66,7 @@ Make sure to update the appropriate `TEST_*_CONN_STRING`, so that it will be inc
6666

6767
You can run the tests with `unittest`.
6868

69-
When running against multiple databases, the tests can take a long while.
69+
When running against multiple databases, the tests can take a long while.
7070

7171
To save time, we recommend running them with `unittest-parallel`.
7272

@@ -76,6 +76,6 @@ When debugging, we recommend using the `-f` flag, to stop on error. Also, use th
7676

7777
New databases should be added as a new module in the `data-diff/databases/` folder.
7878

79-
Make sure to update the `DATABASE_TYPES` dictionary in `tests/test_database_types.py`, so that it will be included in the tests.
80-
8179
If possible, please also add the database setup to `docker-compose.yml`, so that we can run and test it for ourselves. If you do, also update the CI (`ci.yml`).
80+
81+
Guide to implementing a new database driver: https://data-diff.readthedocs.io/en/latest/new-database-driver-guide.html

data_diff/__main__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,6 @@ def _main(
197197
bisection_threshold=bisection_threshold,
198198
threaded=threaded,
199199
max_threadpool_size=threads and threads * 2,
200-
debug=debug,
201200
)
202201

203202
if database1 is None or database2 is None:

data_diff/config.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,20 @@ def _apply_config(config: Dict[str, Any], run_name: str, kw: Dict[str, Any]):
2626
else:
2727
run_name = "default"
2828

29+
if 'database1' in kw:
30+
for attr in ('table1', 'database2', 'table2'):
31+
if kw[attr] is None:
32+
raise ValueError(f"Specified database1 but not {attr}. Must specify all 4 arguments, or neither.")
33+
34+
for index in "12":
35+
run_args[index] = {attr: kw.pop(f"{attr}{index}") for attr in ('database', 'table')}
36+
2937
# Process databases + tables
3038
for index in "12":
3139
args = run_args.pop(index, {})
3240
for attr in ("database", "table"):
3341
if attr not in args:
34-
raise ConfigParseError(f"Running 'run.{run_name}': Connection #{index} in missing attribute '{attr}'.")
42+
raise ConfigParseError(f"Running 'run.{run_name}': Connection #{index} is missing attribute '{attr}'.")
3543

3644
database = args.pop("database")
3745
table = args.pop("table")

data_diff/databases/base.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
Float,
1717
ColType_UUID,
1818
Native_UUID,
19-
String_Alphanum,
2019
String_UUID,
20+
String_Alphanum,
21+
String_FixedAlphanum,
22+
String_VaryingAlphanum,
2123
TemporalType,
2224
UnknownColType,
2325
Text,
@@ -79,6 +81,7 @@ class Database(AbstractDatabase):
7981

8082
TYPE_CLASSES: Dict[str, type] = {}
8183
default_schema: str = None
84+
SUPPORTS_ALPHANUMS = True
8285

8386
@property
8487
def name(self):
@@ -229,23 +232,22 @@ def _refine_coltypes(self, table_path: DbPath, col_dict: Dict[str, ColType], whe
229232
col_dict[col_name] = String_UUID()
230233
continue
231234

232-
alphanum_samples = [s for s in samples if s and String_Alphanum.test_value(s)]
233-
if alphanum_samples:
234-
if len(alphanum_samples) != len(samples):
235-
logger.warning(
236-
f"Mixed Alphanum/Non-Alphanum values detected in column {'.'.join(table_path)}.{col_name}, disabling Alphanum support."
237-
)
238-
else:
239-
assert col_name in col_dict
240-
lens = set(map(len, alphanum_samples))
241-
if len(lens) > 1:
235+
if self.SUPPORTS_ALPHANUMS: # Anything but MySQL (so far)
236+
alphanum_samples = [s for s in samples if String_Alphanum.test_value(s)]
237+
if alphanum_samples:
238+
if len(alphanum_samples) != len(samples):
242239
logger.warning(
243-
f"Mixed Alphanum lengths detected in column {'.'.join(table_path)}.{col_name}, disabling Alphanum support."
240+
f"Mixed Alphanum/Non-Alphanum values detected in column {'.'.join(table_path)}.{col_name}. It cannot be used as a key."
244241
)
245242
else:
246-
(length,) = lens
247-
col_dict[col_name] = String_Alphanum(length=length)
248-
continue
243+
assert col_name in col_dict
244+
lens = set(map(len, alphanum_samples))
245+
if len(lens) > 1:
246+
col_dict[col_name] = String_VaryingAlphanum()
247+
else:
248+
(length,) = lens
249+
col_dict[col_name] = String_FixedAlphanum(length=length)
250+
continue
249251

250252
# @lru_cache()
251253
# def get_table_schema(self, path: DbPath) -> Dict[str, ColType]:

data_diff/databases/database_types.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,7 @@ class String_UUID(StringType, ColType_UUID):
9292
pass
9393

9494

95-
@dataclass
9695
class String_Alphanum(StringType, ColType_Alphanum):
97-
length: int
98-
9996
@staticmethod
10097
def test_value(value: str) -> bool:
10198
try:
@@ -104,6 +101,18 @@ def test_value(value: str) -> bool:
104101
except ValueError:
105102
return False
106103

104+
def make_value(self, value):
105+
return self.python_type(value)
106+
107+
108+
class String_VaryingAlphanum(String_Alphanum):
109+
pass
110+
111+
112+
@dataclass
113+
class String_FixedAlphanum(String_Alphanum):
114+
length: int
115+
107116
def make_value(self, value):
108117
if len(value) != self.length:
109118
raise ValueError(f"Expected alphanumeric value of length {self.length}, but got '{value}'.")

data_diff/databases/mysql.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class MySQL(ThreadedDatabase):
2828
"binary": Text,
2929
}
3030
ROUNDS_ON_PREC_LOSS = True
31+
SUPPORTS_ALPHANUMS = False
3132

3233
def __init__(self, *, thread_count, **kw):
3334
self._args = kw

data_diff/diff_tables.py

Lines changed: 44 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
from runtype import dataclass
1414

1515
from .utils import safezip, run_as_daemon
16-
from .databases.database_types import IKey, NumericType, PrecisionType, StringType
16+
from .thread_utils import ThreadedYielder
17+
from .databases.database_types import IKey, NumericType, PrecisionType, StringType, ColType_UUID
1718
from .table_segment import TableSegment
1819
from .tracking import create_end_event_json, create_start_event_json, send_event_json, is_tracking_enabled
1920

@@ -121,22 +122,25 @@ def diff_tables(self, table1: TableSegment, table2: TableSegment) -> DiffResult:
121122
logger.info(
122123
f"Diffing tables | segments: {self.bisection_factor}, bisection threshold: {self.bisection_threshold}. "
123124
f"key-range: {table1.min_key}..{table2.max_key}, "
124-
f"size: {table1.approximate_size()}"
125+
f"size: table1 <= {table1.approximate_size()}, table2 <= {table2.approximate_size()}"
125126
)
126127

128+
ti = ThreadedYielder(self.max_threadpool_size)
127129
# Bisect (split) the table into segments, and diff them recursively.
128-
yield from self._bisect_and_diff_tables(table1, table2)
130+
ti.submit(self._bisect_and_diff_tables, ti, table1, table2)
129131

130132
# Now we check for the second min-max, to diff the portions we "missed".
131133
min_key2, max_key2 = self._parse_key_range_result(key_type, next(key_ranges))
132134

133135
if min_key2 < min_key1:
134136
pre_tables = [t.new(min_key=min_key2, max_key=min_key1) for t in (table1, table2)]
135-
yield from self._bisect_and_diff_tables(*pre_tables)
137+
ti.submit(self._bisect_and_diff_tables, ti, *pre_tables)
136138

137139
if max_key2 > max_key1:
138140
post_tables = [t.new(min_key=max_key1, max_key=max_key2) for t in (table1, table2)]
139-
yield from self._bisect_and_diff_tables(*post_tables)
141+
ti.submit(self._bisect_and_diff_tables, ti, *post_tables)
142+
143+
yield from ti
140144

141145
except BaseException as e: # Catch KeyboardInterrupt too
142146
error = e
@@ -205,6 +209,10 @@ def _validate_and_adjust_columns(self, table1, table2):
205209
table1._schema[c1] = col1.replace(precision=lowest.precision)
206210
table2._schema[c2] = col2.replace(precision=lowest.precision)
207211

212+
elif isinstance(col1, ColType_UUID):
213+
if not isinstance(col2, ColType_UUID):
214+
raise TypeError(f"Incompatible types for column '{c1}': {col1} <-> {col2}")
215+
208216
elif isinstance(col1, StringType):
209217
if not isinstance(col2, StringType):
210218
raise TypeError(f"Incompatible types for column '{c1}': {col1} <-> {col2}")
@@ -218,16 +226,19 @@ def _validate_and_adjust_columns(self, table1, table2):
218226
"If encoding/formatting differs between databases, it may result in false positives."
219227
)
220228

221-
def _bisect_and_diff_tables(self, table1, table2, level=0, max_rows=None):
229+
def _bisect_and_diff_tables(
230+
self, ti: ThreadedYielder, table1: TableSegment, table2: TableSegment, level=0, max_rows=None
231+
):
222232
assert table1.is_bounded and table2.is_bounded
223233

234+
max_space_size = max(table1.approximate_size(), table2.approximate_size())
224235
if max_rows is None:
225-
# We can be sure that row_count <= max_rows
226-
max_rows = table1.max_key - table1.min_key
236+
# We can be sure that row_count <= max_rows iff the table key is unique
237+
max_rows = max_space_size
227238

228239
# If count is below the threshold, just download and compare the columns locally
229240
# This saves time, as bisection speed is limited by ping and query performance.
230-
if max_rows < self.bisection_threshold:
241+
if max_rows < self.bisection_threshold or max_space_size < self.bisection_factor * 2:
231242
rows1, rows2 = self._threaded_call("get_values", [table1, table2])
232243
diff = list(diff_sets(rows1, rows2))
233244

@@ -242,49 +253,51 @@ def _bisect_and_diff_tables(self, table1, table2, level=0, max_rows=None):
242253

243254
logger.info(". " * level + f"Diff found {len(diff)} different rows.")
244255
self.stats["rows_downloaded"] = self.stats.get("rows_downloaded", 0) + max(len(rows1), len(rows2))
245-
yield from diff
246-
return
256+
return diff
247257

248258
# Choose evenly spaced checkpoints (according to min_key and max_key)
249-
checkpoints = table1.choose_checkpoints(self.bisection_factor - 1)
259+
biggest_table = max(table1, table2, key=methodcaller('approximate_size'))
260+
checkpoints = biggest_table.choose_checkpoints(self.bisection_factor - 1)
250261

251262
# Create new instances of TableSegment between each checkpoint
252263
segmented1 = table1.segment_by_checkpoints(checkpoints)
253264
segmented2 = table2.segment_by_checkpoints(checkpoints)
254265

255266
# Recursively compare each pair of corresponding segments between table1 and table2
256-
diff_iters = [
257-
self._diff_tables(t1, t2, level + 1, i + 1, len(segmented1))
258-
for i, (t1, t2) in enumerate(safezip(segmented1, segmented2))
259-
]
260-
261-
for res in self._thread_map(list, diff_iters):
262-
yield from res
263-
264-
def _diff_tables(self, table1, table2, level=0, segment_index=None, segment_count=None):
267+
for i, (t1, t2) in enumerate(safezip(segmented1, segmented2)):
268+
ti.submit(self._diff_tables, ti, t1, t2, max_rows, level + 1, i + 1, len(segmented1), priority=level)
269+
270+
def _diff_tables(
271+
self,
272+
ti: ThreadedYielder,
273+
table1: TableSegment,
274+
table2: TableSegment,
275+
max_rows: int,
276+
level=0,
277+
segment_index=None,
278+
segment_count=None,
279+
):
265280
logger.info(
266281
". " * level + f"Diffing segment {segment_index}/{segment_count}, "
267282
f"key-range: {table1.min_key}..{table2.max_key}, "
268-
f"size: {table2.max_key-table1.min_key}"
283+
f"size <= {max_rows}"
269284
)
270285

271286
# When benchmarking, we want the ability to skip checksumming. This
272287
# allows us to download all rows for comparison in performance. By
273288
# default, data-diff will checksum the section first (when it's below
274289
# the threshold) and _then_ download it.
275290
if BENCHMARK:
276-
max_rows_from_keys = max(table1.max_key - table1.min_key, table2.max_key - table2.min_key)
277-
if max_rows_from_keys < self.bisection_threshold:
278-
yield from self._bisect_and_diff_tables(table1, table2, level=level, max_rows=max_rows_from_keys)
279-
return
291+
if max_rows < self.bisection_threshold:
292+
return self._bisect_and_diff_tables(ti, table1, table2, level=level, max_rows=max_rows)
280293

281294
(count1, checksum1), (count2, checksum2) = self._threaded_call("count_and_checksum", [table1, table2])
282295

283296
if count1 == 0 and count2 == 0:
284-
logger.warning(
285-
"Uneven distribution of keys detected. (big gaps in the key column). "
286-
"For better performance, we recommend to increase the bisection-threshold."
287-
)
297+
# logger.warning(
298+
# f"Uneven distribution of keys detected in segment {table1.min_key}..{table2.max_key}. (big gaps in the key column). "
299+
# "For better performance, we recommend to increase the bisection-threshold."
300+
# )
288301
assert checksum1 is None and checksum2 is None
289302
return
290303

@@ -293,7 +306,7 @@ def _diff_tables(self, table1, table2, level=0, segment_index=None, segment_coun
293306
self.stats["table2_count"] = self.stats.get("table2_count", 0) + count2
294307

295308
if checksum1 != checksum2:
296-
yield from self._bisect_and_diff_tables(table1, table2, level=level, max_rows=max(count1, count2))
309+
return self._bisect_and_diff_tables(ti, table1, table2, level=level, max_rows=max(count1, count2))
297310

298311
def _thread_map(self, func, iterable):
299312
if not self.threaded:

data_diff/table_segment.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from runtype import dataclass
66

7-
from .utils import ArithString, split_space
7+
from .utils import ArithString, split_space, ArithAlphanumeric
88

99
from .databases.base import Database
1010
from .databases.database_types import DbPath, DbKey, DbTime, Native_UUID, Schema, create_schema
@@ -149,8 +149,9 @@ def choose_checkpoints(self, count: int) -> List[DbKey]:
149149
assert self.is_bounded
150150
if isinstance(self.min_key, ArithString):
151151
assert type(self.min_key) is type(self.max_key)
152-
checkpoints = split_space(self.min_key.int, self.max_key.int, count)
153-
return [self.min_key.new(int=i) for i in checkpoints]
152+
checkpoints = self.min_key.range(self.max_key, count)
153+
assert all(self.min_key <= x <= self.max_key for x in checkpoints)
154+
return checkpoints
154155

155156
return split_space(self.min_key, self.max_key, count)
156157

0 commit comments

Comments
 (0)