.github/workflows/run-operators-unit-tests.yml (4 changes: 2 additions & 2 deletions)

@@ -26,7 +26,7 @@ jobs:
   test:
     name: python ${{ matrix.python-version }}
     runs-on: ubuntu-latest
-    timeout-minutes: 60
+    timeout-minutes: 180
 
     strategy:
       fail-fast: false
@@ -49,7 +49,7 @@ jobs:
         name: "Test config setup"
 
       - name: "Run Operators Tests"
-        timeout-minutes: 60
+        timeout-minutes: 180
         shell: bash
        run: |
           set -x  # print commands that are executed
ads/opctl/operator/cli.py (8 changes: 7 additions & 1 deletion)

@@ -9,12 +9,14 @@
 import click
 import fsspec
 import yaml
+import logging
 from ads.opctl.operator.common.utils import default_signer
 from ads.common.auth import AuthType
 from ads.common.object_storage_details import ObjectStorageDetails
 from ads.opctl.constants import BACKEND_NAME, RUNTIME_TYPE
 from ads.opctl.decorator.common import click_options, with_auth, with_click_unknown_args
 from ads.opctl.utils import suppress_traceback
+from ads.opctl import logger
 
 from .__init__ import __operators__
 from .cmd import run as cmd_run
@@ -311,10 +313,14 @@ def publish_conda(debug: bool, **kwargs: Dict[str, Any]) -> None:
 @click.pass_context
 @with_click_unknown_args
 @with_auth
-def run(ctx: click.core.Context, debug: bool, **kwargs: Dict[str, Any]) -> None:
+def run(ctx: click.core.Context, debug: bool = False, **kwargs: Dict[str, Any]) -> None:
     """
     Runs the operator with the given specification on the targeted backend.
     """
+    if debug:
+        logger.setLevel(logging.DEBUG)
+    else:
+        logger.setLevel(logging.CRITICAL)
     operator_spec = {}
     backend = kwargs.pop("backend")
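For orientation, a minimal runnable sketch of the behavior this file's hunks add: a --debug flag that widens the package logger to DEBUG and otherwise silences it. The logger name and command shape below are illustrative stand-ins, not the exact ads wiring.

    import logging

    import click

    logger = logging.getLogger("ads.opctl")  # stand-in for the logger ads exposes


    @click.command()
    @click.option("--debug", is_flag=True, default=False)
    def run(debug: bool = False) -> None:
        # Mirror the hunk: verbose logs only under --debug, near-silence otherwise.
        logger.setLevel(logging.DEBUG if debug else logging.CRITICAL)
        logger.debug("debug logging enabled")


    if __name__ == "__main__":
        run()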
ads/opctl/operator/cmd.py (4 changes: 2 additions & 2 deletions)

@@ -42,7 +42,7 @@
 from .common.errors import (
     OperatorCondaNotFoundError,
     OperatorImageNotFoundError,
-    OperatorSchemaYamlError,
+    InvalidParameterError,
 )
 from .common.operator_loader import _operator_info_list
 
@@ -415,7 +415,7 @@ def verify(
             run_name="verify",
         )
         operator_module.get("verify")(config, **kwargs)
-    except OperatorSchemaYamlError as ex:
+    except InvalidParameterError as ex:
         logger.debug(ex)
         raise ValueError(
             f"The operator's specification is not valid for the `{operator_info.type}` operator. "
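The second hunk swaps the exception the verify path catches; the surrounding pattern, catch the low-level schema error, log it at debug level, and re-raise a user-facing ValueError, sketched with local stand-ins (the _validate helper is hypothetical):

    import logging

    logger = logging.getLogger(__name__)


    class InvalidParameterError(Exception):
        """Stand-in for ads.opctl.operator.common.errors.InvalidParameterError."""


    def _validate(config: dict) -> None:
        # Hypothetical low-level check standing in for the operator's schema validation.
        if "kind" not in config:
            raise InvalidParameterError("missing required field: kind")


    def verify(config: dict) -> None:
        try:
            _validate(config)
        except InvalidParameterError as ex:
            logger.debug(ex)  # full detail only at debug level
            raise ValueError(
                "The operator's specification is not valid. "
                "Check the YAML structure against the operator's schema."
            ) from ex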
ads/opctl/operator/common/errors.py (3 changes: 2 additions & 1 deletion)

@@ -7,8 +7,9 @@
 from ads.opctl.operator import __operators__
 
 
-class OperatorSchemaYamlError(Exception):
+class InvalidParameterError(Exception):
     """Exception raised when there is an issue with the schema."""
+
     def __init__(self, error: str):
         super().__init__(
             "Invalid operator specification. Check the YAML structure and ensure it "
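End to end, the renamed class wraps the validator's detail in fixed guidance text. The guidance string is truncated by the collapsed diff above, so the continuation past "ensure it" below is an assumption made for illustration:

    class InvalidParameterError(Exception):
        """Exception raised when there is an issue with the schema."""

        def __init__(self, error: str):
            # Wording after "ensure it" is assumed; the diff truncates the message.
            super().__init__(
                "Invalid operator specification. Check the YAML structure and ensure it "
                "matches the operator's schema.\n" + error
            )


    try:
        raise InvalidParameterError('{"kind": ["required field"]}')
    except InvalidParameterError as ex:
        print(ex)  # guidance text followed by the validator's JSON detail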
ads/opctl/operator/common/operator_config.py (4 changes: 2 additions & 2 deletions)

@@ -13,7 +13,7 @@
 from ads.common.serializer import DataClassSerializable
 
 from ads.opctl.operator.common.utils import OperatorValidator
-from ads.opctl.operator.common.errors import OperatorSchemaYamlError
+from ads.opctl.operator.common.errors import InvalidParameterError
 
 
 @dataclass(repr=True)
@@ -65,7 +65,7 @@ def _validate_dict(cls, obj_dict: Dict) -> bool:
         result = validator.validate(obj_dict)
 
         if not result:
-            raise OperatorSchemaYamlError(json.dumps(validator.errors, indent=2))
+            raise InvalidParameterError(json.dumps(validator.errors, indent=2))
         return True
 
     @classmethod
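For context on the call site above, a sketch of the validation flow, assuming OperatorValidator is a thin wrapper over a cerberus-style Validator; the toy schema and the local error class are illustrative:

    import json

    from cerberus import Validator  # assumption: OperatorValidator wraps something like this


    class InvalidParameterError(Exception):
        """Local stand-in for the renamed error class."""


    schema = {  # toy schema; real operator schemas ship with each operator
        "kind": {"type": "string", "required": True},
        "version": {"type": "string", "required": True},
    }


    def validate_dict(obj_dict: dict) -> bool:
        validator = Validator(schema)
        if not validator.validate(obj_dict):
            # validator.errors maps field -> messages; dumping keeps full detail
            raise InvalidParameterError(json.dumps(validator.errors, indent=2))
        return True


    validate_dict({"kind": "operator", "version": "v1"})  # returns True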
ads/opctl/operator/common/utils.py (20 changes: 0 additions & 20 deletions)

@@ -172,23 +172,3 @@ def default_signer(**kwargs):
     from ads.common.auth import default_signer
 
     return default_signer(**kwargs)
-
-
-def human_time_friendly(seconds):
-    TIME_DURATION_UNITS = (
-        ("week", 60 * 60 * 24 * 7),
-        ("day", 60 * 60 * 24),
-        ("hour", 60 * 60),
-        ("min", 60),
-    )
-    if seconds == 0:
-        return "inf"
-    accumulator = []
-    for unit, div in TIME_DURATION_UNITS:
-        amount, seconds = divmod(float(seconds), div)
-        if amount > 0:
-            accumulator.append(
-                "{} {}{}".format(int(amount), unit, "" if amount == 1 else "s")
-            )
-    accumulator.append("{} secs".format(round(seconds, 2)))
-    return ", ".join(accumulator)
ads/opctl/operator/lowcode/anomaly/__main__.py (6 changes: 4 additions & 2 deletions)

@@ -15,15 +15,17 @@
 from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS
 from ads.opctl.operator.common.utils import _parse_input_args
 
-from .model.anomaly_dataset import AnomalyDatasets
+from .model.anomaly_dataset import AnomalyDatasets, AnomalyData
 from .operator_config import AnomalyOperatorConfig
 
 
 def operate(operator_config: AnomalyOperatorConfig) -> None:
     """Runs the anomaly detection operator."""
     from .model.factory import AnomalyOperatorModelFactory
 
-    datasets = AnomalyDatasets(operator_config)
+    datasets = AnomalyDatasets(operator_config.spec)
+    datasets2 = AnomalyData(operator_config.spec)
+    print(f"d1: {datasets.data}\n\n d2: {datasets2.data}")
     AnomalyOperatorModelFactory.get_model(operator_config, datasets).generate_report()
ads/opctl/operator/lowcode/anomaly/const.py (4 changes: 4 additions & 0 deletions)

@@ -5,6 +5,7 @@
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 
 from ads.common.extended_enum import ExtendedEnumMeta
+from ads.opctl.operator.lowcode.common.const import DataColumns
 
 
 class SupportedModels(str, metaclass=ExtendedEnumMeta):
@@ -44,6 +45,7 @@ class TODSSubModels(str, metaclass=ExtendedEnumMeta):
     TODSSubModels.KNN: "KNNSKI",
 }
 
+
 class SupportedMetrics(str, metaclass=ExtendedEnumMeta):
     UNSUPERVISED_UNIFY95 = "unsupervised_unify95"
     UNSUPERVISED_UNIFY95_LOG_LOSS = "unsupervised_unify95_log_loss"
@@ -75,9 +77,11 @@ class SupportedMetrics(str, metaclass=ExtendedEnumMeta):
     MEDIAN_MCC = "Median MCC"
     ELAPSED_TIME = "Elapsed Time"
 
+
 class OutputColumns(str, metaclass=ExtendedEnumMeta):
     ANOMALY_COL = "anomaly"
     SCORE_COL = "score"
+    Series = DataColumns.Series
 
 
 TODS_DEFAULT_MODEL = "ocsvm"
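These constants lean on the string-constant pattern behind ads.common.extended_enum. A minimal sketch of the idea; ConstMeta below is an illustrative stand-in, not the library's ExtendedEnumMeta, and the value of DataColumns.Series is assumed to be "Series":

    class ConstMeta(type):
        # Iterating the class yields the member values.
        def __iter__(cls):
            return (v for k, v in vars(cls).items() if not k.startswith("_"))


    class OutputColumns(str, metaclass=ConstMeta):
        ANOMALY_COL = "anomaly"
        SCORE_COL = "score"
        Series = "Series"  # assumed value of DataColumns.Series


    print(OutputColumns.ANOMALY_COL)  # "anomaly"; members behave as plain strings
    print(list(OutputColumns))        # ["anomaly", "score", "Series"]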
ads/opctl/operator/lowcode/anomaly/model/anomaly_dataset.py (158 changes: 87 additions & 71 deletions)

@@ -4,73 +4,91 @@
 # Copyright (c) 2023 Oracle and/or its affiliates.
 # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/
 
-from ..operator_config import AnomalyOperatorConfig
-from .. import utils
-from ads.opctl.operator.common.utils import default_signer
+from ..operator_config import AnomalyOperatorSpec
+from ads.opctl.operator.lowcode.common.utils import (
+    default_signer,
+    load_data,
+    merge_category_columns,
+)
+from ads.opctl.operator.lowcode.common.data import AbstractData
+from ads.opctl.operator.lowcode.anomaly.utils import get_frequency_of_datetime
 from ads.opctl import logger
 import pandas as pd
 from ads.opctl.operator.lowcode.anomaly.const import OutputColumns
 
 
+class AnomalyData(AbstractData):
+    def __init__(self, spec: AnomalyOperatorSpec):
+        super().__init__(spec=spec, name="input_data")
+
+
+class TestData(AbstractData):
+    def __init__(self, spec: AnomalyOperatorSpec):
+        super().__init__(spec=spec, name="test_data")
+
+
 class AnomalyDatasets:
-    def __init__(self, config: AnomalyOperatorConfig):
+    def __init__(self, spec: AnomalyOperatorSpec):
         """Instantiates the DataIO instance.
 
         Properties
         ----------
-        config: AnomalyOperatorConfig
-            The anomaly operator configuration.
+        spec: AnomalyOperatorSpec
+            The anomaly operator spec.
         """
-        self.original_user_data = None
-        self.data = None
-        self.test_data = None
-        self.target_columns = None
-        self.full_data_dict = None
-        self._load_data(config.spec)
-
-    def _load_data(self, spec):
-        """Loads anomaly input data."""
-
-        self.data = utils._load_data(
-            filename=spec.input_data.url,
-            format=spec.input_data.format,
-            storage_options=default_signer(),
-            columns=spec.input_data.columns,
-        )
-        self.original_user_data = self.data.copy()
-        date_col = spec.datetime_column.name
-        self.data[date_col] = pd.to_datetime(self.data[date_col])
-        try:
-            spec.freq = utils.get_frequency_of_datetime(self.data, spec)
-        except TypeError as e:
-            logger.warn(
-                f"Error determining frequency: {e.args}. Setting Frequency to None"
-            )
-            logger.debug(f"Full traceback: {e}")
-            spec.freq = None
-
-        if spec.target_category_columns is None:
-            if spec.target_column is None:
-                target_col = [
-                    col
-                    for col in self.data.columns
-                    if col not in [spec.datetime_column.name]
-                ]
-                spec.target_column = target_col[0]
-            self.full_data_dict = {spec.target_column: self.data}
-        else:
-            # Merge target category columns
-
-            self.data["__Series__"] = utils._merge_category_columns(self.data, spec.target_category_columns)
-            unique_categories = self.data["__Series__"].unique()
-            self.full_data_dict = dict()
-
-            for cat in unique_categories:
-                data_by_cat = (
-                    self.data[self.data["__Series__"] == cat].drop(spec.target_category_columns + ["__Series__"],
-                                                                   axis=1)
-                )
-                self.full_data_dict[cat] = data_by_cat
+        self._data = AnomalyData(spec)
+        self.data = self._data.get_data_long()
+        # self.test_data = None
+        # self.target_columns = None
+        self.full_data_dict = self._data.get_dict_by_series()
+        # self._load_data(spec)
+
+    # def _load_data(self, spec):
+    #     """Loads anomaly input data."""
+    #     try:
+    #         self.data = load_data(
+    #             filename=spec.input_data.url,
+    #             format=spec.input_data.format,
+    #             columns=spec.input_data.columns,
+    #         )
+    #     except InvalidParameterError as e:
+    #         e.args = e.args + ("Invalid Parameter: input_data",)
+    #         raise e
+    #     date_col = spec.datetime_column.name
+    #     self.data[date_col] = pd.to_datetime(self.data[date_col])
+    #     try:
+    #         spec.freq = get_frequency_of_datetime(self.data, spec)
+    #     except TypeError as e:
+    #         logger.warn(
+    #             f"Error determining frequency: {e.args}. Setting Frequency to None"
+    #         )
+    #         logger.debug(f"Full traceback: {e}")
+    #         spec.freq = None
+
+    #     if spec.target_category_columns is None:
+    #         if spec.target_column is None:
+    #             target_col = [
+    #                 col
+    #                 for col in self.data.columns
+    #                 if col not in [spec.datetime_column.name]
+    #             ]
+    #             spec.target_column = target_col[0]
+    #         self.full_data_dict = {spec.target_column: self.data}
+    #     else:
+    #         # Merge target category columns
+
+    #         self.data[OutputColumns.Series] = merge_category_columns(
+    #             self.data, spec.target_category_columns
+    #         )
+    #         unique_categories = self.data[OutputColumns.Series].unique()
+    #         self.full_data_dict = dict()
+
+    #         for cat in unique_categories:
+    #             data_by_cat = self.data[self.data[OutputColumns.Series] == cat].drop(
+    #                 spec.target_category_columns + [OutputColumns.Series], axis=1
+    #             )
+    #             self.full_data_dict[cat] = data_by_cat
 
 
 class AnomalyOutput:
@@ -93,11 +111,7 @@ def get_inliers_by_cat(self, category: str, data: pd.DataFrame):
         inlier_indices = anomaly.index[anomaly[OutputColumns.ANOMALY_COL] == 0]
         inliers = data.iloc[inlier_indices]
         if scores is not None and not scores.empty:
-            inliers = pd.merge(
-                inliers,
-                scores,
-                on=self.date_column,
-                how='inner')
+            inliers = pd.merge(inliers, scores, on=self.date_column, how="inner")
         return inliers
 
     def get_outliers_by_cat(self, category: str, data: pd.DataFrame):
@@ -106,11 +120,7 @@ def get_outliers_by_cat(self, category: str, data: pd.DataFrame):
         outliers_indices = anomaly.index[anomaly[OutputColumns.ANOMALY_COL] == 1]
         outliers = data.iloc[outliers_indices]
         if scores is not None and not scores.empty:
-            outliers = pd.merge(
-                outliers,
-                scores,
-                on=self.date_column,
-                how='inner')
+            outliers = pd.merge(outliers, scores, on=self.date_column, how="inner")
         return outliers
 
     def get_inliers(self, data):
@@ -120,9 +130,12 @@ def get_inliers(self, data):
         inliers = pd.concat(
             [
                 inliers,
-                self.get_inliers_by_cat(
-                    category, data[data['__Series__'] == category].reset_index(drop=True).drop('__Series__', axis=1)
-                )
+                self.get_inliers_by_cat(
+                    category,
+                    data[data[OutputColumns.Series] == category]
+                    .reset_index(drop=True)
+                    .drop(OutputColumns.Series, axis=1),
+                ),
             ],
             axis=0,
             ignore_index=True,
@@ -137,8 +150,11 @@ def get_outliers(self, data):
             [
                 outliers,
                 self.get_outliers_by_cat(
-                    category, data[data['__Series__'] == category].reset_index(drop=True).drop('__Series__', axis=1)
-                )
+                    category,
+                    data[data[OutputColumns.Series] == category]
+                    .reset_index(drop=True)
+                    .drop(OutputColumns.Series, axis=1),
+                ),
             ],
             axis=0,
             ignore_index=True,
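To make the AnomalyOutput changes concrete, a self-contained sketch of the per-series pattern the rewritten methods implement: select the flagged rows for one category, then inner-merge the scores on the date column. The frame and column names are invented for illustration:

    import pandas as pd

    # One category's slice, mimicking what get_outliers_by_cat receives after
    # AnomalyOutput filters a single series.
    data = pd.DataFrame(
        {
            "Date": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
            "value": [1.0, 9.0, 1.1],
        }
    )
    anomaly = pd.DataFrame({"Date": data["Date"], "anomaly": [0, 1, 0]})
    scores = pd.DataFrame({"Date": data["Date"], "score": [0.1, 0.9, 0.2]})

    # Same shape as the rewritten method: pick flagged rows, then merge scores.
    outlier_idx = anomaly.index[anomaly["anomaly"] == 1]
    outliers = data.iloc[outlier_idx]
    outliers = pd.merge(outliers, scores, on="Date", how="inner")
    print(outliers)  # one row: 2023-01-02, value 9.0, score 0.9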