Openflow Connector for Kafkaを設定する

注釈

コネクタには、 コネクタ利用規約 が適用されます。

前提条件

  1. Openflow Connector for Kafka について を確認してください。

  2. Openflowの設定 - BYOC または Openflowの設定 - Snowflakeデプロイメント - タスク概要 があることを確認してください。

コネクタタイプ

Openflow Connector for Kafkaは3つの異なる構成で利用可能です。それぞれ特定のユースケースに最適化されています。これらのコネクタ定義は、コネクタギャラリーからダウンロードできます。

Apache Kafka for JSON データ形式

スキーマの進化とトピックからテーブルへのマッピングを備えた JSON メッセージ取り込み用の簡易コネクタ

Apache Kafka for AVRO データ形式

スキーマの進化とトピックからテーブルへのマッピングを備えた AVRO メッセージ取り込み用の簡易コネクタ

DLQ とメタデータを使用したApache Kafka

配信不能キュー(DLQ)サポート、メタデータ処理、機能パリティ :doc:`Kafka用Snowflakeコネクタ </user-guide/kafka-connector-overview>`のあるフル機能のコネクタ

特定のコネクタタイプの詳しい構成については、次をご参照ください。

どのコネクタを選ぶべきでしょうか?

データ形式、運用要件、機能のニーズに最適なコネクタバリアントを選択します。

用のApache Kafka JSON または AVRO データ形式 の場合に選択します:

  • Kafkaメッセージは JSON または AVRO 形式です

  • 基本的なスキーマ進化機能が必要です

  • 最小限の構成によるシンプルな設定が必要です

  • 高度なエラー処理や配信不能キュー機能は不要です

  • 新しい統合を設定し、すぐに開始したい場合

形式固有の考慮事項:

  • JSON 形式さまざまなデータ構造に対してより柔軟で、デバッグと検査が容易です

  • AVRO 形式組み込みスキーマレジストリ統合による強力な型付けデータ、構造化データパイプライン向け

次の場合、:doc:` Apache Kafka DLQ およびメタデータ </user-guide/data-integration/openflow/connectors/kafka/kafka-dlq-metadata>` を選択してください:

  • Kafka用Snowflakeコネクタから へ移行中であり、互換性のある機能を備えた機能パラメーターが必要です。

  • 失敗メッセージには堅牢なエラー処理と配信不能キューのサポートが必要です

  • メッセージのインジェスチョンに関する詳細なメタデータ(タイムスタンプ、オフセット、ヘッダー)が必要です。

移行に関する考慮事項

現在、Kafka用のSnowflakeコネクタを使用している場合は、機能の互換性を備えたシームレスな移行体験のため、**Apache Kafka with DLQ and metadata**コネクタを選択してください 。

フィールド名処理の違い:Kafka用Openflowコネクタは、Kafka用Snowflakeコネクタとは異なる方法で、フィールド名の特殊文字を処理します。移行後、Kafka用Openflowコネクタは、これらの命名規則の違いにより、異なる名前の新しいSnowflake列を作成する場合があります。フィールド名の変換方法の詳細については、 フィールド名のマッピングと特殊文字の処理 をご参照ください。

パフォーマンスの考慮事項

  • JSON および AVRO 形式のコネクタは、合理化された設計により、単純なユースケースのパフォーマンスが向上します

  • その DLQ および メタデータコネクタは、わずかに高いリソース使用コストで、より包括的なモニターとエラー処理を提供します。

Snowflakeアカウントを設定する

Snowflakeアカウント管理者として、以下のタスクを実行します。

  1. タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。

  2. 新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。

    コネクタには、宛先テーブルがまだ存在しない場合に自動的に作成する機能があるため、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。

    オブジェクト

    権限

    メモ

    データベース

    USAGE

    スキーマ

    USAGE . CREATE TABLE .

    スキーマレベルのオブジェクトが作成された後に、 CREATE object 権限を取り消すことができます。

    テーブル

    OWNERSHIP

    Kafkaコネクタを使用してデータを 既存の テーブルに取り込む場合にのみ必要です。. コネクタがKafkaトピックの記録の新しいターゲットテーブルを作成する場合、構成で指定されたユーザーのデフォルトのロールがテーブル所有者になります。

    Snowflakeは、アクセス制御を改善するために、各Kafkaインスタンスに個別のユーザーとロールを作成することをお勧めします。

    以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。

    USE ROLE securityadmin; CREATE ROLE kafka_connector_role_1; GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1; GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1; GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1; 
    Copy

    権限はコネクタロールに直接付与する必要があり、継承できないことに注意してください。

  3. Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。

    ロールは、ユーザーの既定のロールとして割り当てる必要があります:

    GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1; ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1; 
    Copy
  4. ステップ1の Snowflake SERVICEユーザーに対して、キーペア認証 を構成します。

  5. Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。

    注釈

    何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。

    1. シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。

    2. Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。

    3. この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。

  6. 他のSnowflakeユーザーが、コネクタによって取り込まれた生の取り込みドキュメントやとテーブルへのアクセスを必要とする場合は(Snowflakeでのカスタム処理のためなど)、それらのユーザーにステップ1で作成したロールを付与します。

コネクタを設定する

データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。

コネクタをインストールする

  1. Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。

  2. Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。

  3. Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。

  4. Add を選択します。

    注釈

    コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。

  5. Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。

  6. Snowflakeアカウント認証情報でランタイムを認証します。

コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。

コネクタを構成する

  1. プロセスグループパラメーターを入力する

    1. インポートしたプロセスグループを右クリックし、 Parameters を選択します。

    2. 共通パラメーター の説明に従って、必要なパラメーター値を入力します。

共通パラメーター

すべてのKafkaコネクタのバリアントは、基本的な接続と認証のための共通パラメーターコンテキストを共有しています。

Snowflake宛先パラメーター

パラメーター

説明

必須

宛先データベース

データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

有り

宛先スキーマ

データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。

次の例をご参照ください。

  • CREATE SCHEMA SCHEMA_NAME または CREATE SCHEMA schema_name: SCHEMA_NAME を使用します

  • CREATE SCHEMA "schema_name" または CREATE SCHEMA "SCHEMA_NAME": それぞれ schema_name または SCHEMA_NAME を使用します

有り

Snowflakeアカウント識別子

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: データが永続化される[organization-name]-[account-name]形式のSnowflakeアカウント名。

有り

Snowflake認証ストラテジー

以下を使用する場合:

  • Snowflake Openflow Deployment:SNOWFLAKE_SESSION_TOKEN を使用します。このトークンはSnowflakeによって自動的に管理されます。

  • BYOC: 認証戦略の値として KEY_PAIR を使用します。

有り

Snowflake秘密キー

以下を使用する場合:

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR:認証に使用される RSA プライベートキーである必要があります。

    その RSA キーは PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを持つ必要があります。SnowflakeプライベートキーファイルまたはSnowflakeプライベートキーのいずれかを定義する必要があることに注意してください。

無し

Snowflake秘密キーファイル

以下を使用する場合:

  • Session token authentication strategy:プライベートキーファイルは空白である必要があります。

  • KEY_PAIR:Snowflakeへの認証に使用される RSA プライベートキーを含むファイルをアップロードします。これは、PKCS8 標準に従ってフォーマットされ、標準の PEM ヘッダーとフッターを含んでいる必要があります。ヘッダー行は で始まります -----BEGIN PRIVATE.プライベートキーファイルをアップロードするには、Reference asset チェックボックスを選択します。

無し

Snowflake秘密キーパスワード

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: Snowflakeプライベートキーファイルに関連付けられたパスワードを提供します。

無し

Snowflakeロール

以下を使用する場合

  • Session Token Authentication Strategy:ランタイムロールを使用します。ランタイムの View Details に移動すると、Openflow UI でランタイムロールを見つけることができます。

  • KEY_PAIR Authentication Strategy:サービスユーザーのために構成された有効なロールを使用します。

有り

Snowflakeのユーザー名

以下を使用する場合

  • Session Token Authentication Strategy: 空白にする必要があります。

  • KEY_PAIR: Snowflakeインスタンスへの接続に使用するユーザー名を提供します。

有り

Snowflakeウェアハウス

クエリの実行に使用されるSnowflakeウェアハウス。

有り

Kafkaソースパラメーター(SASL 認証)

パラメーター

説明

必須

Kafkaセキュリティプロトコル

ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client security.protocolプロパティに対応します。次のいずれかです: SASL_PLAINTEXT / SASL_SSL

有り

Kafka SASL メカニズム

認証に使用する SASL メカニズム。Kafka Client sasl.mechanismプロパティに対応します。次のいずれかです: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512

有り

Kafka SASL ユーザー名

Kafkaを認証するためのユーザー名。

有り

Kafka SASL パスワード

Kafkaを認証するためのパスワード。

有り

Kafkaブートストラップサーバー

データをフェッチするKafkaブローカーのコンマ区切りリスト。kafka-broker:9092のように、ポートが含まれる必要があります。DLQ トピックにも同じインスタンスが使用されます。

有り

Kafka取り込みパラメーター

パラメーター

説明

必須

Kafkaトピック形式

次のいずれかです: names / pattern。提供される「Kafkaトピック」がコンマで区切られた名前のリストであるか、単一の正規表現であるかを指定します。

有り

Kafkaトピック

Kafkaトピックのコンマ区切りリスト、または正規表現。

有り

KafkaグループID

コネクタが使用するコンシューマーグループの ID。任意ですが、一意でなければなりません。

有り

Kafka自動オフセットリセット

Kafka auto.offset.reset プロパティに対応する以前のコンシューマーオフセットが見つからない場合に適用される自動オフセット構成。次のいずれかです: earliest / latest. Default: latest

有り

トピックからテーブルへのマッピング

このオプションのパラメーターにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。正規表現はあいまいであってはならず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。空または一致するものがない場合は、トピック名がテーブル名として使用されます。注意: マッピングでは、コンマの後にスペースを含めることはできません。

無し

Topic To Table Map 値の例:

  • topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_range

  • topic[0-4]:low_range,topic[5-9]:high_range

  • .*:destination_table - すべてのトピックを destination_table にマッピングします

バリアント固有の設定を構成する

共通パラメーターを構成した後、選択したコネクタバリアントに固有の設定を構成する必要があります。

Apache Kafka for JSON データ形式 および Apache Kafka for AVRO データ形式 コネクタの場合:

JSON/AVRO 固有のパラメーターについては、 JSON/AVRO データ形式用Apache Kafka をご参照ください。

DLQ とメタデータ コネクタを持つ Apache Kafkaの場合:

DLQ 構成、スキーマ化の設定、Icebergテーブルのサポート、メッセージ形式オプションなどの高度なパラメーターについては、 DLQ とメタデータを使用したApache Kafka をご参照ください。

認証

Kafkaソースパラメーター(SASL 認証) で説明されているように、すべてのコネクタバリアントは、パラメーターコンテキストを介して構成された SASL 認証をサポートしています。

mTLS や AWS MSK IAM を含むその他の認証方法については、 Openflow Connector for Kafkaのその他の認証方法を構成する をご参照ください。

フローを実行する

  1. プレーンを右クリックし、 Enable all Controller Services をクリックします。

  2. プレーンを右クリックし、 Start をクリックします。コネクタがデータの取り込みを開始します。