チュートリアル2:Snowpark Container Servicesジョブサービスを作成する

概要

チュートリアル共通セットアップ を完了すると、ジョブサービスを作成する準備が整います。このチュートリアルでは、Snowflakeに接続し、 SQL SELECT クエリを実行し、結果をテーブルに保存する単純なジョブサービスを作成します。

このチュートリアルには2つのパートがあります。

パート1: ジョブサービスを作成してテストする。 このチュートリアルで提供されるコードをダウンロードし、ステップバイステップの手順に従います。

  1. このチュートリアルのジョブサービスコードをダウンロードします。

  2. Snowpark Container Services用のDockerイメージをビルドし、アカウントのリポジトリにイメージをアップロードします。

  3. Snowflakeにコンテナ構成情報を提供するサービス仕様ファイルをステージします。仕様ファイルには、コンテナの起動に使用するイメージ名に加えて、以下の3つの引数を指定します: SELECT クエリ、クエリを実行する仮想ウェアハウス、および結果を保存するテーブル名。

  4. ジョブサービスを実行します。EXECUTE JOBSERVICE コマンドを使用して、仕様ファイルとSnowflakeがコンテナを実行できるコンピューティングプールを指定すると、ジョブサービスを実行できます。そして最後に、サービスの結果を確認します。

パート2: ジョブサービスコードを理解する。このセクションでは、ジョブサービスコードの概要を説明し、さまざまなコンポーネントがどのように連携しているかを明らかにします。

1: ジョブサービスコードのダウンロード

ジョブサービスを作成するためのコード(Pythonアプリケーション)が提供されます。

  1. ダウンロード: SnowparkContainerServices -Tutorials.zip </samples/spcs/SnowparkContainerServices-Tutorials.zip> をダウンロードします。

  2. ファイルを解凍します。チュートリアルごとに1つのディレクトリが含まれています。Tutorial-2 ディレクトリには以下のファイルがあります。

    • main.py

    • Dockerfile

    • my_job_spec.yaml

2: イメージをビルドしてアップロードする

Snowpark Container Servicesがサポートするlinux/amd64プラットフォーム用のイメージをビルドし、アカウントのイメージリポジトリにイメージをアップロードします(共通セットアップ を参照)。

イメージをビルドしてアップロードする前に、リポジトリに関する情報(リポジトリ URL とレジストリのホスト名)が必要です。詳細については、 レジストリおよびリポジトリ をご参照ください。

リポジトリに関する情報を取得する

  1. リポジトリ URL を取得するには、 SHOW IMAGE REPOSITORIES SQL コマンドを実行します。

    SHOW IMAGE REPOSITORIES; 
    Copy
    • 出力の repository_url 列は、 URL を提供します。以下に例を示します。

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository 
    • リポジトリ URL のホスト名はレジストリのホスト名です。以下に例を示します。

      <orgname>-<acctname>.registry.snowflakecomputing.com 

イメージをビルドし、リポジトリにアップロードする

  1. ターミナルウィンドウを開き、解凍したファイルのあるディレクトリに移動します。

  2. Dockerイメージをビルドするには、Docker CLI を使用して以下の docker build コマンドを実行します。このコマンドは、イメージのビルドに使用するファイルの PATH として、現在の作業ディレクトリ(.)を指定していることに注意してください。

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> . 
    Copy
    • image_name には、 my_job_image:latest を使用します。

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest . 
    Copy
  3. Snowflakeアカウントのリポジトリにイメージをアップロードします。Dockerがあなたの代わりにリポジトリにイメージをアップロードするには、まず レジストリでDockerを認証する 必要があります。

    1. Dockerがあなたの代わりにリポジトリにイメージをアップロードするには、まず レジストリでDockerを認証する 必要があります。

      1. の使用をお勧めします。 Snowflake CLI して、SnowflakeアカウントのイメージレジストリでローカルDockerインスタンスを認証します。が構成されていることを確認してください Snowflake CLI Snowflakeに接続するための。詳細については、 Snowflake CLI の構成とSnowflake への接続 をご参照ください。

      2. 認証するには、次を実行します Snowflake CLI コマンド:

        snow spcs image-registry login 
        Copy
    2. イメージをアップロードするには、以下のコマンドを実行します。

      docker push <repository_url>/<image_name> 
      Copy

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_job_image:latest 
      Copy

3: 仕様ファイルをステージする

  • サービス仕様ファイル(my_job_spec.yaml)をステージにアップロードするには、以下のオプションのいずれかを使用します。

    • Snowsightウェブインターフェイス: 手順については、 ローカルファイルに対する内部ステージの選択 をご参照ください。

    • SnowSQL CLI: 次の PUT コマンドを実行します。

      PUT file://<file-path>[/\]my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE; 
      Copy

      例:

      • Linuxまたは macOS

        PUT file:///tmp/my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE; 
        Copy
      • Windows

        PUT file://C:\temp\my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE; 
        Copy

      相対パスを指定することもできます。

      PUT file://./my_job_spec.yaml @tutorial_stage AUTO_COMPRESS=FALSE OVERWRITE=TRUE; 
      Copy

      このコマンドは OVERWRITE=TRUE を設定するため、必要な場合(例: 仕様ファイルのエラーを修正した場合)は、ファイルを再度アップロードすることができます。PUT コマンドが正常に実行されると、アップロードされたファイルに関する情報がプリントアウトされます。

4: ジョブサービスを実行する

これでジョブを作成する準備が整いました。

  1. ジョブサービスを開始するには、 EXECUTE JOBSERVICE コマンドを実行します。

    EXECUTE JOB SERVICE IN COMPUTE POOL tutorial_compute_pool NAME=tutorial_2_job_service FROM @tutorial_stage SPEC='my_job_spec.yaml'; 
    Copy

    次の点に注意してください。

    • FROM と SPEC は、ステージ名とジョブサービス仕様ファイル名を提供します。ジョブサービスが実行されると、 SQL ステートメントが実行され、 my_job_spec.yaml で指定されたテーブルに結果が保存されます。

      SQL ステートメントは、Dockerコンテナ内では実行されません。代わりに、実行中のコンテナーはSnowflakeに接続し、Snowflakeウェアハウスで SQL ステートメントを実行します。

    • COMPUTE_POOL は、Snowflakeがジョブサービスを実行するコンピューティングリソースを提供します。

    • オプションで QUERY_WAREHOUSE パラメーターを指定すると、 SQL ステートメントを実行するコンテナーのデフォルトウェアハウスを指定できます。しかし、このチュートリアルのジョブサービスコードでは、ウェアハウスを定義する環境変数が指定されているため、前述のコマンドではデフォルトを省略しています。

    • EXECUTE JOBSERVICE は、次のサンプル出力に示すように、ジョブ名を含む出力を返します。

      +------------------------------------------------------------------------------------+ | status | -------------------------------------------------------------------------------------+ | Job TUTORIAL_2_JOB_SERVICE completed successfully with status: DONE. | +------------------------------------------------------------------------------------+ 
  2. このジョブサービスは単純なクエリを実行し、結果を結果テーブルに保存します。ジョブサービス正常に完了したことは、結果テーブルをクエリすると確認できます。

    SELECT * FROM results; 
    Copy

    サンプル出力:

    +----------+-----------+ | TIME | TEXT | |----------+-----------| | 10:56:52 | hello | +----------+-----------+ 
  3. ジョブサービスの実行をデバッグしたい場合は、 SHOW SERVICE CONTAINERS IN SERVICE を実行して、ジョブサービスがまだ実行されているかどうか、起動に失敗したかどうか、失敗した場合はなぜ失敗したのかを調べます。また、コードが有用なログを標準出力または標準エラーに出力すると想定し、 SYSTEM$GET_SERVICE_LOGS を使用してログにアクセスできます。

    1. ジョブ・サービスのステータスを取得するには、 SHOW SERVICE CONTAINERS IN SERVICE を実行します。

      SHOW SERVICE CONTAINERS IN SERVICE tutorial_2_job_service; 
      Copy

      サンプル出力:

      +---------------+-------------+------------------------+-------------+----------------+--------+------------------------+----------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+---------------+------------+ | database_name | schema_name | service_name | instance_id | container_name | status | message | image_name | image_digest | restart_count | start_time | |---------------+-------------+------------------------+-------------+----------------+--------+------------------------+----------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+---------------+------------| | TUTORIAL_DB | DATA_SCHEMA | TUTORIAL_2_JOB_SERVICE | 0 | main | DONE | Completed successfully | myorg-myacct.registry.snowflakecomputing.com/tutorial_db/tutorial_db/data_schema/tutorial_repository/my_job_image:latest | sha256:aa3fa2e5c1552d16904a5bbc97d400316ebb4a608bb110467410485491d9d8d0 | 0 | | +---------------+-------------+------------------------+-------------+----------------+--------+------------------------+----------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------+---------------+------------+ 
    2. ジョブサービスのログ情報を取得するには、システム関数 SYSTEM$GET_SERVICE_LOGS を使用します。

      SELECT SYSTEM$GET_SERVICE_LOGS('tutorial_2_job_service', 0, 'main') 
      Copy
      job-tutorial - INFO - Job started job-tutorial - INFO - Connection succeeded. Current session context: database="TUTORIAL_DB", schema="DATA_SCHEMA", warehouse="TUTORIAL_WAREHOUSE", role="TEST_ROLE" job-tutorial - INFO - Executing query [select current_time() as time,'hello'] and writing result to table [results] job-tutorial - INFO - Job finished 

5: クリーンアップする

チュートリアル3 に進む予定がない場合は、作成した請求対象リソースを削除する必要があります。詳細については、 チュートリアル3 のステップ5をご参照ください。

6: ジョブサービスコードを見直す

このセクションでは、以下のトピックを取り上げます。

提供されたファイルの調査

チュートリアルの最初にダウンロードしたzipファイルには、以下のファイルが含まれています。

  • main.py

  • Dockerfile

  • my_job_spec.yaml

このセクションでは、コードの概要を説明します。

main.pyファイル

#!/opt/conda/bin/python3 import argparse import logging import os import sys from snowflake.snowpark import Session from snowflake.snowpark.exceptions import * # Environment variables below will be automatically populated by Snowflake. SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT") SNOWFLAKE_HOST = os.getenv("SNOWFLAKE_HOST") SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE") SNOWFLAKE_SCHEMA = os.getenv("SNOWFLAKE_SCHEMA") # Custom environment variables SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER") SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD") SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE") SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE") def get_arg_parser():  """  Input argument list.  """ parser = argparse.ArgumentParser() parser.add_argument("--query", required=True, help="query text to execute") parser.add_argument( "--result_table", required=True, help="name of the table to store result of query specified by flag --query") return parser def get_logger():  """  Get a logger for local logging.  """ logger = logging.getLogger("job-tutorial") logger.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) formatter = logging.Formatter("%(name)s - %(levelname)s - %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) return logger def get_login_token():  """  Read the login token supplied automatically by Snowflake. These tokens  are short lived and should always be read right before creating any new connection.  """ with open("/snowflake/session/token", "r") as f: return f.read() def get_connection_params():  """  Construct Snowflake connection params from environment variables.  """ if os.path.exists("/snowflake/session/token"): return { "account": SNOWFLAKE_ACCOUNT, "host": SNOWFLAKE_HOST, "authenticator": "oauth", "token": get_login_token(), "warehouse": SNOWFLAKE_WAREHOUSE, "database": SNOWFLAKE_DATABASE, "schema": SNOWFLAKE_SCHEMA } else: return { "account": SNOWFLAKE_ACCOUNT, "host": SNOWFLAKE_HOST, "user": SNOWFLAKE_USER, "password": SNOWFLAKE_PASSWORD, "role": SNOWFLAKE_ROLE, "warehouse": SNOWFLAKE_WAREHOUSE, "database": SNOWFLAKE_DATABASE, "schema": SNOWFLAKE_SCHEMA } def run_job():  """  Main body of this job.  """ logger = get_logger() logger.info("Job started") # Parse input arguments args = get_arg_parser().parse_args() query = args.query result_table = args.result_table # Start a Snowflake session, run the query and write results to specified table with Session.builder.configs(get_connection_params()).create() as session: # Print out current session context information. database = session.get_current_database() schema = session.get_current_schema() warehouse = session.get_current_warehouse() role = session.get_current_role() logger.info( f"Connection succeeded. Current session context: database={database}, schema={schema}, warehouse={warehouse}, role={role}" ) # Execute query and persist results in a table. logger.info( f"Executing query [{query}] and writing result to table [{result_table}]" ) res = session.sql(query) # If the table already exists, the query result must match the table scheme. # If the table does not exist, this will create a new table. res.write.mode("append").save_as_table(result_table) logger.info("Job finished") if __name__ == "__main__": run_job() 
Copy

コードで、

  • Pythonコードが main で実行され、次に run_job() 関数が実行されます。

    if __name__ == "__main__": run_job() 
    Copy
  • run_job() 関数は環境変数を読み取り、それを使用してさまざまなパラメーターのデフォルト値を設定します。コンテナーは、これらのパラメーターを使用してSnowflakeに接続します。次に注意してください。

    • サービス仕様の containers.env および containers.args フィールドを使用して、サービスで使用されるパラメーター値を上書きできます。詳細については、 サービス仕様リファレンス をご参照ください。

    • イメージがSnowflakeで実行されると、Snowflakeはこれらのパラメーターの一部(ソースコードを参照)を自動的に入力します。しかし、イメージをローカルでテストする場合は、これらのパラメーターを明示的に指定する必要があります(次のセクション ローカルでのイメージの構築とテスト に示すように)。

Dockerfile

このファイルには、Dockerを使用してイメージを構築するためのすべてのコマンドが含まれています。

ARG BASE_IMAGE=continuumio/miniconda3:4.12.0 FROM $BASE_IMAGE RUN conda install python=3.9 && \  conda install snowflake-snowpark-python COPY main.py ./ ENTRYPOINT ["python3", "main.py"] 
Copy

my_job_spec.yamlファイル(サービス仕様)

Snowflakeは、この仕様で提供された情報を使用して、ジョブサービスを構成および実行します。

spec:  containers:  - name: main  image: /tutorial_db/data_schema/tutorial_repository/my_job_image:latest  env:  SNOWFLAKE_WAREHOUSE: tutorial_warehouse  args:  - "--query=select current_time() as time,'hello'"  - "--result_table=results" 
Copy

container.namecontainer.image の必須フィールド(サービス仕様リファレンス を参照)に加えて、この仕様には引数をリストするためのオプションの container.args フィールドがあります。

  • --query は、サービス実行時に実行するクエリを提供します。

  • --result_table は、クエリ結果を保存するテーブルを識別します。

ローカルでのイメージの構築とテスト

Snowflakeアカウントのリポジトリにアップロードする前に、ローカルでDockerイメージをテストすることができます。ローカルテストでは、コンテナはスタンドアロンで実行されます(Snowflakeが実行するジョブサービスではありません)。

以下のステップを使用して、チュートリアル2Dockerイメージをテストします。

  1. Dockerイメージを作成するには、Docker CLIで docker build コマンドを実行します。

    docker build --rm -t my_service:local . 
    Copy
  2. コードを起動するには、 docker run コマンドを実行し、 <組織名>-<アカウント名><ユーザー名><パスワード> を提供します。

    docker run --rm \  -e SNOWFLAKE_ACCOUNT=<orgname>-<acctname> \  -e SNOWFLAKE_HOST=<orgname>-<acctname>.snowflakecomputing.com \  -e SNOWFLAKE_DATABASE=tutorial_db \  -e SNOWFLAKE_SCHEMA=data_schema \  -e SNOWFLAKE_ROLE=test_role \  -e SNOWFLAKE_USER=<username> \  -e SNOWFLAKE_PASSWORD=<password> \  -e SNOWFLAKE_WAREHOUSE=tutorial_warehouse \  my_job:local \  --query="select current_time() as time,'hello'" \  --result_table=tutorial_db.data_schema.results 
    Copy

    ローカルでイメージをテストする場合は、3つの引数(クエリ、クエリを実行するウェアハウス、結果を保存するテーブル)に加えて、ローカルで実行するコンテナーがSnowflakeに接続するための接続パラメーターも提供することに注意してください。

    コンテナをサービスとして実行すると、Snowflakeはこれらのパラメーターを環境変数としてコンテナに提供します。詳細については、 Snowflakeクライアントを構成する をご参照ください。

    ジョブサービスはクエリ(select current_time() as time,'hello')を実行し、結果をテーブル(tutorial_db.data_schema.results)に書き込みます。テーブルが存在しない場合は作成されます。テーブルが存在する場合、ジョブサービスは行を追加します。

    結果テーブルのクエリ結果の例:

    +----------+----------+ | TIME | TEXT | |----------+----------| | 10:56:52 | hello | +----------+----------+ 

次の内容

これで チュートリアル3 をテストし、サービス間の通信方法を表示することができます。