Skip to content
This repository was archived by the owner on May 17, 2024. It is now read-only.
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
python-version: ${{ matrix.python-version }}

- name: Build the stack
run: docker-compose up -d mysql postgres presto
run: docker-compose up -d mysql postgres presto trino

- name: Install Poetry
run: pip install poetry
Expand All @@ -46,4 +46,8 @@ jobs:
env:
DATADIFF_SNOWFLAKE_URI: '${{ secrets.DATADIFF_SNOWFLAKE_URI }}'
DATADIFF_PRESTO_URI: '${{ secrets.DATADIFF_PRESTO_URI }}'
run: poetry run unittest-parallel -j 16
DATADIFF_TRINO_URI: '${{ secrets.DATADIFF_TRINO_URI }}'
run: |
echo $DATADIFF_PRESTO_URI
echo $DATADIFF_TRINO_URI
poetry run unittest-parallel -j 16 -f
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,6 @@ benchmark_*.png

# Mac
.DS_Store

# IntelliJ
.idea
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ $ data-diff \
| Redshift | `redshift://<username>:<password>@<hostname>:5439/<database>` | 💛 |
| Presto | `presto://<username>:<password>@<hostname>:8080/<database>` | 💛 |
| Databricks | `databricks://<http_path>:<access_token>@<server_hostname>/<catalog>/<schema>` | 💛 |
| ElasticSearch | | 📝 |
| Trino | `trino://<username>:<password>@<hostname>:8080/<database>` | 💛 |
| ElasticSearch | | 📝 |
| Databricks | | 📝 |
| Planetscale | | 📝 |
| Clickhouse | | 📝 |
| Pinot | | 📝 |
Expand Down Expand Up @@ -163,6 +165,8 @@ While you may install them manually, we offer an easy way to install them along

- `pip install 'data-diff[oracle]'`

- `pip install 'data-diff[trino]'`

- For BigQuery, see: https://pypi.org/project/google-cloud-bigquery/


Expand Down Expand Up @@ -505,7 +509,7 @@ Now you can insert it into the testing database(s):
```shell-session
# It's optional to seed more than one to run data-diff(1) against.
$ poetry run preql -f dev/prepare_db.pql mysql://mysql:Password1@127.0.0.1:3306/mysql
$ poetry run preql -f dev/prepare_db.pql postgresql://postgres:Password1@127.0.0.1:5432/postgres
$ poetry run preql -f dev/prepare_db.pql postgres://postgres:Password1@127.0.0.1:5432/postgres

# Cloud databases
$ poetry run preql -f dev/prepare_db.pql snowflake://<uri>
Expand Down
1 change: 1 addition & 0 deletions data_diff/databases/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
from .redshift import Redshift
from .presto import Presto
from .databricks import Databricks
from .trino import Trino

from .connect import connect_to_uri
5 changes: 4 additions & 1 deletion data_diff/databases/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from .redshift import Redshift
from .presto import Presto
from .databricks import Databricks
from .trino import Trino


@dataclass
Expand Down Expand Up @@ -80,7 +81,8 @@ def match_path(self, dsn):
"bigquery": MatchUriPath(BigQuery, ["dataset"], help_str="bigquery://<project>/<dataset>"),
"databricks": MatchUriPath(
Databricks, ["catalog", "schema"], help_str="databricks://:access_token@server_name/http_path",
)
),
"trino": MatchUriPath(Trino, ["catalog", "schema"], help_str="trino://<user>@<host>/<catalog>/<schema>"),
}


Expand All @@ -105,6 +107,7 @@ def connect_to_uri(db_uri: str, thread_count: Optional[int] = 1) -> Database:
- redshift
- presto
- databricks
- trino
"""

dsn = dsnparse.parse(db_uri)
Expand Down
123 changes: 123 additions & 0 deletions data_diff/databases/trino.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import re

from .database_types import *
from .base import Database, import_helper
from .base import (
MD5_HEXDIGITS,
CHECKSUM_HEXDIGITS,
TIMESTAMP_PRECISION_POS,
DEFAULT_DATETIME_PRECISION,
)


@import_helper("trino")
def import_trino():
    """Lazily import and return the ``trino`` driver package.

    Wrapped in ``@import_helper`` — presumably so a missing package is
    reported with install guidance (confirm against ``base.import_helper``).
    """
    import trino as _trino

    return _trino


class Trino(Database):
    """data-diff adapter for Trino, using the ``trino`` DBAPI driver.

    Translates data-diff's generic operations (checksums, value
    normalization, schema introspection) into Trino SQL.
    """

    # NOTE(review): Trino has no universal "public" schema like Postgres;
    # this mirrors the Presto adapter — confirm it matches target catalogs.
    default_schema = "public"

    # Map of type names (as reported by INFORMATION_SCHEMA) to data-diff
    # column-type classes. Parameterized types (timestamp(p), decimal(p,s),
    # varchar(n)) are handled by the regexes in _parse_type instead.
    TYPE_CLASSES = {
        # Timestamps
        "timestamp with time zone": TimestampTZ,
        "timestamp without time zone": Timestamp,
        "timestamp": Timestamp,
        # Numbers
        "integer": Integer,
        "bigint": Integer,
        "real": Float,
        "double": Float,
        # Text
        "varchar": Text,
    }

    # Casting a timestamp to a lower precision rounds (rather than truncates).
    ROUNDS_ON_PREC_LOSS = True

    def __init__(self, **kw):
        """Open a connection; keyword args are passed to ``trino.dbapi.connect``."""
        trino = import_trino()

        self._conn = trino.dbapi.connect(**kw)

    def quote(self, s: str):
        """Quote an identifier with ANSI double quotes."""
        return f'"{s}"'

    def md5_to_int(self, s: str) -> str:
        """SQL expression: MD5 of ``s`` truncated to CHECKSUM_HEXDIGITS hex
        digits and converted to a wide decimal, for checksum comparison."""
        return f"cast(from_base(substr(to_hex(md5(to_utf8({s}))), {1 + MD5_HEXDIGITS - CHECKSUM_HEXDIGITS}), 16) as decimal(38, 0))"

    def to_string(self, s: str):
        """SQL expression: cast ``s`` to varchar."""
        return f"cast({s} as varchar)"

    def _query(self, sql_code: str) -> list:
        """Execute ``sql_code`` via the standard DBAPI cursor.

        Returns all rows for SELECT statements, the first result row for
        INSERT/CREATE/TRUNCATE/DROP (e.g. an affected-row count), and
        None for anything else.
        """
        c = self._conn.cursor()
        c.execute(sql_code)
        # Dispatch on the leading keyword. Strip leading whitespace first:
        # previously "\n  SELECT ..." executed but silently returned None.
        stmt = sql_code.lstrip()
        if stmt.lower().startswith("select"):
            return c.fetchall()
        if re.match(r"(insert|create|truncate|drop)", stmt, re.IGNORECASE):
            return c.fetchone()

    def close(self):
        """Close the underlying DBAPI connection."""
        self._conn.close()

    def normalize_timestamp(self, value: str, coltype: TemporalType) -> str:
        """SQL expression: render a timestamp as a fixed-width string.

        Casts to the column's precision (rounding) or to microseconds
        (truncating), then pads to a canonical width so values compare
        consistently across databases.
        """
        if coltype.rounds:
            s = f"date_format(cast({value} as timestamp({coltype.precision})), '%Y-%m-%d %H:%i:%S.%f')"
        else:
            s = f"date_format(cast({value} as timestamp(6)), '%Y-%m-%d %H:%i:%S.%f')"

        return f"RPAD(RPAD({s}, {TIMESTAMP_PRECISION_POS + coltype.precision}, '.'), {TIMESTAMP_PRECISION_POS + 6}, '0')"

    def normalize_number(self, value: str, coltype: FractionalType) -> str:
        """SQL expression: render a number at the column's scale as a string."""
        return self.to_string(f"cast({value} as decimal(38,{coltype.precision}))")

    def select_table_schema(self, path: DbPath) -> str:
        """Build the INFORMATION_SCHEMA query describing a table's columns.

        Trino's INFORMATION_SCHEMA.COLUMNS has no precision columns, so
        constant 3s are emitted as placeholders; real precisions are
        recovered from the type string in _parse_type.
        """
        schema, table = self._normalize_table_path(path)

        return (
            f"SELECT column_name, data_type, 3 as datetime_precision, 3 as numeric_precision FROM INFORMATION_SCHEMA.COLUMNS "
            f"WHERE table_name = '{table}' AND table_schema = '{schema}'"
        )

    def _parse_type(
        self,
        table_path: DbPath,
        col_name: str,
        type_repr: str,
        datetime_precision: int = None,
        numeric_precision: int = None,
    ) -> ColType:
        """Resolve a Trino type string (e.g. ``timestamp(3)``, ``decimal(10,2)``,
        ``varchar(255)``) into a data-diff ColType; falls back to the base
        implementation (TYPE_CLASSES lookup) for non-parameterized types.
        """
        timestamp_regexps = {
            r"timestamp\((\d)\)": Timestamp,
            r"timestamp\((\d)\) with time zone": TimestampTZ,
        }
        for regexp, t_cls in timestamp_regexps.items():
            # "$" anchors the match so "timestamp(3)" does not swallow
            # "timestamp(3) with time zone".
            m = re.match(regexp + "$", type_repr)
            if m:
                datetime_precision = int(m.group(1))
                return t_cls(
                    precision=datetime_precision
                    if datetime_precision is not None
                    else DEFAULT_DATETIME_PRECISION,
                    rounds=self.ROUNDS_ON_PREC_LOSS,
                )

        number_regexps = {r"decimal\((\d+),(\d+)\)": Decimal}
        for regexp, n_cls in number_regexps.items():
            m = re.match(regexp + "$", type_repr)
            if m:
                prec, scale = map(int, m.groups())
                # Only the scale matters for normalization; precision is unused.
                return n_cls(scale)

        string_regexps = {r"varchar\((\d+)\)": Text, r"char\((\d+)\)": Text}
        for regexp, n_cls in string_regexps.items():
            m = re.match(regexp + "$", type_repr)
            if m:
                return n_cls()

        return super()._parse_type(
            table_path, col_name, type_repr, datetime_precision, numeric_precision
        )

    def normalize_uuid(self, value: str, coltype: ColType_UUID) -> str:
        """SQL expression: trim surrounding whitespace from a UUID-as-text value."""
        return f"TRIM({value})"
2 changes: 1 addition & 1 deletion data_diff/diff_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ class TableDiffer:
The algorithm uses hashing to quickly check if the tables are different, and then applies a
bisection search recursively to find the differences efficiently.

Works best for comparing tables that are mostly the name, with minor discrepencies.
Works best for comparing tables that are mostly the same, with minor discrepancies.

Parameters:
bisection_factor (int): Into how many segments to bisect per iteration.
Expand Down
Empty file added debug.py
Empty file.
1 change: 1 addition & 0 deletions dev/trino-conf/etc/catalog/jms.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=jmx
2 changes: 2 additions & 0 deletions dev/trino-conf/etc/catalog/memory.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
connector.name=memory
memory.max-data-per-node=128MB
4 changes: 4 additions & 0 deletions dev/trino-conf/etc/catalog/postgresql.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
connector.name=postgresql
connection-url=jdbc:postgresql://postgres:5432/postgres
connection-user=postgres
connection-password=Password1
1 change: 1 addition & 0 deletions dev/trino-conf/etc/catalog/tpcds.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=tpcds
1 change: 1 addition & 0 deletions dev/trino-conf/etc/catalog/tpch.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
connector.name=tpch
5 changes: 5 additions & 0 deletions dev/trino-conf/etc/config.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8080
discovery.uri=http://localhost:8080
discovery-server.enabled=true
12 changes: 12 additions & 0 deletions dev/trino-conf/etc/jvm.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-server
-Xmx1G
-XX:-UseBiasedLocking
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+UseGCOverheadLimit
-XX:+ExitOnOutOfMemoryError
-XX:ReservedCodeCacheSize=256M
-Djdk.attach.allowAttachSelf=true
-Djdk.nio.maxCachedBufferSize=2000000
3 changes: 3 additions & 0 deletions dev/trino-conf/etc/node.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node.environment=docker
node.data-dir=/data/trino
plugin.dir=/usr/lib/trino/plugin
10 changes: 10 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ services:
networks:
- local

trino:
image: 'trinodb/trino:389'
hostname: trino
ports:
- '8081:8080'
volumes:
- ./dev/trino-conf/etc:/etc/trino:ro
networks:
- local

volumes:
postgresql-data:
mysql-data:
Expand Down
Loading