PythonによるSnowflakeタスクとタスクグラフの管理

Pythonを使用してSnowflakeタスクを管理し、 SQL ステートメント、プロシージャ呼び出し、および Snowflakeスクリプト のロジックを実行できます。タスクの概要については、 タスクの紹介 をご参照ください。

Snowflake Python Snowflake Python APIs は、2つの別々のタイプでタスクを表します。

  • Task: スケジュール、パラメーター、先行タスクなどのタスクのプロパティを公開します。

  • TaskResource: 対応する Task オブジェクトのフェッチ、タスクの実行、タスクの変更に使用できるメソッドを公開します。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root from snowflake.snowpark import Session session = Session.builder.config("connection_name", "myconnection").create() root = Root(session) 
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。

タスクの作成

タスクを作成するには、まず Task オブジェクトを作成します。次に、タスクを作成するデータベースとスキーマを指定して TaskCollection オブジェクトを作成します。 TaskCollection.create を使用して、新しいタスクをSnowflakeに追加します。

次の例のコードは、 definition パラメーターで指定された SQL クエリを実行する my_task というタスクを表す Task オブジェクトを作成します。

from datetime import timedelta from snowflake.core.task import Task my_task = Task(name="my_task", definition="<sql query>", schedule=timedelta(hours=1)) tasks = root.databases['my_db'].schemas['my_schema'].tasks tasks.create(my_task) 
Copy

このコードは、 my_db データベースと my_schema スキーマから TaskCollection 変数 tasks を作成します。 TaskCollection.create を使用して、Snowflakeに新しいタスクを作成します。

このコード例では、タスクのスケジュールに1時間の timedelta 値も指定します。タスクのスケジュールは timedelta 値または Cron 式のいずれかを使用して定義できます。

Python関数やストアドプロシージャを実行するタスクを作成することもできます。次の例のコードは、 my_task2 というタスクを作成し、 StoredProcedureCall オブジェクトで表される関数を実行します。

from snowflake.core.task import StoredProcedureCall, Task my_task2 = Task( "my_task2", StoredProcedureCall( dosomething, stage_location="@mystage" ), warehouse="test_warehouse" ) tasks = root.databases['my_db'].schemas['my_schema'].tasks tasks.create(my_task2) 
Copy

このオブジェクトは dosomething という名前の関数を @mystage ステージのロケーションに指定します。 StoredProcedureCall オブジェクトでタスクを作成する場合は、 warehouse も指定する必要があります。

タスクの作成または更新

Task オブジェクトのプロパティを設定し、 TaskResource.create_or_alter メソッドに渡すことで、タスクが存在しない場合は作成し、存在する場合はタスク定義に従って変更することができます。 create_or_alter の動作はべき等であることを意図しています。つまり、メソッドを呼び出す前にタスクが存在したかどうかにかかわらず、結果として得られるタスクオブジェクトは同じになります。

注釈

create_or_alter メソッドは、明示的に定義していない Task プロパティのデフォルト値を使用します。例えば、 schedule を設定しない場合、そのタスクが以前別の値で存在していたとしても、その値はデフォルトで None になります。

次の例のコードは、 my_task タスクの定義とスケジュールを更新し、Snowflake 上のタスクを変更します。

from datetime import timedelta from snowflake.core.task import Task my_task = root.databases['my_db'].schemas['my_schema'].tasks['my_task'].fetch() my_task.definition = "<sql query 2>" my_task.schedule = timedelta(hours=2) my_task_res = root.databases['my_db'].schemas['my_schema'].tasks['my_task'] my_task_res.create_or_alter(my_task) 
Copy

タスクのリスト

TaskCollection.iter メソッドを使用してタスクを一覧表示できます。このメソッドは Task オブジェクトの PagedIter 反復子を返します。

次の例のコードは、名前が my で始まるタスクをリストします。

from snowflake.core.task import TaskCollection tasks: TaskCollection = root.databases['my_db'].schemas['my_schema'].tasks task_iter = tasks.iter(like="my%") # returns a PagedIter[Task] for task_obj in task_iter: print(task_obj.name) 
Copy

タスク操作の実行

タスクの実行、中断、再開といった一般的なタスク操作を TaskResource オブジェクトで行うことができます。

次の例のコードは、 my_task タスクを実行、中断、再開、ドロップします。

tasks = root.databases['my_db'].schemas['my_schema'].tasks task_res = tasks['my_task'] task_res.execute() task_res.suspend() task_res.resume() task_res.drop() 
Copy

タスクグラフでのタスクの管理

タスクグラフで集められたタスクを管理することができます。タスクグラフは、依存関係によってまとめられた単一のルートタスクと追加のタスクで構成された、一連のタスクです。

タスクグラフ内のタスクに関する詳細については、 タスクグラフで一連のタスクを作成 をご参照ください。

タスクグラフの作成

タスクグラフを作成するには、まず DAG オブジェクトを作成し、名前とスケジュールなどのオプションのプロパティを指定します。タスクグラフのスケジュールは、 timedelta 値または Cron 式のいずれかを使用して定義できます。

次の例のコードは、Python関数 dosomething を定義し、タスクグラフで dag_task2 という名前の DAGTask オブジェクトとして関数を指定します。

from snowflake.core.task import StoredProcedureCall from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation from snowflake.snowpark import Session from snowflake.snowpark.functions import sum as sum_ def dosomething(session: Session) -> None: df = session.table("target") df.group_by("a").agg(sum_("b")).save_as_table("agg_table") with DAG("my_dag", schedule=timedelta(days=1)) as dag: # Create a task that runs some SQL. dag_task1 = DAGTask( "dagtask1", "MERGE INTO target USING source_stream WHEN MATCHED THEN UPDATE SET target.v = source_stream.v" ) # Create a task that runs a Python function. dag_task2 = DAGTask( StoredProcedureCall( dosomething, stage_location="@mystage", packages=["snowflake-snowpark-python"] ), warehouse="test_warehouse" ) # Shift right and left operators can specify task relationships. dag_task1 >> dag_task2 # dag_task1 is a predecessor of dag_task2 schema = root.databases["my_db"].schemas["my_schema"] dag_op = DAGOperation(schema) dag_op.deploy(dag) 
Copy

このコードはまた、 SQL ステートメントを dag_task1 という別の DAGTask オブジェクトとして定義してから、 dag_task1dag_task2 の先行タスクとして指定します。最後に、 my_db データベースと my_schema スキーマでタスクグラフをSnowflakeに展開します。

cronスケジュール、タスクブランチ、関数の戻り値でタスクグラフを作成します。

また、タスクの戻り値として使用される指定されたcronスケジュール、タスクブランチ、関数の戻り値を持つタスクグラフを作成することもできます。

次の例のコードでは、 DAG オブジェクトを、そのスケジュールを指定する Cron オブジェクトで作成します。これは、 DAGTaskBranch オブジェクトを task1_branch という名前で他の DAGTask オブジェクトと一緒に定義し、それらの依存関係を指定します。

from snowflake.core._common import CreateMode from snowflake.core.task import Cron from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation, DAGTaskBranch from snowflake.snowpark import Session def task_handler(session: Session) -> None: pass # do something def task_branch_handler(session: Session) -> str: # do something return "task3" try: with DAG( "my_dag", schedule=Cron("10 * * * *", "America/Los_Angeles"), stage_location="@mystage", packages=["snowflake-snowpark-python"], use_func_return_value=True, ) as dag: task1 = DAGTask( "task1", task_handler, warehouse=test_warehouse, ) task1_branch = DAGTaskBranch("task1_branch", task_branch_handler, warehouse=test_warehouse) task2 = DAGTask("task2", task_handler, warehouse=test_warehouse) task3 = DAGTask("task3", task_handler, warehouse=test_warehouse, condition="1=1") task1 >> task1_branch task1_branch >> [task2, task3] schema = root.databases["my_db"].schemas["my_schema"] op = DAGOperation(schema) op.deploy(dag, mode=CreateMode.or_replace) finally: session.close() 
Copy

このコード例は、タスクハンドラー関数も定義し、タスクに割り当てられた指定のタスクハンドラーを持つ DAGTaskDAGTaskBranch の各オブジェクトを作成します。コードは DAG の use_func_return_value パラメーターを True に設定します。これはPython関数の戻り値を対応するタスクの戻り値として使うことを指定します。それ以外の場合、 use_func_return_value のデフォルト値は False となります。

タスクグラフにおけるタスクの戻り値の設定と取得

タスクの定義が StoredProcedureCall オブジェクトである場合、ストアドプロシージャ(または関数)のハンドラーは TaskContext オブジェクトを使用することで、タスクの戻り値を明示的に設定することができます。

詳細については、 SYSTEM$SET_RETURN_VALUE をご参照ください。

次の例のコードは、現在のセッションから TaskContext オブジェクトを context という名前で作成するタスクハンドラー関数を定義します。次に、 TaskContext.set_return_value メソッドを使用して、指定された文字列に戻り値を明示的に設定します。

from snowflake.core.task.context import TaskContext from snowflake.snowpark import Session def task_handler(session: Session) -> None: context = TaskContext(session) # this return value can be retrieved by successor Tasks. context.set_return_value("predecessor_return_value") 
Copy

タスクグラフで、直前のタスクをその先行タスクとして識別する直後のタスクは、先行タスクによって明示的に設定された戻り値を取得できます。

詳細については、 SYSTEM$GET_PREDECESSOR_RETURN_VALUE をご参照ください。

次の例のコードは、 TaskContext.get_predecessor_return_value メソッドを使用して pred_task_name という先行タスクの戻り値を取得するタスクハンドラー関数を定義します。

from snowflake.core.task.context import TaskContext from snowflake.snowpark import Session def task_handler(session: Session) -> None: context = TaskContext(session) pred_return_value = context.get_predecessor_return_value("pred_task_name") 
Copy