Openflow Connector for Kafkaを設定する¶
注釈
コネクタには、 コネクタ利用規約 が適用されます。
前提条件¶
Openflow Connector for Kafka について を確認してください。
Openflowの設定 - BYOC または Openflowの設定 - Snowflakeデプロイメント - タスク概要 があることを確認してください。
コネクタタイプ¶
Openflow Connector for Kafkaは3つの異なる構成で利用可能です。それぞれ特定のユースケースに最適化されています。これらのコネクタ定義は、コネクタギャラリーからダウンロードできます。
- Apache Kafka for JSON データ形式
スキーマの進化とトピックからテーブルへのマッピングを備えた JSON メッセージ取り込み用の簡易コネクタ
- Apache Kafka for AVRO データ形式
スキーマの進化とトピックからテーブルへのマッピングを備えた AVRO メッセージ取り込み用の簡易コネクタ
- DLQ とメタデータを使用したApache Kafka
配信不能キュー(DLQ)サポート、メタデータ処理、機能パリティ :doc:`Kafka用Snowflakeコネクタ </user-guide/kafka-connector-overview>`のあるフル機能のコネクタ
特定のコネクタタイプの詳しい構成については、次をご参照ください。
JSON/AVRO データ形式用Apache Kafka - JSON/AVRO データ形式コネクタ
DLQ とメタデータを使用したApache Kafka - DLQ およびメタデータコネクタ
どのコネクタを選ぶべきでしょうか?¶
データ形式、運用要件、機能のニーズに最適なコネクタバリアントを選択します。
用のApache Kafka JSON または AVRO データ形式 の場合に選択します:
Kafkaメッセージは JSON または AVRO 形式です
基本的なスキーマ進化機能が必要です
最小限の構成によるシンプルな設定が必要です
高度なエラー処理や配信不能キュー機能は不要です
新しい統合を設定し、すぐに開始したい場合
形式固有の考慮事項:
JSON 形式さまざまなデータ構造に対してより柔軟で、デバッグと検査が容易です
AVRO 形式組み込みスキーマレジストリ統合による強力な型付けデータ、構造化データパイプライン向け
次の場合、:doc:` Apache Kafka DLQ およびメタデータ </user-guide/data-integration/openflow/connectors/kafka/kafka-dlq-metadata>` を選択してください:
Kafka用Snowflakeコネクタから へ移行中であり、互換性のある機能を備えた機能パラメーターが必要です。
失敗メッセージには堅牢なエラー処理と配信不能キューのサポートが必要です
メッセージのインジェスチョンに関する詳細なメタデータ(タイムスタンプ、オフセット、ヘッダー)が必要です。
移行に関する考慮事項¶
現在、Kafka用のSnowflakeコネクタを使用している場合は、機能の互換性を備えたシームレスな移行体験のため、**Apache Kafka with DLQ and metadata**コネクタを選択してください 。
フィールド名処理の違い:Kafka用Openflowコネクタは、Kafka用Snowflakeコネクタとは異なる方法で、フィールド名の特殊文字を処理します。移行後、Kafka用Openflowコネクタは、これらの命名規則の違いにより、異なる名前の新しいSnowflake列を作成する場合があります。フィールド名の変換方法の詳細については、 フィールド名のマッピングと特殊文字の処理 をご参照ください。
パフォーマンスの考慮事項¶
JSON および AVRO 形式のコネクタは、合理化された設計により、単純なユースケースのパフォーマンスが向上します
その DLQ および メタデータコネクタは、わずかに高いリソース使用コストで、より包括的なモニターとエラー処理を提供します。
Snowflakeアカウントを設定する¶
Snowflakeアカウント管理者として、以下のタスクを実行します。
タイプを SERVICE として、新しいSnowflakeサービスユーザーを作成します。
新しいロールを作成するか、既存のロールを使用して データベース権限 を付与します。
コネクタには、宛先テーブルがまだ存在しない場合に自動的に作成する機能があるため、ユーザーがSnowflakeオブジェクトの作成および管理に必要な権限を持っていることを確認してください。
オブジェクト
権限
メモ
データベース
USAGE
スキーマ
USAGE . CREATE TABLE .
スキーマレベルのオブジェクトが作成された後に、 CREATE
object権限を取り消すことができます。テーブル
OWNERSHIP
Kafkaコネクタを使用してデータを 既存の テーブルに取り込む場合にのみ必要です。. コネクタがKafkaトピックの記録の新しいターゲットテーブルを作成する場合、構成で指定されたユーザーのデフォルトのロールがテーブル所有者になります。
Snowflakeは、アクセス制御を改善するために、各Kafkaインスタンスに個別のユーザーとロールを作成することをお勧めします。
以下のスクリプトを使用して、カスタムロールを作成および構成できます(SECURITYADMIN または同等のロールが必要です)。
USE ROLE securityadmin; CREATE ROLE kafka_connector_role_1; GRANT USAGE ON DATABASE kafka_db TO ROLE kafka_connector_role_1; GRANT USAGE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1; GRANT CREATE TABLE ON SCHEMA kafka_schema TO ROLE kafka_connector_role_1; -- Only for existing tables GRANT OWNERSHIP ON TABLE existing_table1 TO ROLE kafka_connector_role_1;
権限はコネクタロールに直接付与する必要があり、継承できないことに注意してください。
Snowflakeサービスユーザーに、前の手順で作成したロールを付与します。
ロールは、ユーザーの既定のロールとして割り当てる必要があります:
GRANT ROLE kafka_connector_role_1 TO USER kafka_connector_user_1; ALTER USER kafka_connector_user_1 SET DEFAULT_ROLE = kafka_connector_role_1;
ステップ1の Snowflake SERVICEユーザーに対して、キーペア認証 を構成します。
Snowflakeではこの手順を強く推奨します。Openflowがサポートするシークレットマネージャ(AWS、Azure、Hashicorpなど)を構成し、公開キーと秘密キーを秘密ストアに格納します。
注釈
何らかの理由でシークレットマネージャを使用したくない場合は、組織のセキュリティポリシーに従って、キーペア認証に使用する公開キーと秘密キーファイルを保護する責任があります。
シークレットマネージャを構成したら、その認証方法を決定します。AWS 上では、Openflowに関連付けられた EC2 インスタンスロールが推奨されます。こうすることで、他の秘密を永続化する必要がなくなるからです。
Openflowで、右上のハンバーガーメニューから、このシークレットマネージャーに関連付けられたパラメータープロバイダーを構成します。Controller Settings » Parameter Provider に移動してから、パラメーター値を取得します。
この時点で、すべての認証情報を関連するパラメーターパスで参照することができるため、機密性の高い値をOpenflow内で永続化する必要はありません。
他のSnowflakeユーザーが、コネクタによって取り込まれた生の取り込みドキュメントやとテーブルへのアクセスを必要とする場合は(Snowflakeでのカスタム処理のためなど)、それらのユーザーにステップ1で作成したロールを付与します。
コネクタを設定する¶
データエンジニアとして、以下のタスクを実行してコネクタをインストールおよび構成します。
コネクタをインストールする¶
Openflowの概要ページに移動します。Featured connectors セクションで、 View more connectors を選択します。
Openflowのコネクタページでコネクタを探し、 Add to runtime を選択します。
Select runtime ダイアログで、 Available runtimes ドロップダウンリストからランタイムを選択します。
Add を選択します。
注釈
コネクタをインストールする前に、コネクタが取り込んだデータを格納するためのデータベースとスキーマをSnowflakeで作成したことを確認します。
Snowflakeアカウント認証情報でデプロイメントを認証し、Snowflakeアカウントへのランタイムアプリケーションのアクセスを許可するよう求められたられたら、 Allow を選択します。コネクタのインストールプロセスは数分で完了します。
Snowflakeアカウント認証情報でランタイムを認証します。
コネクタプロセスグループが追加されたOpenflowキャンバスが表示されます。
コネクタを構成する¶
プロセスグループパラメーターを入力する
インポートしたプロセスグループを右クリックし、 Parameters を選択します。
共通パラメーター の説明に従って、必要なパラメーター値を入力します。
共通パラメーター¶
すべてのKafkaコネクタのバリアントは、基本的な接続と認証のための共通パラメーターコンテキストを共有しています。
Snowflake宛先パラメーター¶
パラメーター | 説明 | 必須 |
|---|---|---|
宛先データベース | データが永続化されるデータベース。Snowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 | 有り |
宛先スキーマ | データが永続化されるスキーマ。これはSnowflakeにすでに存在している必要があります。名前は大文字と小文字を区別します。引用符で囲まれていない識別子の場合、名前を大文字で指定します。 次の例をご参照ください。
| 有り |
Snowflakeアカウント識別子 | 以下を使用する場合:
| 有り |
Snowflake認証ストラテジー | 以下を使用する場合:
| 有り |
Snowflake秘密キー | 以下を使用する場合:
| 無し |
Snowflake秘密キーファイル | 以下を使用する場合:
| 無し |
Snowflake秘密キーパスワード | 以下を使用する場合
| 無し |
Snowflakeロール | 以下を使用する場合
| 有り |
Snowflakeのユーザー名 | 以下を使用する場合
| 有り |
Snowflakeウェアハウス | クエリの実行に使用されるSnowflakeウェアハウス。 | 有り |
Kafkaソースパラメーター(SASL 認証)¶
パラメーター | 説明 | 必須 |
|---|---|---|
Kafkaセキュリティプロトコル | ブローカーとの通信に使用されるセキュリティプロトコル。Kafka Client security.protocolプロパティに対応します。次のいずれかです: SASL_PLAINTEXT / SASL_SSL | 有り |
Kafka SASL メカニズム | 認証に使用する SASL メカニズム。Kafka Client sasl.mechanismプロパティに対応します。次のいずれかです: PLAIN / SCRAM-SHA-256 / SCRAM-SHA-512 | 有り |
Kafka SASL ユーザー名 | Kafkaを認証するためのユーザー名。 | 有り |
Kafka SASL パスワード | Kafkaを認証するためのパスワード。 | 有り |
Kafkaブートストラップサーバー | データをフェッチするKafkaブローカーのコンマ区切りリスト。kafka-broker:9092のように、ポートが含まれる必要があります。DLQ トピックにも同じインスタンスが使用されます。 | 有り |
Kafka取り込みパラメーター¶
パラメーター | 説明 | 必須 |
|---|---|---|
Kafkaトピック形式 | 次のいずれかです: names / pattern。提供される「Kafkaトピック」がコンマで区切られた名前のリストであるか、単一の正規表現であるかを指定します。 | 有り |
Kafkaトピック | Kafkaトピックのコンマ区切りリスト、または正規表現。 | 有り |
KafkaグループID | コネクタが使用するコンシューマーグループの ID。任意ですが、一意でなければなりません。 | 有り |
Kafka自動オフセットリセット | Kafka | 有り |
トピックからテーブルへのマッピング | このオプションのパラメーターにより、ユーザーはどのトピックをどのテーブルにマッピングするかを指定できます。各トピックとそのテーブル名はコロンで区切る必要があります(以下を参照)。このテーブル名は、引用符で囲まれていないSnowflakeの有効な識別子である必要があります。正規表現はあいまいであってはならず、一致するトピックは単一のターゲットテーブルのみに一致しなければなりません。空または一致するものがない場合は、トピック名がテーブル名として使用されます。注意: マッピングでは、コンマの後にスペースを含めることはできません。 | 無し |
Topic To Table Map 値の例:
topic1:low_range,topic2:low_range,topic5:high_range,topic6:high_rangetopic[0-4]:low_range,topic[5-9]:high_range.*:destination_table- すべてのトピックを destination_table にマッピングします
バリアント固有の設定を構成する¶
共通パラメーターを構成した後、選択したコネクタバリアントに固有の設定を構成する必要があります。
- Apache Kafka for JSON データ形式 および Apache Kafka for AVRO データ形式 コネクタの場合:
JSON/AVRO 固有のパラメーターについては、 JSON/AVRO データ形式用Apache Kafka をご参照ください。
- DLQ とメタデータ コネクタを持つ Apache Kafkaの場合:
DLQ 構成、スキーマ化の設定、Icebergテーブルのサポート、メッセージ形式オプションなどの高度なパラメーターについては、 DLQ とメタデータを使用したApache Kafka をご参照ください。
認証¶
Kafkaソースパラメーター(SASL 認証) で説明されているように、すべてのコネクタバリアントは、パラメーターコンテキストを介して構成された SASL 認証をサポートしています。
mTLS や AWS MSK IAM を含むその他の認証方法については、 Openflow Connector for Kafkaのその他の認証方法を構成する をご参照ください。
フローを実行する¶
プレーンを右クリックし、 Enable all Controller Services をクリックします。
プレーンを右クリックし、 Start をクリックします。コネクタがデータの取り込みを開始します。