
Commit 118ae92

Added runtime validation of sql expressions (#625)
## Changes

* Added runtime validation of expressions in the `sql_expression` check. Note that this covers only a subset of error conditions; some issues only surface during a Spark action.
* Fixed tests:
  * Improved cleanup of orphaned Lakebase instances.
  * Fixed tests for storing checks in a volume after the SDK upgrade.
* Removed the test for listing tables with a pattern matching all catalogs and schemas, as it caused failures when other tests run in parallel.

### Tests

- [ ] manually tested
- [ ] added unit tests
- [x] added integration tests
- [ ] added end-to-end tests
- [ ] added performance tests
1 parent 6fbf5ae commit 118ae92

File tree

10 files changed: +160 −58 lines

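For context, a minimal sketch of the behavior this commit adds, pieced together from the rule and result shapes visible in the test diff further down. The `DQEngine` and `DQRowRule` import paths, the workspace client `ws`, and the input DataFrame `test_df` are assumptions/placeholders, not taken from the commit:

```python
# Sketch only: illustrates the new runtime validation of sql_expression checks.
from databricks.labs.dqx import check_funcs
from databricks.labs.dqx.engine import DQEngine  # assumed import path
from databricks.labs.dqx.rule import DQRowRule   # assumed import path

checks = [
    DQRowRule(
        name="invalid_col_sql_expression",
        criticality="error",
        check_func=check_funcs.sql_expression,
        check_func_args=["missing_col > 0"],  # "missing_col" does not exist in the input DataFrame
        check_func_kwargs={"msg": "missing_col is less than 0"},
    ),
]

# ws = WorkspaceClient(); test_df = spark.createDataFrame(...)  # placeholders for this sketch
dq_engine = DQEngine(ws)
checked = dq_engine.apply_checks(test_df, checks)
# The check is skipped rather than raising; its result row carries a message like:
#   "Check evaluation skipped due to invalid sql expression: 'missing_col > 0'"
```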

docs/dqx/docs/guide/quality_checks_apply.mdx

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@ Checks can be applied to the input data by one of the following methods of the `

 You can see the full list of `DQEngine` methods [here](/docs/reference/engine/#dqx-engine-methods).

-The engine ensures that the specified `column`, `columns`, or `filter` fields can be resolved in the input DataFrame. If any of these fields are invalid, the check evaluation is skipped, and the results include the check failure with a message identifying the invalid fields.
+The engine ensures that the specified `column`, `columns`, `filter`, or SQL `expression` fields can be resolved in the input DataFrame. If any of these fields are invalid, the check evaluation is skipped, and the results include the check failure with a message identifying the invalid fields.
 The engine will raise an error if you try to apply checks with invalid definition (e.g. wrong syntax).
 In addition, you can also perform a standalone syntax validation of the checks as described [here](/docs/guide/quality_checks_definition#validating-syntax-of-quality-checks).

docs/dqx/docs/reference/quality_checks.mdx

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ You can also define your own custom checks (see [Creating custom checks](#creati
 | `is_ipv4_address_in_cidr` | Checks whether the values in the input column have valid IPv4 address format and fall within the given CIDR block. | `column`: column to check (can be a string column name or a column expression); `cidr_block`: CIDR block string |
 | `is_valid_ipv6_address` | Checks whether the values in the input column have valid IPv6 address format. | `column` to check (can be a string column name or a column expression) |
 | `is_ipv6_address_in_cidr` | Checks whether the values in the input column have valid IPv6 address format and fall within the given CIDR block. | `column`: column to check (can be a string column name or a column expression); `cidr_block`: CIDR block string |
-| `sql_expression` | Checks whether the values meet the condition provided as an SQL expression, e.g. `a = 'str1' and a > b`. SQL expressions are evaluated at runtime, so ensure that the expression is safe and that functions used within it (e.g. h3_ischildof, division) do not throw exceptions. You can achieve this by validating input arguments or columns beforehand using guards such as CASE WHEN, IS NOT NULL, RLIKE, or type try casts. | `expression`: sql expression to check on a DataFrame (fail the check if expression evaluates to False, pass if it evaluates to True); `msg`: optional message to output; `name`: optional name of the resulting column (it can be overwritten by `name` specified at the check level); `negate`: if the condition should be negated; `columns`: optional list of columns used in the sql expression to validate they can be resolved in the input DataFrame, also used for reporting purposes and as a name prefix when a check name is not provided |
+| `sql_expression` | Checks whether the values meet the condition provided as an SQL expression, e.g. `a = 'str1' and a > b`. If the SQL expression is invalid (for example, it references non-existent columns), the check evaluation is skipped and the results include a check failure with a message identifying the invalid columns. However, when the expression uses functions that Spark cannot validate up front, you must ensure they do not raise exceptions (for instance, pass valid arguments to `h3_ischildof`), as such errors are not handled automatically. Some errors only appear during a Spark action (e.g. when displaying or saving the results), so validate the inputs beforehand using guards such as `CASE WHEN`, `IS NOT NULL`, `RLIKE`, or type try casts. | `expression`: sql expression to check on a DataFrame (fail the check if expression evaluates to False, pass if it evaluates to True); `msg`: optional message to output; `name`: optional name of the resulting column (it can be overwritten by `name` specified at the check level); `negate`: if the condition should be negated; `columns`: optional list of columns used in the sql expression to validate they can be resolved in the input DataFrame, also used for reporting purposes and as a name prefix when a check name is not provided |
 | `is_data_fresh` | Checks whether the values in the input timestamp column are not older than the specified number of minutes from the base timestamp column. This is useful for identifying stale data due to delayed pipelines and helps catch upstream issues early. | `column`: column of type timestamp/date to check (can be a string column name or a column expression); `max_age_minutes`: maximum age in minutes before data is considered stale; `base_timestamp`: optional base timestamp column from which the stale check is calculated. This can be a string, column expression, datetime value or literal value ex:F.lit(datetime(2024,1,1)). If not provided current_timestamp() function is used |
 | `does_not_contain_pii` | Checks whether the values in the input column contain Personally Identifiable Information (PII). Uses Microsoft Presidio to detect various named entities (e.g. PERSON, ADDRESS, EMAIL_ADDRESS). Requires installation of PII detection extras: `pip install 'databricks-labs-dqx[pii-detection]'`. See more details [here](#detecting-personally-identifiable-information-pii). | `column`: column to check (can be a string column name or a column expression); `threshold`: confidence threshold for PII detection (0.0 to 1.0, default: 0.7); `language`: optional language of the text (default: 'en'); `entities`: optional list of entities to detect; `nlp_engine_config`: optional dictionary configuring the NLP engine used for PII detection, see the [Presidio documentation](https://microsoft.github.io/presidio/analyzer/customizing_nlp_models/) for more information |
 | `is_latitude` | Checks whether the values in the input column are valid latitude values (i.e. between -90 and 90 degrees). | `column`: column to check (can be a string column name or a column expression) |
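As a concrete illustration of the guards recommended in the `sql_expression` row above, here is a hedged sketch of a metadata-style check whose expression only calls the function when its inputs are safe. The column names (`cell_id`, `parent_id`) and the guard semantics are illustrative only, not from the commit:

```python
# Sketch: guard a function call inside a sql_expression check so it cannot throw
# at Spark action time. Column names (cell_id, parent_id) are hypothetical.
guarded_check = {
    "name": "h3_child_of_parent",
    "criticality": "warn",
    "check": {
        "function": "sql_expression",
        "arguments": {
            # Only evaluate h3_ischildof when both inputs are non-null and castable to BIGINT;
            # otherwise treat the row as passing the guard (adjust to your own semantics).
            "expression": """
                CASE
                    WHEN try_cast(cell_id AS BIGINT) IS NOT NULL
                         AND try_cast(parent_id AS BIGINT) IS NOT NULL
                    THEN h3_ischildof(try_cast(cell_id AS BIGINT), try_cast(parent_id AS BIGINT))
                    ELSE TRUE
                END
            """,
            "msg": "cell_id is not a child of parent_id",
        },
    },
}
```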

src/databricks/labs/dqx/config.py

Lines changed: 20 additions & 1 deletion
@@ -2,6 +2,8 @@
 from functools import cached_property
 from datetime import datetime, timezone
 from dataclasses import dataclass, field
+
+from databricks.labs.dqx.checks_serializer import FILE_SERIALIZERS
 from databricks.labs.dqx.errors import InvalidConfigError, InvalidParameterError

 __all__ = [
@@ -277,7 +279,24 @@ class VolumeFileChecksStorageConfig(BaseChecksStorageConfig):

     def __post_init__(self):
         if not self.location:
-            raise InvalidConfigError("The Unity Catalog volume file path ('location' field) must not be empty or None.")
+            raise InvalidParameterError(
+                "The Unity Catalog volume file path ('location' field) must not be empty or None."
+            )
+
+        # Expected format: /Volumes/{catalog}/{schema}/{volume}/{path/to/file}
+        if not self.location.startswith("/Volumes/"):
+            raise InvalidParameterError("The volume path must start with '/Volumes/'.")
+
+        parts = self.location.split("/")
+        # After split need at least: ['', 'Volumes', 'catalog', 'schema', 'volume', optional 'dir', 'file']
+        if len(parts) < 3 or not parts[2]:
+            raise InvalidParameterError("Invalid path: Path is missing a catalog name")
+        if len(parts) < 4 or not parts[3]:
+            raise InvalidParameterError("Invalid path: Path is missing a schema name")
+        if len(parts) < 5 or not parts[4]:
+            raise InvalidParameterError("Invalid path: Path is missing a volume name")
+        if len(parts) < 6 or not parts[-1].lower().endswith(tuple(FILE_SERIALIZERS.keys())):
+            raise InvalidParameterError("Invalid path: Path must include a file name after the volume")


 @dataclass
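A hedged sketch of how the new `__post_init__` validation above behaves, assuming `location` is the only required field of `VolumeFileChecksStorageConfig` and that the `FILE_SERIALIZERS` keys cover the yaml/json extensions used elsewhere in this commit (`checks.yaml` / `checks.json`); the catalog/schema/volume names are hypothetical:

```python
# Sketch: volume paths are now validated eagerly at construction time.
from databricks.labs.dqx.config import VolumeFileChecksStorageConfig
from databricks.labs.dqx.errors import InvalidParameterError

# Valid: catalog, schema, volume and a serializable file name are all present.
VolumeFileChecksStorageConfig(location="/Volumes/main/dq/checks_volume/checks.yaml")

# Invalid examples now fail fast instead of on first use:
for bad in [
    "dbfs:/Volumes/main/dq/checks_volume/checks.yaml",  # does not start with /Volumes/
    "/Volumes/main/dq",                                  # missing volume name
    "/Volumes/main/dq/checks_volume",                    # missing file name after the volume
]:
    try:
        VolumeFileChecksStorageConfig(location=bad)
    except InvalidParameterError as e:
        print(f"{bad!r}: {e}")
```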

src/databricks/labs/dqx/manager.py

Lines changed: 28 additions & 1 deletion
@@ -7,6 +7,7 @@
 from pyspark.errors import AnalysisException
 from pyspark.sql import DataFrame, Column, SparkSession

+from databricks.labs.dqx import check_funcs
 from databricks.labs.dqx.executor import DQCheckResult, DQRuleExecutorFactory
 from databricks.labs.dqx.rule import (
     DQRule,
@@ -94,6 +95,23 @@ def has_invalid_filter(self) -> bool:
         """
         return self._is_invalid_column(self.filter_condition)

+    @cached_property
+    def invalid_sql_expression(self) -> str | None:
+        """
+        Returns an invalid expression for sql expression check.
+        """
+        if self.check.check_func is check_funcs.sql_expression:
+            if "expression" in self.check.check_func_kwargs:
+                field_value = self.check.check_func_kwargs["expression"]
+            elif self.check.check_func_args:
+                field_value = self.check.check_func_args[0]
+            else:
+                return None  # should never happen, as it is validated for correct args when building rules
+
+            if self._is_invalid_column(field_value):
+                return field_value
+        return None
+
     def process(self) -> DQCheckResult:
         """
         Process the data quality rule (check) and return results as DQCheckResult containing:
@@ -151,6 +169,14 @@ def _get_invalid_cols_message(self) -> str:
                 f"Check evaluation skipped due to invalid check filter: '{self.check.filter}'"
             )

+        if self.invalid_sql_expression:
+            logger.warning(
+                f"Skipping check '{self.check.name}' due to invalid sql expression: '{self.invalid_sql_expression}'"
+            )
+            invalid_cols_message_parts.append(
+                f"Check evaluation skipped due to invalid sql expression: '{self.invalid_sql_expression}'"
+            )
+
         invalid_cols_message = "; ".join(invalid_cols_message_parts)

         return invalid_cols_message
@@ -164,7 +190,8 @@ def _is_invalid_column(self, column: str | Column) -> bool:
             col_expr = F.expr(column) if isinstance(column, str) else column
             _ = self.df.select(col_expr).schema  # perform logical plan validation without triggering computation
         except AnalysisException as e:
-            # if column is not accessible or column expression cannot be evaluated, an AnalysisException is thrown
+            # If column is not accessible or column expression cannot be evaluated, an AnalysisException is thrown.
+            # Note: This does not cover all error conditions. Some issues only appear during a Spark action.
             logger.debug(
                 f"Invalid column '{column}' provided in the check '{self.check.name}'",
                 exc_info=e,
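The `_is_invalid_column` change above relies on Spark's analysis phase: resolving the schema of a `select()` forces logical-plan validation without running a job, but it cannot catch every failure. A standalone sketch of that distinction (the DataFrame and expressions are illustrative only):

```python
# Sketch: analysis-time vs. action-time errors for SQL expressions.
from pyspark.errors import AnalysisException
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "a")], "id INT, name STRING")

try:
    # Unresolved column -> AnalysisException during analysis, no job is triggered.
    _ = df.select(F.expr("missing_col > 0")).schema
except AnalysisException:
    print("caught at analysis time: expression references a non-existent column")

# Not everything is caught this way: some failures only surface during an action.
_ = df.select(F.expr("assert_true(id > 1)")).schema   # analysis succeeds
# df.select(F.expr("assert_true(id > 1)")).collect()  # would fail only when the action runs
```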

tests/conftest.py

Lines changed: 6 additions & 5 deletions
@@ -1,5 +1,6 @@
 import os
 from datetime import timedelta
+from io import BytesIO
 from typing import Any
 import re
 import logging
@@ -32,7 +33,7 @@

 @pytest.fixture(scope="session")
 def debug_env_name():
-    return "ws2"  # Specify the name of the debug environment from ~/.databricks/debug-env.json
+    return "ws"  # Specify the name of the debug environment from ~/.databricks/debug-env.json


 @pytest.fixture
@@ -613,7 +614,7 @@ def create(**kwargs):
         folder = make_directory()
         volume_file_path = str(folder.absolute()) + "/checks.yaml"

-        ws.files.upload(volume_file_path, checks_yaml_content.encode(), overwrite=True)
+        ws.files.upload(volume_file_path, BytesIO(checks_yaml_content.encode()), overwrite=True)

         return volume_file_path
@@ -632,7 +633,7 @@ def create(**kwargs):
         folder = make_directory()
         volume_file_path = str(folder.absolute()) + "/checks.json"

-        ws.files.upload(volume_file_path, checks_json_content.encode(), overwrite=True)
+        ws.files.upload(volume_file_path, BytesIO(checks_json_content.encode()), overwrite=True)

         return volume_file_path
@@ -651,7 +652,7 @@ def create(**kwargs):
         folder = make_directory()
         volume_file_path = str(folder.absolute()) + "/checks.yaml"

-        ws.files.upload(volume_file_path, checks_yaml_invalid_content.encode(), overwrite=True)
+        ws.files.upload(volume_file_path, BytesIO(checks_yaml_invalid_content.encode()), overwrite=True)

         return volume_file_path
@@ -670,7 +671,7 @@ def create(**kwargs):
         folder = make_directory()
         volume_file_path = str(folder.absolute()) + "/checks.json"

-        ws.files.upload(volume_file_path, checks_json_invalid_content.encode(), overwrite=True)
+        ws.files.upload(volume_file_path, BytesIO(checks_json_invalid_content.encode()), overwrite=True)

         return volume_file_path
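A brief note on the `BytesIO` change above: the workspace Files API `upload` takes a binary stream for its contents argument rather than raw `bytes`, which is presumably what broke after the SDK upgrade. A minimal hedged sketch (the volume path and YAML payload are illustrative, and `WorkspaceClient()` assumes credentials are already configured):

```python
# Sketch: uploading serialized checks to a UC volume via the workspace Files API.
from io import BytesIO
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()
checks_yaml_content = (
    "- criticality: error\n"
    "  check:\n"
    "    function: is_not_null\n"
    "    arguments:\n"
    "      column: id\n"
)
volume_file_path = "/Volumes/main/dq/checks_volume/checks.yaml"  # hypothetical path

# Wrap the bytes in a stream, matching the calls in the conftest diff above.
ws.files.upload(volume_file_path, BytesIO(checks_yaml_content.encode()), overwrite=True)
```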

tests/integration/test_apply_checks.py

Lines changed: 43 additions & 2 deletions
@@ -7899,6 +7899,16 @@ def test_apply_checks_skip_checks_with_missing_columns(ws, spark):
             columns=["missing_col"],
             filter="missing_col > 0",
         ),
+        # invalid sql expression column
+        DQRowRule(
+            name="invalid_col_sql_expression",
+            criticality="error",
+            check_func=check_funcs.sql_expression,
+            check_func_args=["missing_col > 0"],  # verify validation works when using args
+            check_func_kwargs={
+                "msg": "missing_col is less than 0",
+            },
+        ),
     ]

     checked = dq_engine.apply_checks(test_df, checks)
@@ -7933,7 +7943,8 @@ def test_apply_checks_skip_checks_with_missing_columns(ws, spark):
            },
            {
                "name": "missing_col_sql_expression",
-               "message": "Check evaluation skipped due to invalid check columns: ['missing_col']",
+               "message": "Check evaluation skipped due to invalid check columns: ['missing_col']; "
+               "Check evaluation skipped due to invalid sql expression: 'missing_col > 0'",
                "columns": ["missing_col"],
                "filter": None,
                "function": "sql_expression",
@@ -7950,6 +7961,15 @@ def test_apply_checks_skip_checks_with_missing_columns(ws, spark):
                "run_time": RUN_TIME,
                "user_metadata": {},
            },
+           {
+               "name": "invalid_col_sql_expression",
+               "message": "Check evaluation skipped due to invalid sql expression: 'missing_col > 0'",
+               "columns": None,
+               "filter": None,
+               "function": "sql_expression",
+               "run_time": RUN_TIME,
+               "user_metadata": {},
+           },
        ],
        [
            {
@@ -8050,6 +8070,17 @@ def test_apply_checks_by_metadata_skip_checks_with_missing_columns(ws, spark):
                "arguments": {"columns": ["missing_col"]},
            },
        },
+       {
+           "name": "invalid_col_sql_expression",
+           "criticality": "error",
+           "check": {
+               "function": "sql_expression",
+               "arguments": {
+                   "expression": "missing_col > 0",
+                   "msg": "missing_col is less than 0",
+               },
+           },
+       },
    ]

    checked = dq_engine.apply_checks_by_metadata(test_df, checks)
@@ -8084,7 +8115,8 @@ def test_apply_checks_by_metadata_skip_checks_with_missing_columns(ws, spark):
            },
            {
                "name": "missing_col_sql_expression",
-               "message": "Check evaluation skipped due to invalid check columns: ['missing_col']",
+               "message": "Check evaluation skipped due to invalid check columns: ['missing_col']; "
+               "Check evaluation skipped due to invalid sql expression: 'missing_col > 0'",
                "columns": ["missing_col"],
                "filter": None,
                "function": "sql_expression",
@@ -8101,6 +8133,15 @@ def test_apply_checks_by_metadata_skip_checks_with_missing_columns(ws, spark):
                "run_time": RUN_TIME,
                "user_metadata": {},
            },
+           {
+               "name": "invalid_col_sql_expression",
+               "message": "Check evaluation skipped due to invalid sql expression: 'missing_col > 0'",
+               "columns": None,
+               "filter": None,
+               "function": "sql_expression",
+               "run_time": RUN_TIME,
+               "user_metadata": {},
+           },
        ],
        [
            {
