Configure o Openflow Connector for PostgreSQL¶
Nota
O conector está sujeito aos termos do conector.
Este tópico descreve as etapas para configurar o Openflow Connector for PostgreSQL.
Pré-requisitos¶
- Certifique-se de ter revisado Sobre a Openflow Connector for PostgreSQL. 
- Certifique-se de ter revisado as versões compatíveis do PostgreSQL. 
- Recomendado: certifique-se de adicionar apenas uma instância de conector por tempo de execução. 
- Certifique-se de ter seguido as etapas em Configuração do Openflow - BYOC ou Configuração do Openflow - Implantação do Snowflake - Visão geral da tarefa. 
- Como administrador de banco de dados, execute as seguintes tarefas: 
- Certifique-se de que haja espaço suficiente em disco no servidor PostgreSQL para o WAL. Isso ocorre porque, uma vez criado, um slot de replicação faz com que o PostgreSQL retenha os dados WAL da posição mantida pelo slot de replicação, até que o conector confirme e avance essa posição. 
- Certifique-se de que todas as tabelas habilitadas para replicação tenham uma chave primária. A chave pode ser uma coluna única ou composta. 
- Defina o REPLICA IDENTITY das tabelas como - DEFAULT. Isso garante que as chaves primárias sejam representadas no WAL, e que o conector possa lê-las.
- Crie um usuário para o conector. O conector requer um usuário com o atributo - REPLICATIONe permissões para SELECT de cada tabela replicada. Crie esse usuário com uma senha para entrar na configuração do conector. Para obter mais informações sobre segurança de replicação, consulte Segurança.
 
- Como administrador de conta Snowflake, execute as seguintes tarefas: - Crie um usuário Snowflake com o tipo SERVICE. Crie um banco de dados para armazenar os dados replicados e configure privilégios para que o usuário Snowflake crie objetos nesse banco de dados, concedendo os privilégios USAGE e CREATE SCHEMA. - CREATE DATABASE <destination_database>; CREATE USER <openflow_user> TYPE=SERVICE COMMENT='Service user for automated access of Openflow'; CREATE ROLE <openflow_role>; GRANT ROLE <openflow_role> TO USER <openflow_user>; GRANT USAGE ON DATABASE <destination_database> TO ROLE <openflow_role>; GRANT CREATE SCHEMA ON DATABASE <destination_database> TO ROLE <openflow_role>; CREATE WAREHOUSE <openflow_warehouse> WITH WAREHOUSE_SIZE = 'MEDIUM' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE; GRANT USAGE, OPERATE ON WAREHOUSE <openflow_warehouse> TO ROLE <openflow_role>; 
- Crie um par de chaves seguras (pública e privada). Armazene a chave privada do usuário em um arquivo para usá-la durante a configuração do conector. Atribua a chave pública ao usuário do serviço Snowflake: - ALTER USER <openflow_user> SET RSA_PUBLIC_KEY = 'thekey'; - Para obter mais informações, consulte autenticação de par de chaves. 
- Designe um warehouse para o conector usar. Comece com o tamanho - MEDIUMde warehouse e, em seguida, experimente os tamanhos dependendo da quantidade de tabelas que estão sendo replicadas e da quantidade de dados transferidos. Um grande número de tabelas normalmente escalona melhor com warehouses multicluster, em vez do tamanho do warehouse.
 
Configuração de wal_level¶
Openflow Connector for PostgreSQL requer que wal_level seja definido como logical.
Dependendo de onde o servidor PostgreSQL estiver hospedado, você pode configurar o wal_level da seguinte forma:
| No local | Execute a seguinte consulta com superusuário ou usuário com privilégio  
 | 
| RDS | O usuário usado pelo agente precisa ter a função  Você também precisa definir: 
 | 
| AWS Aurora | Defina o parâmetro estático  | 
| GCP | Defina os seguintes sinalizadores: 
 | 
| Azure | Defina o suporte de replicação como  | 
Crie uma publicação¶
O Openflow Connector for PostgreSQL requer que uma publicação seja criada e configurada em PostgreSQL antes do início da replicação. Você pode criá-la para todas as tabelas ou para um subconjunto delas, bem como para tabelas específicas apenas com colunas especificadas. Certifique-se de que todas as tabelas e colunas que você planeja replicar estejam incluídas na publicação. Você também pode modificar a publicação posteriormente, enquanto o conector estiver em execução. Para criar e configurar uma publicação, faça o seguinte:
- Efetue login como um usuário com o privilégio CREATE no banco de dados e execute a seguinte consulta: - CREATE PUBLICATION <publication name>;
- Defina as tabelas que o agente do banco de dados poderá ver usando: 
ALTER PUBLICATION <publication name> ADD TABLE <table name>;Importante
PostgreSQL 15 e posteriores oferecem suporte à configuração de publicações para um subconjunto específico de colunas da tabela. Para que o conector seja corretamente compatível com isso, você deve usar as configurações de filtragem de coluna para incluir as mesmas colunas definidas na publicação.
Sem essa configuração, o conector se comportará da seguinte forma:
Na tabela de destino, as colunas que não estiverem incluídas no filtro serão sufixadas com
__DELETED. Todos os dados replicados durante a fase de instantâneo serão mantidos.
Após adicionar novas colunas à publicação, a tabela falhará permanentemente e você precisará reiniciar a replicação.
Para obter mais informações, consulte ALTER PUBLICATION.
Instalação do conector¶
- Navegue até a página Visão geral do Openflow. Na seção Featured connectors, selecione View more connectors. 
- Na página de conectores do Openflow, localize o conector e selecione Add to runtime. 
- Na caixa de diálogo Select runtime, selecione seu tempo de execução na lista suspensa Available runtimes. 
- Selecione Add. - Nota - Antes de instalar o conector, verifique se você criou um banco de dados e um esquema no Snowflake para que o conector armazene os dados ingeridos. 
- Autentique-se na implementação com as credenciais de sua conta Snowflake e selecione Allow quando solicitado para permitir que o aplicativo de tempo de execução acesse sua conta Snowflake. O processo de instalação do conector leva alguns minutos para ser concluído. 
- Autentique-se no tempo de execução com as credenciais de sua conta Snowflake. 
A tela do Openflow é exibida com o grupo de processos do conector adicionado a ela.
Configuração do conector¶
Você pode configurar o conector para os seguintes casos de uso:
Replique um conjunto de tabelas em tempo real¶
- Clique com o botão direito do mouse no grupo de processos importado e selecione Parameters. 
- Preencha os valores dos parâmetros necessários conforme descrito em Parâmetros de fluxo. 
Parâmetros de fluxo¶
Comece definindo os parâmetros do contexto de Parâmetros de origem do PostgreSQL e, em seguida, o contexto de Parâmetros de destino do PostgreSQL. Feito isso, você pode ativar o conector e ele deve se conectar ao PostgreSQL e ao Snowflake e começar a funcionar. No entanto, ele não replicará nenhum dado até que qualquer tabela seja explicitamente adicionada à sua configuração.
Para configurar tabelas específicas para replicação, edite o contexto Parâmetros de ingestão do PostgreSQL. Logo depois que você aplicar as alterações ao contexto Parâmetros de replicação, a configuração será captada pelo conector, e o ciclo de vida da replicação será iniciado para cada tabela.
Contexto dos parâmetros de origem do PostgreSQL¶
| Parâmetro | Descrição | 
|---|---|
| URL de conexão Postgres | O URL completo do JDBC para o banco de dados de origem. Exemplo:  Se você estiver se conectando ao servidor de replicação PostgreSQL, consulte Replicação de tabelas de um servidor de replicação PostgreSQL. | 
| Driver Postgres JDBC | O caminho para o jar do driver PostgreSQL JDBC. Faça o download do jar em seu site e, em seguida, marque a caixa de seleção Reference asset para fazer o upload e anexá-lo. | 
| Modo Postgres SSL | Ative ou desative as conexões SSL. | 
| Certificado SSL da raiz Postgres | O conteúdo completo do certificado raiz do banco de dados. Opcional se SSL estiver desativado. | 
| Nome de usuário do Postgres | O nome de usuário do conector. | 
| Senha do Postgres | A senha do conector. | 
Contexto dos parâmetros de destino do PostgreSQL¶
| Parâmetro | Descrição | Obrigatório | 
|---|---|---|
| Banco de dados de destino | O banco de dados onde os dados serão persistidos. Ele já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. | Sim | 
| Esquema de destino | O esquema onde os dados serão persistidos, que já deve existir no Snowflake. O nome diferencia maiúsculas de minúsculas. Para identificadores sem aspas, forneça o nome em maiúsculas. Veja os exemplos a seguir: 
 | Sim | 
| Identificador de conta Snowflake | Ao utilizar: 
 | Sim | 
| Estratégia de autenticação Snowflake | Ao utilizar: 
 | Sim | 
| Chave privada Snowflake | Ao utilizar: 
 | Não | 
| Arquivo de chave privada Snowflake | Ao utilizar: 
 | Não | 
| Senha de chave privada Snowflake | Ao usar 
 | Não | 
| Função Snowflake | Ao usar 
 | Sim | 
| Nome de usuário do Snowflake | Ao usar 
 | Sim | 
| Warehouse Snowflake | Warehouse Snowflake usado para executar consultas. | Sim | 
Contexto dos parâmetros de ingestão do PostgreSQL¶
| Parâmetro | Descrição | 
|---|---|
| Nomes de tabela inclusos | Uma lista separada por vírgulas de caminhos de tabela, incluindo os esquemas. Exemplo:  Selecione tabelas por nome ou por Regex. Se você usar ambas, todas as tabelas correspondentes de qualquer uma das opções serão incluídas. | 
| Regex de tabela inclusa | Uma expressão regular para comparar com os caminhos da tabela. Todos os caminhos que corresponderem à expressão serão replicados, e as novas tabelas que corresponderem ao padrão e forem criadas posteriormente também serão incluídas automaticamente. Exemplo:  Selecione tabelas por nome ou por Regex. Se você usar ambas, todas as tabelas correspondentes de qualquer uma das opções serão incluídas. | 
| Filtro de coluna JSON | Opcional. Um JSON contendo uma lista de nomes de tabelas totalmente qualificados e um padrão regex para nomes de colunas que devem ser incluídos na replicação. Exemplo:  | 
| CRON do cronograma de tarefas de fusão | A expressão CRON que define os períodos em que as operações de fusão do diário para a Tabela de destino serão acionadas. Defina-o como  Por exemplo: 
 Para obter mais informações e exemplos, consulte o tutorial de acionadores do cron na documentação do Quartz | 
Replicação de tabelas de um servidor de replicação PostgreSQL¶
O conector pode ingerir dados de um servidor primário, uma réplica de espera ativa ou servidor de assinante usando replicação lógica. Antes de configurar o conector para se conectar a uma réplica do PostgreSQL, certifique-se de que a replicação entre nós primários e de réplica funcione corretamente. Ao investigar problemas de dados ausentes no conector, primeiro certifique-se de que as linhas ausentes estejam presentes no servidor de réplica usado pelo conector.
Considerações adicionais ao conectar-se a uma réplica em espera:
Somente a conexão a réplicas em espera ativa é compatível. Observe que as réplicas em espera dinâmica não podem aceitar conexões de clientes até serem promovidas a uma instância primária.
A versão do PostgreSQL do servidor deve ser >= 16.
A publicação necessária para o conector deve ser criada no servidor primário, não no servidor em espera. O servidor em espera é somente leitura e não permite criar publicação.
Se você se conectar a uma instância de espera ativa e consulte o erro Tentativa de criar o slot de replicação “<replication slot> esgotou o tempo limite. Se estiver conectando a uma instância em espera, certifique-se de que haja algum tráfego na instância PostgreSQL primária, caso contrário a chamada para criar um slot de replicação nunca retornará. no código do Openflow ou o processador Read PostgreSQL CDC Stream não está iniciando, faça login na instância PostgreSQL primária e execute a seguinte consulta:
SELECT pg_log_standby_snapshot(); O erro ocorre quando não há alterações de dados no servidor primário. Como tal, o conector pode travar enquanto cria um slot de replicação no servidor de réplica. Isso resulta no servidor de réplica que exige informações sobre a execução de transações do servidor primário para poder criar um slot de replicação. Os servidores primários não enviarão as informações enquanto estiverem ociosos. A função pg_log_standby_snapshot() força o servidor primário a enviar informações sobre transações em execução para o servidor de réplica.
Remova e adicione novamente uma tabela à replicação¶
Para remover uma tabela da replicação, certifique-se de que ela seja removida dos parâmetros Nomes de tabela inclusos ou Regex de tabela inclusa no contexto Parâmetros de replicação.
Se quiser adicionar novamente a tabela à replicação mais tarde, primeiro exclua a tabela de destino correspondente no Snowflake. Após isso, adicione a tabela novamente aos parâmetros Nomes de tabela inclusos ou Regex de tabela inclusa. Isso garante que o processo de replicação comece do zero para a tabela.
Essa abordagem também pode ser usada para se recuperar de um cenário de replicação de tabela com falha.
Replique um subconjunto de colunas em uma tabela¶
O conector pode filtrar os dados replicados por tabela para um subconjunto de colunas configuradas.
Para aplicar filtros a colunas, modifique a propriedade Filtro de coluna no contexto Parâmetros de replicação, adicionando uma matriz de configurações – uma entrada para cada tabela à qual deseja aplicar um filtro.
As colunas podem ser incluídas ou excluídas por nome ou padrão. Você pode aplicar uma única condição por tabela ou combinar várias condições, com as exclusões sempre tendo precedência sobre as inclusões.
O exemplo a seguir mostra os campos disponíveis. schema e table são obrigatórios e, em seguida, um ou mais dos campos included, excluded, includedPattern, excludedPattern são obrigatórios.
[  {  "schema": "<source table schema>",  "table" : "<source table name>",  "included": ["<column name>", "<column name>"],  "excluded": ["<column name>", "<column name>"],  "includedPattern": "<regular expression>",  "excludedPattern": "<regular expression>",  } ] Monitore as alterações de dados em tabelas¶
O conector replica não apenas o estado atual dos dados das tabelas de origem, mas também cada estado de cada linha de cada conjunto de alterações. Esses dados são armazenados em tabelas de diário criadas no mesmo esquema da tabela de destino.
Os nomes das tabelas de diário são formatados como: é um número inteiro que aumenta a cada alteração de esquema na tabela de origem. Como resultado, as tabelas de origem que passam por alterações de esquema terão vários tabelas de diário.
Quando uma tabela é removida da replicação e depois adicionada novamente, o valor de <carimbo de data/hora> será alterado e a <geração de esquema> começará novamente a partir de 1.
Importante
O Snowflake recomenda que você não altere a estrutura das tabelas de diário de forma alguma. Elas são usadas pelo conector para atualizar a tabela de destino como parte do processo de replicação.
O conector nunca descarta tabelas de diário, mas faz uso do diário mais recente para cada tabela de origem replicada, lendo apenas fluxos apenas para anexação sobre os diários. Para recuperar o armazenamento, você pode:
- Truncar todas as tabelas de diário a qualquer momento. 
- Descartar as tabelas de diário relacionadas às tabelas de origem que foram removidas da replicação. 
- Descartar todas as tabelas de diário, exceto as de última geração, para tabelas replicadas ativamente. 
Por exemplo, se o conector estiver definido para replicar ativamente a tabela de origem orders e você tiver removido anteriormente a tabela customers da replicação, poderá ter as seguintes tabelas de diário. Nesse caso, você pode descartar todas elas exceto orders_5678_2.
customers_1234_1 customers_1234_2 orders_5678_1 orders_5678_2 Configure o agendamento de tarefas de fusão¶
O conector usa um warehouse para mesclar informações de captura de dados de alteração (CDC) nas tabelas de destino. Essa operação é acionada pelo processador MergeSnowflakeJournalTable. Se não houver novas alterações ou se não houver novos FlowFiles aguardando na fila MergeSnowflakeJournalTable, nenhuma fusão será acionada e o warehouse será suspenso automaticamente.
Para limitar o custo do warehouse e limitar as mesclagens apenas ao horário programado, use a expressão CRON no parâmetro CRON Cronograma da tarefa de mesclagem. Ele controla o fluxo dos FlowFiles que chegam ao processador MergeSnowflakeJournalTable e as mesclagens são acionadas somente em um período de tempo específico. Para obter mais informações sobre agendamento, consulte Estratégia de agendamento.
Pare ou exclua o conector¶
Ao interromper ou remover o conector, você deve considerar o slot de replicação que o conector usa.
O conector cria seu próprio slot de replicação com um nome que começa com snowflake_connector_ seguido de um sufixo aleatório. À medida que o conector lê o fluxo de replicação, ele avança o slot, de modo que o PostgreSQL possa cortar o log WAL e liberar espaço em disco.
Quando o conector é pausado, o slot não é avançado e as alterações no banco de dados de origem continuam aumentando o tamanho do registro WAL. Você não deve manter o conector em pausa por longos períodos de tempo, especialmente em bancos de dados de alto tráfego.
Quando o conector é removido, seja excluindo-o da tela do Openflow ou por qualquer outro meio, como a exclusão de toda a instância do Openflow, o slot de replicação permanece no lugar e deve ser removido manualmente.
Se você tiver várias instâncias do conector replicando do mesmo banco de dados PostgreSQL, cada instância criará seu próprio slot de replicação com nome exclusivo. Ao descartar um slot de replicação manualmente, certifique-se de que seja o correto. Você pode ver qual slot de replicação é usado por uma determinada instância do conector verificando o estado do processador CaptureChangePostgreSQL.
Execute o fluxo¶
- Clique com o botão direito do mouse no plano e selecione Enable all Controller Services. 
- Clique com o botão direito do mouse no grupo de processos importado e selecione Start. O conector inicia a ingestão de dados.