PythonでSnowflake関数とストアドプロシージャを管理する

Pythonを使用して、Snowflakeでユーザー定義関数(UDFs)とストアドプロシージャを管理することができます。UDF またはプロシージャを記述する場合は、サポートされているハンドラー言語のいずれかでそのロジックを記述し、 Snowflake Python APIs を使用して作成します。UDFs とプロシージャの詳細については、 関数とプロシージャによるSnowflakeの拡張 をご参照ください。

前提条件

このトピックの例では、Snowflakeと接続するコードを追加して Root オブジェクトを作成し、そこからSnowflake Python Snowflake Python APIs を使用することを想定しています。

たとえば、以下のコードでは、構成ファイルで定義された接続パラメーターを使用してSnowflakeへの接続を作成します。

from snowflake.core import Root from snowflake.snowpark import Session session = Session.builder.config("connection_name", "myconnection").create() root = Root(session) 
Copy

出来上がった Session オブジェクトを使って、コードは API のタイプとメソッドを使う Root オブジェクトを作成します。詳細については、 Snowflake Python APIs によるSnowflakeへの接続 をご参照ください。

ユーザー定義関数の管理(UDFs)

ユーザー定義関数(UDFs)を記述して、Snowflakeが提供する組み込みのシステム定義関数では利用できない操作を実行するようにシステムを拡張できます。UDF を作成すると、何度でも再利用できます。詳細については、 ユーザー定義関数の概要 をご参照ください。

注釈

API を使った UDFs の呼び出しは現在サポートされていません。

Snowflake Python APIs は、 UDFs を2つの別々のタイプで表します。

  • UserDefinedFunction: 名前、引数のリスト、戻り値の型、関数の定義など、 UDF のプロパティを公開します。

  • UserDefinedFunctionResource: 対応する UserDefinedFunction オブジェクトを取得し、 UDF の名前を変更し、 UDF をドロップするために使用できるメソッドを公開します。

UDF の作成

UDF を作成するには、まず UserDefinedFunction オブジェクトを作成して、 API Root オブジェクトから UserDefinedFunctionCollection オブジェクトを作成します。 UserDefinedFunctionCollection.create を使って、新しい UDF をSnowflakeに追加します。

UDF を作成するときは、次のサポートされている言語の1つでコードが記述されているハンドラーを指定します。

Python

次の例のコードは、 my_db データベースと my_schema スキーマの my_python_function という UDF を表す UserDefinedFunction オブジェクトを、指定された引数、戻り値型、言語、 UDF Python 定義で作成します。

from snowflake.core.user_defined_function import ( PythonFunction, ReturnDataType, UserDefinedFunction ) function_of_python = UserDefinedFunction( "my_python_function", arguments=[], return_type=ReturnDataType(datatype="VARIANT"), language_config=PythonFunction(runtime_version="3.9", packages=[], handler="udf"), body=""" def udf():  return {"key": "value"}  """, ) root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_python) 
Copy

Java

次の例のコードは、 my_db データベースと my_schema スキーマの my_java_function という名前の UDF を表す UserDefinedFunction オブジェクトを、指定された引数、戻り値型、言語、 UDF Java 定義で作成します。

from snowflake.core.user_defined_function import ( Argument, JavaFunction, ReturnDataType, UserDefinedFunction ) function_body = """  class TestFunc {  public static String echoVarchar(String x) {  return x;  }  } """ function_of_java = UserDefinedFunction( name="my_java_function", arguments=[Argument(name="x", datatype="STRING")], return_type=ReturnDataType(datatype="VARCHAR", nullable=True), language_config=JavaFunction( handler="TestFunc.echoVarchar", runtime_version="11", target_path=target_path, packages=[], called_on_null_input=True, is_volatile=True, ), body=function_body, comment="test_comment", ) root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_java) 
Copy

JavaScript

次の例のコードは、 my_db データベースと my_schema スキーマの my_javascript_function という名前の UDF を表す UserDefinedFunction オブジェクトを、指定された引数、戻り値型、言語、 UDF JavaScript 定義で作成します。

from snowflake.core.user_defined_function import ( Argument, ReturnDataType, JavaScriptFunction, UserDefinedFunction ) function_body = """  if (D <= 0) {  return 1;  } else {  var result = 1;  for (var i = 2; i <= D; i++) {  result = result * i;  }  return result;  } """ function_of_javascript = UserDefinedFunction( name="my_javascript_function", arguments=[Argument(name="d", datatype="DOUBLE")], return_type=ReturnDataType(datatype="DOUBLE"), language_config=JavaScriptFunction(), body=function_body, ) root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_javascript) 
Copy

Scala

次の例のコードは、 my_db データベースと my_schema スキーマの my_scala_function という UDF を表す UserDefinedFunction オブジェクトを、指定された引数、戻り値型、言語、 UDF Scala 定義で作成します。

from snowflake.core.user_defined_function import ( Argument, ReturnDataType, ScalaFunction, UserDefinedFunction ) function_body = """  class Echo {  def echoVarchar(x : String): String = {  return x  }  } """ function_of_scala = UserDefinedFunction( name="my_scala_function", arguments=[Argument(name="x", datatype="VARCHAR")], return_type=ReturnDataType(datatype="VARCHAR"), language_config=ScalaFunction( runtime_version="2.12", handler="Echo.echoVarchar", target_path=target_path, packages=[] ), body=function_body, comment="test_comment", ) root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_scala) 
Copy

SQL

次の例のコードは、 my_db データベースと my_schema スキーマの my_sql_function という名前の UDF を表す UserDefinedFunction オブジェクトを、指定された引数、戻り値型、言語、 UDF SQL 定義で作成します。

from snowflake.core.user_defined_function import ( ReturnDataType, SQLFunction, UserDefinedFunction ) function_body = """3.141592654::FLOAT""" function_of_sql = UserDefinedFunction( name="my_sql_function", arguments=[], return_type=ReturnDataType(datatype="FLOAT"), language_config=SQLFunction(), body=function_body, ) root.databases["my_db"].schemas["my_schema"].user_defined_functions.create(function_of_sql) 
Copy

UDF の詳細の取得

UserDefinedFunction オブジェクトを返す UserDefinedFunctionResource.fetch メソッドを呼び出すことで、 UDF に関する情報を取得できます。

次の例のコードは、 my_db データベースの my_javascript_function(DOUBLE) UDF と my_schema スキーマの情報を取得します。

注釈

UDF リソースオブジェクトを取得する場合、 UDFs がオーバーロードされる可能性があるため、完全なシグネチャ(UDF の名前とそのパラメーターのデータ型)を指定する必要があります。

my_udf = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].fetch() print(my_udf.to_dict()) 
Copy

UDFs のリスト

UserDefinedFunction オブジェクトの PagedIter 反復子を返す UserDefinedFunctionCollection.iter メソッドを使用して、 UDFs を一覧表示することができます。

次の例のコードは、 my_db データベースと my_schema スキーマの my_java で始まる名前を持つ UDFs をリストアップし、それぞれの名前を表示します。

udf_iter = root.databases["my_db"].schemas["my_schema"].user_defined_functions.iter(like="my_java%") for udf_obj in udf_iter: print(udf_obj.name) 
Copy

UDF の名前の変更

UserDefinedFunctionResource オブジェクトで UDF の名前を変更できます。

次の例のコードは、 my_db データベースと my_schema スキーマにある my_javascript_function(DOUBLE) UDF リソース・オブジェクトを取得し、 UDF の名前を my_other_js_function に変更すると同時に、 my_other_db データベースと my_other_schema スキーマに移動します。

root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"].rename( "my_other_js_function", target_database = "my_other_database", target_schema = "my_other_schema" ) 
Copy

UDF のドロップ

UserDefinedFunctionResource オブジェクトで UDF をドロップできます。

次の例のコードは、 my_javascript_function(DOUBLE) UDF リソース・オブジェクトを取得し、 UDF をドロップします。

my_udf_res = root.databases["my_db"].schemas["my_schema"].user_defined_functions["my_javascript_function(DOUBLE)"] my_udf_res.drop() 
Copy

ストアドプロシージャの管理

ストアドプロシージャを記述して、 SQL を実行するプロシージャ型コードでシステムを拡張できます。ストアドプロシージャでは、プログラムによる構成を使用して、分岐とループを実行できます。ストアドプロシージャを作成すると、何度でも再利用できます。詳細については、 ストアドプロシージャの概要 をご参照ください。

Snowflake Python APIs は、プロシージャを2つの別々のタイプで表します。

  • Procedure: プロシージャの名前、引数のリスト、戻り値の型、プロシージャの定義などのプロシージャのプロパティを公開します。

  • ProcedureResource: 対応する Procedure オブジェクトの取得、プロシージャの呼び出し、プロシージャのドロップに使用できるメソッドを公開します。

ストアドプロシージャの作成

プロシージャを作成するには、まず Procedure オブジェクトを作成して、 API Root オブジェクトから ProcedureCollection オブジェクトを作成します。 ProcedureCollection.create を使って、新しいプロシージャをSnowflakeに追加します。

次の例のコードは、 my_db データベースと my_schema スキーマの my_procedure という名前のプロシージャを表す Procedure オブジェクトを、指定された引数、戻り値型、 SQL プロシージャ定義で作成します。

from snowflake.core.procedure import Argument, ColumnType, Procedure, ReturnTable, SQLFunction procedure = Procedure( name="my_procedure", arguments=[Argument(name="id", datatype="VARCHAR")], return_type=ReturnTable( column_list=[ ColumnType(name="id", datatype="NUMBER"), ColumnType(name="price", datatype="NUMBER"), ] ), language_config=SQLFunction(), body="""  DECLARE  res RESULTSET DEFAULT (SELECT * FROM invoices WHERE id = :id);  BEGIN  RETURN TABLE(res);  END;  """, ) procedures = root.databases["my_db"].schemas["my_schema"].procedures procedures.create(procedure) 
Copy

プロシージャの呼び出し

ProcedureResource オブジェクトを使ってプロシージャを呼び出すことができます。

次の例のコードは、 my_procedure(VARCHAR) プロシージャ・リソース・オブジェクトを取得し、 CallArgumentList オブジェクトを作成し、その引数リストを使用してプロシージャを呼び出します。

注釈

プロシージャ・リソース・オブジェクトを取得する場合、プロシージャはオーバーロードされる可能性があるため、完全なシグネチャ(プロシージャ名とそのパラメーターのデータ型)を指定する必要があります。

from snowflake.core.procedure import CallArgument, CallArgumentList procedure_reference = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(VARCHAR)"] call_argument_list = CallArgumentList(call_arguments=[ CallArgument(name="id", datatype="VARCHAR", value="1"), ]) procedure_reference.call(call_argument_list=call_argument_list, extract=False) 
Copy

プロシージャの詳細の取得

Procedure オブジェクトを返す ProcedureResource.fetch メソッドを呼び出すことで、プロシージャに関する情報を取得できます。

次の例のコードは、 my_db データベースと my_schema スキーマの my_procedure(VARCHAR) プロシージャに関する情報を取得します。

my_procedure = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(VARCHAR)"].fetch() print(my_procedure.to_dict()) 
Copy

プロシージャのリスト

Procedure オブジェクトの PagedIter 反復子を返す ProcedureCollection.iter メソッドを使用して、プロシージャを一覧表示することができます。

次の例のコードは、 my_db データベースと my_schema スキーマの my で始まる名前のプロシージャをリストアップし、それぞれの名前を表示します。

procedure_iter = root.databases["my_db"].schemas["my_schema"].procedures.iter(like="my%") for procedure_obj in procedure_iter: print(procedure_obj.name) 
Copy

プロシージャのドロップ

ProcedureResource オブジェクトでプロシージャをドロップできます。

次の例のコードは、 my_db データベースと my_schema スキーマの my_procedure(VARCHAR) プロシージャ・リソース・オブジェクトを取得し、プロシージャをドロップします。

my_procedure_res = root.databases["my_db"].schemas["my_schema"].procedures["my_procedure(VARCHAR)"] my_procedure_res.drop() 
Copy