Paramétrez Openflow Connector for PostgreSQL

Note

Le connecteur est soumis aux conditions d’utilisation du connecteur.

Cette rubrique décrit les étapes pour paramétrer Openflow Connector for PostgreSQL.

Conditions préalables

  1. Assurez-vous d’avoir consulté À propos de Openflow Connector for PostgreSQL.

  2. Assurez-vous d’avoir pris connaissance des versions PostgreSQL prises en charge.

  3. Recommandé : Veillez à n’ajouter qu’une seule instance du connecteur par exécution.

  4. Assurez-vous d’avoir Configuration d’Openflow - BYOC ou Configuration d’Openflow - Déploiement Snowflake - Vue d’ensemble des tâches.

  5. En tant qu’administrateur de la base de données, effectuez les tâches suivantes :

    1. Configurer wal_level

    2. Créer une publication

    3. Assurez-vous qu’il y a suffisamment d’espace disque sur votre serveur PostgreSQL pour le fichier WAL. En effet, une fois créé, un emplacement de réplication fait en sorte que PostgreSQL conserve les données WAL de la position détenue par l’emplacement de réplication, jusqu’à ce que le connecteur confirme et avance cette position.

    4. Assurez-vous que chaque table activée pour la réplication possède une clé primaire. La clé peut être une colonne unique ou composite.

    5. Paramétrez l”IDENTITY REPLICA des tables sur DEFAULT. Cela garantit que les clés primaires sont représentées dans WAL, et que le connecteur peut les lire.

    6. Créez un utilisateur pour le connecteur. Le connecteur nécessite un utilisateur possédant l’attribut REPLICATION et les autorisations nécessaires pour SELECT à partir de chaque table répliquée. Créez cet utilisateur avec un mot de passe pour entrer dans la configuration du connecteur. Pour plus d’informations sur la sécurité de la réplication, voir Sécurité.

  6. En tant qu’administrateur de compte Snowflake, effectuez les tâches suivantes :

    1. Créez un utilisateur Snowflake avec le type SERVICE. Créez une base de données pour stocker les données répliquées et définissez les privilèges permettant à l’utilisateur de Snowflake de créer des objets dans cette base de données en lui accordant les privilèges USAGE et CREATE SCHEMA.

      CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>; 
      Copy
    2. Créez une paire de clés sécurisées (publiques et privées). Stockez la clé privée de l’utilisateur dans un fichier à utiliser lors de la configuration du connecteur. Attribuez la clé publique à l’utilisateur du service Snowflake :

      ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey'; 
      Copy

      Pour plus d’informations sur l’authentification par paire de clés, voir Authentification par paire de clés.

    3. Désignez un entrepôt que le connecteur doit utiliser. Commencez par la taille de l’entrepôt MEDIUM, puis expérimentez la taille en fonction de la quantité de tables répliquées et de la quantité de données transférées. Un grand nombre de tables s’adaptent généralement mieux avec les entrepôts multi-clusters, plutôt que la taille de l’entrepôt.

Configurer wal_level

Openflow Connector for PostgreSQL exige que wal_level soit défini sur logical.

En fonction de l’endroit où votre serveur PostgreSQL est hébergé, vous pouvez configurer wal_level comme suit :

Sur place

Exécutez la requête suivante avec le superutilisateur ou l’utilisateur titulaire du privilège ALTER SYSTEM :

ALTER SYSTEM SET wal_level = logical; 
Copy

RDS

L’utilisateur utilisé par l’agent doit disposer du rôle rds_superuser ou rds_replication.

Vous devez également définir les éléments suivants :

  • Le paramètre statique rds.logical_replication sur 1.

  • Les paramètres max_replication_slots, max_connections et max_wal_senders en fonction de la configuration de votre base de données et de votre réplication.

AWS Aurora

Définissez le paramètre statique rds.logical_replication sur 1.

GCP

Définissez les indicateurs suivants :

  • cloudsql.logical_decoding=on.

  • cloudsql.enable_pglogical=on.

Pour plus d’informations, voir la documentation Google Cloud.

Azure

Définissez la prise en charge de la réplication sur Logical. Pour plus d’informations, voir la documentation Azure.

Créer une publication

Openflow Connector for PostgreSQL exige qu’une publication https://www.postgresql.org/docs/current/logical-replication-publication.html#LOGICAL-REPLICATION-PUBLICATION soit créée et configurée dans PostgreSQL avant le début de la réplication. Vous pouvez la créer pour toutes les tables ou un sous-ensemble de tables, ainsi que pour des tables spécifiques comportant uniquement des colonnes déterminées. Assurez-vous que toutes les tables et colonnes que vous prévoyez de faire répliquer sont incluses dans la publication. Vous pouvez également modifier la publication ultérieurement, pendant que le connecteur fonctionne. Pour créer et configurer une publication, procédez comme suit :

  1. Connectez-vous en tant qu’utilisateur avec le privilège CREATE sur la base de données et exécutez la requête suivante :

    CREATE PUBLICATION <publication name>; 
    Copy
  2. Définissez les tables que l’agent de la base de données pourra voir en utilisant :

ALTER PUBLICATION <publication name> ADD TABLE <table name>; 
Copy

Important

PostgreSQL 15 et les versions ultérieures prennent en charge la configuration des publications pour un sous-ensemble spécifié de colonnes de table. Pour que le connecteur prenne cela en charge correctement, vous devez utiliser les paramètres de filtrage des colonnes afin d’inclure les mêmes colonnes que celles définies dans la publication.

Sans ce paramètre, le connecteur se comportera comme suit :

  • Dans la table de destination, les colonnes qui ne sont pas incluses dans le filtre comporteront le suffixe __DELETED. Toutes les données répliquées lors de la phase instantanée seront conservées.

  • Une fois que vous avez ajouté de nouvelles colonnes à la publication, la table sera définitivement en état d’échec et vous devrez redémarrer sa réplication. réplication.

Pour plus d’informations, voir ALTER PUBLICATION.

Installer le connecteur

  1. Naviguez jusqu’à la page d’aperçu d’Openflow. Dans la section Featured connectors, sélectionnez View more connectors.

  2. Sur la page des connecteurs Openflow, trouvez le connecteur et sélectionnez Add to runtime.

  3. Dans la boîte de dialogue Select runtime, sélectionnez votre environnement d’exécution dans la liste déroulante Available runtimes.

  4. Sélectionnez Add.

    Note

    Avant d’installer le connecteur, assurez-vous que vous avez créé une base de données et un schéma dans Snowflake pour que le connecteur puisse stocker les données ingérées.

  5. Authentifiez-vous au déploiement avec les identifiants de votre compte Snowflake et sélectionnez Allow lorsque vous êtes invité à autoriser l’application d’exécution à accéder à votre compte Snowflake. Le processus d’installation du connecteur prend quelques minutes.

  6. Authentifiez-vous auprès de l’environnement d’exécution avec les identifiants de votre compte Snowflake.

Le canevas Openflow apparaît avec le groupe de processus du connecteur ajouté.

Configuration du connecteur

Vous pouvez configurer le connecteur pour les cas d’utilisation suivants :

Répliquer un ensemble de tables en temps réel

  1. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Parameters.

  2. Renseignez les valeurs des paramètres requis comme décrit dans Paramètres de débit.

Paramètres de débit

Commencez par définir les paramètres du contexte des paramètres source PostgreSQL, puis du contexte des paramètres de destination PostgreSQL. Une fois cela fait, vous pouvez activer le connecteur, qui devrait se connecter à la fois à PostgreSQL et à Snowflake et commencer à fonctionner. Toutefois, il ne répliquera aucune donnée tant que des tables n’auront pas été explicitement ajoutées à sa configuration.

Pour configurer des tables spécifiques pour la réplication, modifiez le contexte des paramètres d’ingestion PostgreSQL. Peu de temps après avoir appliqué les modifications au contexte des paramètres de réplication, la configuration sera reprise par le connecteur et le cycle de vie de la réplication commencera pour chaque table.

Contexte des paramètres source PostgreSQL

Paramètre

Description

URL de connexion Postgres

L’adresse complète URL JDBC de la base de données source. Exemple : jdbc:postgresql://example.com:5432/public

Si vous vous connectez au serveur de réplique PostgreSQL, consultez Répliquer des tables à partir d’un serveur de réplique PostgreSQL.

Pilote Postgres JDBC

Le chemin vers les fichiers jar du pilote PostgreSQL JDBC. Téléchargez le jar depuis son site web, puis cochez la case Reference asset pour le télécharger et le joindre.

Mode Postgres SSL

Activez ou désactivez les connexions SSL.

Certificat racine Postgres SSL

Le contenu complet du certificat racine de la base de données. Facultatif si SSL désactivé.

Nom d’utilisateur Postgres

Le nom d’utilisateur du connecteur.

Mot de passe Postgres

Le mot de passe du connecteur.

Contexte des paramètres de destination PostgreSQL

Paramètre

Description

Obligatoire

Base de données de destination

La base de données dans laquelle les données seront conservées. Elle doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Oui

Schéma de destination

Le schéma dans lequel les données seront conservées, qui doit déjà exister dans Snowflake. Le nom est sensible à la casse. Pour les identificateurs sans guillemets, indiquez le nom en majuscules.

Voir l’exemple suivant :

  • CREATE SCHEMA SCHEMA_NAME ou CREATE SCHEMA schema_name : utilisez SCHEMA_NAME.

  • CREATE SCHEMA "schema_name" ou CREATE SCHEMA "SCHEMA_NAME" : utilisez schema_name ou SCHEMA_NAME, respectivement.

Oui

Identificateur de compte Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : nom du compte Snowflake au format [nom-organisation]-[nom-compte] où les données seront conservées.

Oui

Stratégie d’authentification Snowflake

Lorsque vous utilisez :

  • Déploiement Snowflake Openflow : Utilisez SNOWFLAKE_SESSION_TOKEN. Ce jeton est géré automatiquement par Snowflake.

  • BYOC : utilisez KEY_PAIR comme valeur pour la stratégie d’authentification.

Oui

Clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : Doit correspondre à la clé privée RSA utilisée pour l’authentification.

    La clé RSA doit être formatée conformément aux normes PKCS8 et posséder des en-têtes et des pieds de page PEM standards. Notez que le fichier de la clé privée Snowflake ou que la clé privée Snowflake doit être défini.

Non

Fichier de clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : Le fichier de la clé privée doit être vide.

  • KEY_PAIR : Chargez le fichier qui contient la clé privée RSA utilisée pour l’authentification auprès de Snowflake, formatée conformément aux normes PKCS8 et possédant des en-têtes et des pieds de page PEM standards. La ligne d’en-tête commence par -----BEGIN PRIVATE. Pour charger le fichier de la clé privée, cochez la case Reference asset.

Non

Mot de passe de la clé privée de Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : fournissez le mot de passe associé au fichier de la clé privée Snowflake.

Non

Rôle Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : Utilisez votre rôle d’exécution. Vous pouvez trouver votre rôle d’exécution dans l’UI d’Openflow, en naviguant jusqu’à View Details pour votre exécution.

  • Stratégie d’authentification KEY_PAIR : Utilisez un rôle valide configuré pour votre utilisateur de service.

Oui

Nom d’utilisateur Snowflake

Lorsque vous utilisez :

  • Stratégie d’authentification par jeton de session : doit être vide.

  • KEY_PAIR : indiquez le nom d’utilisateur utilisé pour vous connecter à l’instance Snowflake.

Oui

Entrepôt Snowflake

Entrepôt Snowflake utilisé pour exécuter des requêtes.

Oui

Contexte des paramètres d’ingestion PostgreSQL

Paramètre

Description

Noms des tables incluses

Une liste de chemins d’accès aux tables séparés par des virgules, y compris leurs schémas. Exemple : public.my_table, other_schema.other_table.

Sélectionnez les tables par nom ou par Regex. Si vous utilisez les deux, toutes les tables correspondantes de l’une ou l’autre option seront incluses.

Table incluse Regex

Une expression régulière à associer aux chemins de la table. Chaque chemin correspondant à l’expression sera répliqué, et les nouvelles tables correspondant au modèle qui seront créées ultérieurement seront également incluses automatiquement. Exemple : public\.auto_.*

Sélectionnez les tables par nom ou par Regex. Si vous utilisez les deux, toutes les tables correspondantes de l’une ou l’autre option seront incluses.

Filtre de colonne JSON

En option. Un JSON contenant une liste de noms de tables complets et un modèle regex pour les noms de colonnes à inclure dans la réplication. Exemple : [ {"schema":"public", "table":"table1", "includedPattern":".*name"} ] inclura toutes les colonnes qui se terminent par name dans la table1 du schéma public.

Fusionner la planification des tâches CRON

Expression CRON définissant les périodes au cours desquelles les opérations de fusion du journal vers la table de destination seront déclenchées. Paramétrez cet élément sur * * * * * ? si vous souhaitez une fusion continue ou une planification pour limiter la durée d’exécution de l’entrepôt.

Par exemple :

  • La chaîne * 0 * * * ? indique que vous souhaitez planifier des fusions à l’heure pleine pendant une minute

  • La chaîne * 20 14 ? * MON-FRI indique que vous souhaitez planifier les fusions à 14h20 tous les lundis et vendredis.

Pour plus d’informations et d’exemples, consultez le tutoriel sur les déclencheurs cron dans la documentation de Quartz

Répliquer des tables à partir d’un serveur de réplique PostgreSQL

Le connecteur peut ingérer des données à partir d’un serveur principal, d’une réplique de substitution active, ou d’un serveur d’abonné utilisant la réplication logique. Avant de configurer le connecteur pour qu’il se connecte à une réplique PostgreSQL, assurez-vous que la réplication entre les nœuds principaux et les nœuds de réplique fonctionne correctement. Lorsque vous enquêtez sur des problèmes liés à des données manquantes dans le connecteur, assurez-vous d’abord que les lignes manquantes sont présentes dans le serveur de réplique utilisé par le connecteur.

Considérations supplémentaires lors de la connexion à une réplique de substitution :

  • Seule la connexion à une réplique de substitution active est prise en charge. Notez que les répliques de substitution actives ne peuvent pas accepter de connexions provenant de clients tant qu’elles n’ont pas été promues à une instance primaire.

  • La version PostgreSQL du serveur doit être >= 16.

  • La publication requise par le connecteur doit être créée sur le serveur principal, et non sur le serveur de substitution. Le serveur de substitution est en lecture seule et ne permet pas de créer de publication.

Si vous vous connectez à une instance de substitution active et que vous voyez Tentative de création de l’emplacement de réplication, cela signifie que “<replication slot>” a expiré. Si vous vous connectez à une instance de substitution, assurez-vous qu’il existe un trafic sur l’instance primaire PostgreSQL. Sinon, l’appel à créer un emplacement de réplication ne renverra jamais de réponse. Si vous voyez une erreur ` dans le bulletin Openflow, ou si le processeur :ui:`Read PostgreSQL CDC Stream ne démarre pas, connectez-vous à l’instance PostgreSQL principale et exécutez la requête suivante :

SELECT pg_log_standby_snapshot(); 
Copy

L’erreur se produit lorsqu’il n’y a pas de modifications de données dans le serveur principal. Ainsi, le connecteur peut se bloquer lors de la création d’un emplacement de réplication sur le serveur de réplique. Cela est lié au fait que le serveur de réplique nécessite des informations sur les transactions en cours d’exécution du serveur principal pour pouvoir créer un emplacement de réplication. Les serveurs principaux n’enverront pas les informations lorsqu’ils seront inactifs. La fonction pg_log_standby_snapshot() force le serveur principal à envoyer des informations sur les transactions en cours d’exécution au serveur de réplique.

Supprimez et ajoutez à nouveau une table à la réplication

Pour supprimer une table de la réplication, assurez-vous qu’elle est supprimée des paramètres Noms des tables incluses ou Table incluse Regex dans le contexte des paramètres de réplication.

Si vous souhaitez réajuster la table à la réplication ultérieurement, supprimez d’abord la table de destination correspondante dans Snowflake. Ensuite, ajoutez à nouveau la table aux paramètres Noms des tables incluses ou Table incluse Regex. Cela permet de garantir que le processus de réplication démarre à nouveau pour la table.

Cette approche peut également être utilisée pour récupérer un scénario de réplication de table qui a échoué.

Répliquer un sous-ensemble de colonnes dans une table

Le connecteur peut filtrer les données répliquées par table sur un sous-ensemble de colonnes configurées.

Pour appliquer des filtres aux colonnes, modifiez la propriété Filtre de colonne dans le contexte Paramètres de réplication, en ajoutant un tableau de configurations, une entrée pour chaque table à laquelle vous souhaitez appliquer un filtre.

Les colonnes peuvent être incluses ou exclues par nom ou par modèle. Vous pouvez appliquer une seule condition par table ou combiner plusieurs conditions, les exclusions ayant toujours la priorité sur les inclusions.

L’exemple suivant montre les champs disponibles. schema et table sont obligatoires, puis un ou plusieurs des champs included, excluded, includedPattern, excludedPattern sont requis.

[  {  "schema": "<source table schema>",  "table" : "<source table name>",  "included": ["<column name>", "<column name>"],  "excluded": ["<column name>", "<column name>"],  "includedPattern": "<regular expression>",  "excludedPattern": "<regular expression>",  } ] 
Copy

Suivre les changements de données dans les tables

Le connecteur réplique non seulement l’état actuel des données des tables sources, mais aussi l’état de chaque ligne de chaque jeu de modifications. Ces données sont stockées dans des tables de journal créées dans le même schéma que la table de destination.

Les noms des tables de journal sont formatés comme suit : <source table name>_JOURNAL_<timestamp>_<schema generation> where <timestamp> is the value of epoch seconds when the source table was added to replication, and <schema generation> est un nombre entier qui augmente à chaque modification du schéma sur la table source. Par conséquent, les tables sources qui subissent des modifications de schéma auront plusieurs tables de journal.

Lorsqu’une table est retirée de la réplication, puis réintroduite, la valeur de l’horodatage <> change et la génération du schéma <> reprend à partir de 1.

Important

Snowflake recommande de ne pas modifier la structure des tables de journal de quelque manière que ce soit. Elles sont utilisées par le connecteur pour mettre à jour la table de destination dans le cadre du processus de réplication.

Le connecteur ne supprime jamais les tables de journal, mais il utilise le journal le plus récent pour chaque table source répliquée, en se contentant de lire les flux d’ajout uniquement au-dessus des journaux. Pour récupérer le stockage, vous pouvez :

  • Tronquer toutes les tables de journal à tout moment.

  • Supprimer les tables de journal liées aux tables sources qui ont été retirées de la réplication.

  • Supprimer toutes les tables de journal de la dernière génération, sauf les tables activement répliquées.

Par exemple, si votre connecteur est paramétré pour répliquer activement la table source orders, et que vous avez précédemment supprimé la table customers de la réplication, vous pouvez avoir les tables de journal suivantes. Dans ce cas, vous pouvez toutes les supprimer, à l’exception de orders_5678_2.

customers_1234_1 customers_1234_2 orders_5678_1 orders_5678_2 

Configuration de la planification des tâches de fusion

Le connecteur utilise un entrepôt pour fusionner les données de capture des données de changement (CDC) dans les tables de destination. Cette opération est déclenchée par le processeur MergeSnowflakeJournalTable. S’il n’y a pas de nouvelles modifications ou si aucun nouveau fichier de flux n’est en attente dans la file d’attente MergeSnowflakeJournalTable, aucune fusion n’est déclenchée et l’entrepôt se suspend automatiquement.

Pour limiter le coût de l’entrepôt et restreindre les fusions aux seules heures planifiées, utilisez l’expression CRON dans le paramètre de planification CRON de la tâche de fusion. Elle limite les fichiers de flux arrivant au processeur MergeSnowflakeJournalTable et les fusions ne sont déclenchées qu’au cours d’une période donnée. Pour plus d’informations sur la planification, voir Stratégie de planification.

Arrêter ou supprimer le connecteur

Lors de l’arrêt ou de la suppression du connecteur, vous devez tenir compte de l’emplacement de réplication https://www.postgresql.org/docs/current/warm-standby.html#STREAMING-REPLICATION-SLOTS utilisé par le connecteur.

Le connecteur crée son propre slot de réplication avec un nom commençant par snowflake_connector_ suivi d’un suffixe aléatoire. À mesure que le connecteur lit le flux de réplication, il avance l’emplacement, de sorte que PostgreSQL puisse découper son journal WAL et libérer de l’espace disque.

Lorsque le connecteur est en pause, le slot n’est pas avancé et les modifications apportées à la base de données source continuent d’augmenter la taille du journal WAL. Vous ne devez pas laisser le connecteur en pause pendant de longues périodes, en particulier dans les bases de données à fort trafic.

Lorsque le connecteur est supprimé, que ce soit en le supprimant du canevas Openflow ou par tout autre moyen, comme la suppression de l’instance Openflow entière, le slot de réplication reste en place et doit être supprimé manuellement.

Si vous avez plusieurs instances de connecteur qui répliquent à partir de la même base de données PostgreSQL, chaque instance créera son propre slot de réplication portant un nom unique. Lorsque vous supprimez manuellement un emplacement de réplication, assurez-vous qu’il s’agit du bon. Vous pouvez voir quel emplacement de réplication est utilisé par une instance de connecteur donnée en vérifiant l’état du processeur CaptureChangePostgreSQL.

Exécutez le flux

  1. Cliquez avec le bouton droit de la souris sur l’avion et sélectionnez Enable all Controller Services.

  2. Cliquez avec le bouton droit de la souris sur le groupe de processus importé et sélectionnez Start. Le connecteur démarre l’ingestion des données.