Tutorial 1: Criar um serviço Snowpark Container Services

Introdução

Após concluir a configuração comum, você estará pronto para criar um serviço. Neste tutorial, você cria um serviço (denominado echo_service) que simplesmente retorna o texto fornecido como entrada. Por exemplo, se a cadeia de caracteres de entrada for “Hello World”, o serviço retornará “Eu disse, Hello World”.

Existem duas partes neste tutorial:

Parte 1: Criar e testar um serviço. Faça download do código fornecido para este tutorial e siga as instruções passo a passo:

  1. Baixe o código de serviço deste tutorial.

  2. Crie uma imagem Docker para Snowpark Container Services e carregue a imagem em um repositório em sua conta.

  3. Crie um serviço, fornecendo o arquivo de especificação de serviço e o pool de computação no qual executar o serviço.

  4. Crie uma função de serviço para se comunicar com o serviço.

  5. Use o serviço. Você envia solicitações de eco ao serviço e verifica a resposta.

Parte 2: Entender o serviço. Esta seção fornece uma visão geral do código de serviço e destaca como diferentes componentes colaboram.

1: Baixe o código de serviço

O código (um aplicativo Python) é fornecido para criar o serviço Echo.

  1. Baixe SnowparkContainerServices-Tutorials.zip.

  2. Descompacte o conteúdo, que inclui um diretório para cada tutorial. O diretório Tutorial-1 possui os seguintes arquivos:

    • Dockerfile

    • echo_service.py

    • templates/basic_ui.html

2: Crie uma imagem e carregue

Crie uma imagem para a plataforma Linux/AMD64 compatível com o Snowpark Container Services e, em seguida, faça upload da imagem para o repositório de imagens da sua conta (consulte Configuração comum).

Você precisará de informações sobre o repositório (o URL do repositório e o nome do host do registro) antes de poder construir e fazer upload da imagem. Para obter mais informações, consulte Registro e repositórios.

Obter informações sobre o repositório

  1. Para obter o URL do repositório, execute o comando SHOW IMAGE REPOSITORIES SQL.

    SHOW IMAGE REPOSITORIES; 
    Copy
    • A coluna repository_url na saída fornece o URL. Abaixo um exemplo:

      <orgname>-<acctname>.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository 
    • O nome do host no URL do repositório é o nome do host de registro. Abaixo um exemplo:

      <orgname>-<acctname>.registry.snowflakecomputing.com 

Criar a imagem e carregá-la no repositório

  1. Abra uma janela de terminal e mude para o diretório que contém os arquivos que você descompactou.

  2. Para criar uma imagem do Docker, execute o seguinte comando docker build usando o Docker CLI. Observe que o comando especifica o diretório de trabalho atual (.) como PATH para arquivos a serem usados na construção da imagem.

    docker build --rm --platform linux/amd64 -t <repository_url>/<image_name> . 
    Copy
    • Para image_name, use my_echo_service_image:latest:

    Exemplo

    docker build --rm --platform linux/amd64 -t myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest . 
    Copy
  3. Faça upload da imagem para o repositório em sua conta Snowflake.

    1. Para que o Docker faça upload de uma imagem em seu nome para o repositório, primeiro autentique o Docker com o registro.

      1. Recomendamos usar a CLI do Snowflake para autenticar sua instância local do Docker com o registro da imagem para sua conta Snowflake. Certifique-se de que configurou a Snowflake CLI para se conectar ao Snowflake. Para obter mais informações, consulte Configuração de Snowflake CLI e conexão ao Snowflake.

      2. Para autenticar, execute o seguinte comando Snowflake CLI:

        snow spcs image-registry login 
        Copy
    2. Para fazer upload da imagem, execute o seguinte comando:

      docker push <repository_url>/<image_name> 
      Copy

      Exemplo

      docker push myorg-myacct.registry.snowflakecomputing.com/tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest 
      Copy

3: Criar um serviço

Nesta seção você cria um serviço e também uma função de serviço para se comunicar com o serviço.

Para criar um serviço, você precisa do seguinte:

  • Um pool de computação. Snowflake executa seu serviço no pool de computação especificado. Você criou um pool de computação como parte da configuração comum.

  • Uma especificação de serviço. Esta especificação fornece ao Snowflake as informações necessárias para configurar e executar seu serviço. Para obter mais informações, consulte Snowpark Container Services: como trabalhar com serviços. Neste tutorial, você fornece a especificação inline, no comando CREATE SERVICE. Você também pode salvar a especificação em um arquivo no estágio Snowflake e fornecer informações do arquivo no comando CREATE SERVICE, conforme mostrado no Tutorial 2.

Uma função de serviço é um dos métodos disponíveis para comunicação com seu serviço. Uma função de serviço é uma função definida pelo usuário (UDF) que você associa ao ponto de extremidade do serviço. Quando a função de serviço é executada, ela envia uma solicitação ao ponto de extremidade de serviço e recebe uma resposta.

  1. Verifique se o pool de computação está pronto e se você está no contexto certo para criar o serviço.

    1. Anteriormente, você definia o contexto na etapa Configuração comum. Para garantir que você esteja no contexto correto para as instruções SQL nesta etapa, execute o seguinte:

    USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse; 
    Copy
    1. Para garantir que o pool de computação criado na configuração comum esteja pronto, execute DESCRIBE COMPUTE POOL e verifique se state é ACTIVE ou IDLE. Se state for STARTING, será necessário aguardar até que state mude para ACTIVE ou IDLE.

    DESCRIBE COMPUTE POOL tutorial_compute_pool; 
    Copy
  2. Para criar o serviço, execute o seguinte comando usando test_role:

    CREATE SERVICE echo_service IN COMPUTE POOL tutorial_compute_pool FROM SPECIFICATION $$ spec:  containers: - name: echo image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest env:  SERVER_PORT: 8000 CHARACTER_NAME: Bob readinessProbe:  port: 8000 path: /healthcheck endpoints: - name: echoendpoint port: 8000 public: true  $$ MIN_INSTANCES=1 MAX_INSTANCES=1; 
    Copy

    Nota

    Se já existir um serviço com esse nome, use o comando DROP SERVICE para excluir o serviço criado anteriormente e, em seguida, crie esse serviço.

  3. Execute os seguintes comandos SQL para obter informações detalhadas sobre o serviço que você acabou de criar. Para obter mais informações, consulte Snowpark Container Services: como trabalhar com serviços.

    • Para listar os serviços da sua conta, execute o comando SHOW SERVICES:

      SHOW SERVICES; 
      Copy
    • Para obter informações sobre seu serviço, inclusive o status do serviço, execute o comando DESCRIBE SERVICE.

      DESC SERVICE echo_service; 
      Copy

      Verifique se a coluna status mostra o status do serviço como RUNNING; se o status for PENDING, isso indica que o serviço ainda está sendo iniciado. Para investigar por que o serviço não é RUNNING, execute o comando SHOW SERVICE CONTAINERS IN SERVICE e examine o status de contêineres individuais:

      SHOW SERVICE CONTAINERS IN SERVICE echo_service; 
      Copy
  4. Para criar uma função de serviço, execute o seguinte comando:

    CREATE FUNCTION my_echo_udf (InputText varchar) RETURNS varchar SERVICE=echo_service ENDPOINT=echoendpoint AS '/echo'; 
    Copy

    Observe o seguinte:

    • A propriedade SERVICE associa a UDF ao serviço echo_service.

    • A propriedade ENDPOINT associa a UDF ao ponto de extremidade echoendpoint dentro do serviço.

    • AS “/echo” especifica o caminho HTTP para o servidor de eco. Você pode encontrar esse caminho no código de serviço (echo_service.py).

4: Use o serviço

Primeiro, configure o contexto para as instruções SQL nesta seção e execute o seguinte:

USE ROLE test_role; USE DATABASE tutorial_db; USE SCHEMA data_schema; USE WAREHOUSE tutorial_warehouse; 
Copy

Agora você pode se comunicar com o serviço Echo.

  1. Usando uma função de serviço: você pode invocar a função de serviço em uma consulta. A função de serviço de exemplo (my_echo_udf) pode receber uma única cadeia de caracteres ou uma lista de cadeias de caracteres como entrada.

    Exemplo 1.1: passe uma única cadeia de caracteres

    • Para chamar a função de serviço my_echo_udf, execute a seguinte instrução SELECT, passando uma cadeia de caracteres de entrada ('hello'):

      SELECT my_echo_udf('hello!'); 
      Copy

      Snowflake envia uma solicitação POST ao ponto de extremidade do serviço (echoendpoint). Ao receber a solicitação, o serviço ecoa a cadeia de caracteres de entrada na resposta.

      +--------------------------+ | **MY_ECHO_UDF('HELLO!')**| |------------------------- | | Bob said hello! | +--------------------------+ 

    Exemplo 1.2: passe uma lista de cadeias de caracteres

    Quando você passa uma lista de cadeias de caracteres para a função de serviço, o Snowflake agrupa essas cadeias de caracteres de entrada e envia uma série de solicitações POST para o serviço. Depois que o serviço processa todas as cadeias de caracteres, o Snowflake combina os resultados e os retorna.

    O exemplo a seguir passa uma coluna da tabela como entrada para a função de serviço.

    1. Crie uma tabela com várias cadeias de caracteres:

      CREATE TABLE messages (message_text VARCHAR) AS (SELECT * FROM (VALUES ('Thank you'), ('Hello'), ('Hello World'))); 
      Copy
    2. Verifique se a tabela foi criada:

      SELECT * FROM messages; 
      Copy
    3. Para chamar a função de serviço, execute a seguinte instrução SELECT, passando as linhas da tabela como entrada:

      SELECT my_echo_udf(message_text) FROM messages; 
      Copy

      Saída:

      +---------------------------+ | MY_ECHO_UDF(MESSAGE_TEXT) | |---------------------------| | Bob said Thank you | | Bob said Hello | | Bob said Hello World | +---------------------------+ 
  2. Usando um navegador da Web: o serviço expõe o ponto de extremidade publicamente (consulte a especificação embutida fornecida no comando CREATE SERVICE). Portanto, você pode fazer login em uma UI da web que o serviço expõe à Internet e, em seguida, enviar solicitações ao serviço a partir de um navegador da web.

    1. Encontre o URL do ponto de extremidade público que o serviço expõe:

      SHOW ENDPOINTS IN SERVICE echo_service; 
      Copy

      A coluna ingress_url na resposta fornece o URL.

      Exemplo

      p6bye-myorg-myacct.snowflakecomputing.app 
    2. Anexe /ui ao URL do ponto de extremidade e cole-o no navegador da web. Isso faz com que o serviço execute a função ui() (consulte echo_service.py).

      Observe que na primeira vez que você acessar o URL do ponto de extremidade, será solicitado a fazer login no Snowflake. Para este teste, use o mesmo usuário usado para criar o serviço para garantir que o usuário tenha os privilégios necessários.

      Formulário da web para comunicação com o serviço Echo.
    3. Insira a cadeia de caracteres “Olá” na caixa Entrada e pressione Retornar.

      Formulário da Web mostrando a resposta do serviço Echo.

    Nota

    É possível acessar o ponto de extremidade público programaticamente. Para obter um código de amostra, consulte Autenticação de entrada. Observe que é necessário anexar /ui ao URL do ponto de extremidade no código para que o Snowflake possa rotear a solicitação para a função ui() no código de serviço.

5: (Opcional) Acesse o ponto de extremidade público programaticamente

Na seção anterior, você testou o serviço Echo usando um navegador da Web. No navegador, você acessou o ponto de extremidade público (ponto de extremidade de entrada) e enviou solicitações usando a UI da Web que o serviço expôs. Nesta seção, você testa o mesmo ponto de extremidade público programaticamente.

O exemplo usa autenticação de par de chaves. Usando o par de chaves fornecido, o código de amostra primeiro gera um JSON Web Token (JWT) e depois troca o token com o Snowflake por um token OAuth. O código então usa o token OAuth para autenticar ao se comunicar com o ponto de extremidade público do serviço Echo.

Pré-requisitos

Certifique-se de ter as seguintes informações:

  • URL de entrada do ponto de extremidade público. Execute o comando SHOW ENDPOINTS IN SERVICEpara obter o URL:

    SHOW ENDPOINTS IN SERVICE echo_service; 
    Copy
  • Nome de sua conta Snowflake. Para obter mais informações, consulte Configuração comum: Verifique se você está pronto para continuar.

  • URL de sua conta Snowflake: É <acctname >.snowflakecomputing.com.

  • Nome de usuário na conta Snowflake. Este é o usuário escolhido em Configuração comum: Criação de objetos Snowflake. Você faz login no Snowflake como este usuário e testa o acesso programático.

  • Nome da função: você criou uma função (test_role) como parte da configuração comum. O usuário assume essa função para executar ações.

Configuração

Siga as etapas para se comunicar com o serviço Echo programaticamente. Como usar o código Python fornecido, você envia solicitações ao ponto de extremidade público que o serviço Echo expõe.

  1. Em um prompt de comando, crie um diretório e navegue até ele.

  2. Configure a autenticação do par de chaves para o usuário.

    1. Gerar um par de chaves:

      1. Gere uma chave privada. Para simplificar as etapas do exercício, você gera uma chave privada não criptografada. Você também pode usar uma chave privada criptografada, mas será necessário digitar a senha.

        openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt 
        Copy
      2. Gere uma chave pública (rsa_key.pub) referenciando a chave privada que você criou.

        openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 
        Copy
    2. Verifique se você tem a chave privada e a chave pública geradas no diretório.

    3. Atribua a chave pública ao usuário que você está usando para testar o acesso programático. Isso permite que o usuário especifique a chave para autenticação.

      ALTER USER <user-name> SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...'; 
      Copy
  3. Salve o código de amostra fornecido em arquivos Python.

    1. Salve o código a seguir em generateJWT.py.

      # To run this on the command line, enter: # python3 generateJWT.py --account=<account_identifier> --user=<username> --private_key_file_path=<path_to_private_key_file> from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.serialization import Encoding from cryptography.hazmat.primitives.serialization import PublicFormat from cryptography.hazmat.backends import default_backend from datetime import timedelta, timezone, datetime import argparse import base64 from getpass import getpass import hashlib import logging import sys # This class relies on the PyJWT module (https://pypi.org/project/PyJWT/). import jwt logger = logging.getLogger(__name__) try: from typing import Text except ImportError: logger.debug('# Python 3.5.0 and 3.5.1 have incompatible typing modules.', exc_info=True) from typing_extensions import Text ISSUER = "iss" EXPIRE_TIME = "exp" ISSUE_TIME = "iat" SUBJECT = "sub" # If you generated an encrypted private key, implement this method to return # the passphrase for decrypting your private key. As an example, this function # prompts the user for the passphrase. def get_private_key_passphrase(): return getpass('Passphrase for private key: ') class JWTGenerator(object):  """  Creates and signs a JWT with the specified private key file, username, and account identifier. The JWTGenerator keeps the  generated token and only regenerates the token if a specified period of time has passed.  """ LIFETIME = timedelta(minutes=59) # The tokens will have a 59-minute lifetime RENEWAL_DELTA = timedelta(minutes=54) # Tokens will be renewed after 54 minutes ALGORITHM = "RS256" # Tokens will be generated using RSA with SHA256 def __init__(self, account: Text, user: Text, private_key_file_path: Text, lifetime: timedelta = LIFETIME, renewal_delay: timedelta = RENEWAL_DELTA):  """  __init__ creates an object that generates JWTs for the specified user, account identifier, and private key.  :param account: Your Snowflake account identifier. See https://docs.snowflake.com/en/user-guide/admin-account-identifier.html. Note that if you are using the account locator, exclude any region information from the account locator.  :param user: The Snowflake username.  :param private_key_file_path: Path to the private key file used for signing the JWTs.  :param lifetime: The number of minutes (as a timedelta) during which the key will be valid.  :param renewal_delay: The number of minutes (as a timedelta) from now after which the JWT generator should renew the JWT.  """ logger.info(  """Creating JWTGenerator with arguments  account : %s, user : %s, lifetime : %s, renewal_delay : %s""", account, user, lifetime, renewal_delay) # Construct the fully qualified name of the user in uppercase. self.account = self.prepare_account_name_for_jwt(account) self.user = user.upper() self.qualified_username = self.account + "." + self.user self.lifetime = lifetime self.renewal_delay = renewal_delay self.private_key_file_path = private_key_file_path self.renew_time = datetime.now(timezone.utc) self.token = None # Load the private key from the specified file. with open(self.private_key_file_path, 'rb') as pem_in: pemlines = pem_in.read() try: # Try to access the private key without a passphrase. self.private_key = load_pem_private_key(pemlines, None, default_backend()) except TypeError: # If that fails, provide the passphrase returned from get_private_key_passphrase(). self.private_key = load_pem_private_key(pemlines, get_private_key_passphrase().encode(), default_backend()) def prepare_account_name_for_jwt(self, raw_account: Text) -> Text:  """  Prepare the account identifier for use in the JWT.  For the JWT, the account identifier must not include the subdomain or any region or cloud provider information.  :param raw_account: The specified account identifier.  :return: The account identifier in a form that can be used to generate the JWT.  """ account = raw_account if not '.global' in account: # Handle the general case. idx = account.find('.') if idx > 0: account = account[0:idx] else: # Handle the replication case. idx = account.find('-') if idx > 0: account = account[0:idx] # Use uppercase for the account identifier. return account.upper() def get_token(self) -> Text:  """  Generates a new JWT. If a JWT has already been generated earlier, return the previously generated token unless the  specified renewal time has passed.  :return: the new token  """ now = datetime.now(timezone.utc) # Fetch the current time # If the token has expired or doesn't exist, regenerate the token. if self.token is None or self.renew_time <= now: logger.info("Generating a new token because the present time (%s) is later than the renewal time (%s)", now, self.renew_time) # Calculate the next time we need to renew the token. self.renew_time = now + self.renewal_delay # Prepare the fields for the payload. # Generate the public key fingerprint for the issuer in the payload. public_key_fp = self.calculate_public_key_fingerprint(self.private_key) # Create our payload payload = { # Set the issuer to the fully qualified username concatenated with the public key fingerprint. ISSUER: self.qualified_username + '.' + public_key_fp, # Set the subject to the fully qualified username. SUBJECT: self.qualified_username, # Set the issue time to now. ISSUE_TIME: now, # Set the expiration time, based on the lifetime specified for this object. EXPIRE_TIME: now + self.lifetime } # Regenerate the actual token token = jwt.encode(payload, key=self.private_key, algorithm=JWTGenerator.ALGORITHM) # If you are using a version of PyJWT prior to 2.0, jwt.encode returns a byte string instead of a string. # If the token is a byte string, convert it to a string. if isinstance(token, bytes): token = token.decode('utf-8') self.token = token logger.info("Generated a JWT with the following payload: %s", jwt.decode(self.token, key=self.private_key.public_key(), algorithms=[JWTGenerator.ALGORITHM])) return self.token def calculate_public_key_fingerprint(self, private_key: Text) -> Text:  """  Given a private key in PEM format, return the public key fingerprint.  :param private_key: private key string  :return: public key fingerprint  """ # Get the raw bytes of public key. public_key_raw = private_key.public_key().public_bytes(Encoding.DER, PublicFormat.SubjectPublicKeyInfo) # Get the sha256 hash of the raw bytes. sha256hash = hashlib.sha256() sha256hash.update(public_key_raw) # Base64-encode the value and prepend the prefix 'SHA256:'. public_key_fp = 'SHA256:' + base64.b64encode(sha256hash.digest()).decode('utf-8') logger.info("Public key fingerprint is %s", public_key_fp) return public_key_fp def main(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (e.g. "myorganization-myaccount" for "myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') args = cli_parser.parse_args() token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() print('JWT:') print(token) if __name__ == "__main__": main() 
      Copy
    2. Salve o código a seguir em access-via-keypair.py.

      from generateJWT import JWTGenerator from datetime import timedelta import argparse import logging import sys import requests logger = logging.getLogger(__name__) def main(): args = _parse_args() token = _get_token(args) snowflake_jwt = token_exchange(token,endpoint=args.endpoint, role=args.role, snowflake_account_url=args.snowflake_account_url, snowflake_account=args.account) spcs_url=f'https://{args.endpoint}{args.endpoint_path}' connect_to_spcs(snowflake_jwt, spcs_url) def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token def token_exchange(token, role, endpoint, snowflake_account_url, snowflake_account): scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, } logger.info(data) url = f'https://{snowflake_account}.snowflakecomputing.com/oauth/token' if snowflake_account_url: url = f'{snowflake_account_url}/oauth/token' logger.info("oauth url: %s" %url) response = requests.post(url, data=data) logger.info("snowflake jwt : %s" % response.text) assert 200 == response.status_code, "unable to get snowflake token" return response.text def connect_to_spcs(token, url): # Create a request to the ingress endpoint with authz. headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers) logger.info("return code %s" % response.status_code) logger.info(response.text) def _parse_args(): logging.basicConfig(stream=sys.stdout, level=logging.INFO) cli_parser = argparse.ArgumentParser() cli_parser.add_argument('--account', required=True, help='The account identifier (for example, "myorganization-myaccount" for ' '"myorganization-myaccount.snowflakecomputing.com").') cli_parser.add_argument('--user', required=True, help='The user name.') cli_parser.add_argument('--private_key_file_path', required=True, help='Path to the private key file used for signing the JWT.') cli_parser.add_argument('--lifetime', type=int, default=59, help='The number of minutes that the JWT should be valid for.') cli_parser.add_argument('--renewal_delay', type=int, default=54, help='The number of minutes before the JWT generator should produce a new JWT.') cli_parser.add_argument('--role', help='The role we want to use to create and maintain a session for. If a role is not provided, ' 'use the default role.') cli_parser.add_argument('--endpoint', required=True, help='The ingress endpoint of the service') cli_parser.add_argument('--endpoint-path', default='/', help='The url path for the ingress endpoint of the service') cli_parser.add_argument('--snowflake_account_url', default=None, help='The account url of the account for which we want to log in. Type of ' 'https://myorganization-myaccount.snowflakecomputing.com') args = cli_parser.parse_args() return args if __name__ == "__main__": main() 
      Copy

Como enviar de solicitações ao ponto de extremidade do servidor programaticamente

Execute o código Python access-via-keypair.py para fazer a chamada de entrada para o ponto de extremidade público do serviço Echo.

python3 access-via-keypair.py \ --account <account-identifier> \ --user <user-name> \ --role TEST_ROLE \ --private_key_file_path rsa_key.p8 \ --endpoint <ingress-hostname> \ --endpoint-path /ui 
Copy

Observe que o nome especificado pelo sinalizador --role deve corresponder exatamente ao caso do nome da função mostrado por SHOW ROLES.

Para obter mais informações sobre account-identifier, consulte Identificadores de conta.

Como funciona a autenticação

O código primeiro converte o par de chaves fornecido em um token JWT. Em seguida, ele envia o token JWT para o Snowflake para obter um token OAuth. Por fim, o código usa o token OAuth para se conectar ao Snowflake e acessar o ponto de extremidade público. Especificamente, o código faz o seguinte:

  1. Chama a função _get_token(args) para gerar um token JWT a partir do par de chaves fornecido. A implementação da função é mostrada:

    def _get_token(args): token = JWTGenerator(args.account, args.user, args.private_key_file_path, timedelta(minutes=args.lifetime), timedelta(minutes=args.renewal_delay)).get_token() logger.info("Key Pair JWT: %s" % token) return token 
    Copy

    JWTGenerator é uma classe auxiliar fornecida a você. Observe o seguinte sobre os parâmetros que você fornece ao criar este objeto:

    • Parâmetros args.account e args.user: Um token JWT tem vários campos (consulte o formato de token); iss é um dos campos. Este valor de campo inclui o nome da conta Snowflake e um nome de usuário. Portanto, você fornece esses valores como parâmetros.

    • Os dois parâmetros timedelta fornecem as seguintes informações:

      • lifetime especifica o número de minutos durante os quais a chave será válida (60 minutos).

      • renewal_delay especifica o número de minutos a partir de agora após o qual o gerador JWT deve renovar o JWT.

  2. Chama a função token_exchange() para se conectar ao Snowflake e trocar o token JWT por um token OAuth.

    scope_role = f'session:role:{role}' if role is not None else None scope = f'{scope_role} {endpoint}' if scope_role is not None else endpoint data = { 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'scope': scope, 'assertion': token, } 
    Copy

    O código anterior constrói uma configuração JSON do escopo para o token OAuth, o ponto de extremidade público que pode ser acessado usando a função especificada. Este código então faz uma solicitação POST ao Snowflake passando o JSON para trocar o token JWT por um token OAuth (consulte Troca de tokens) conforme mostrado:

    url = f'{snowflake_account_url}/oauth/token' response = requests.post(url, data=data) assert 200 == response.status_code, "unable to get Snowflake token" return response.text 
    Copy
  3. O código então chama a função connect_to_spcs() para se conectar ao ponto de extremidade público do serviço Echo. Ele fornece o URL (https://<URL de entrada>/ui) do ponto de extremidade e o token OAuth para autenticação.

    headers = {'Authorization': f'Snowflake Token="{token}"'} response = requests.post(f'{url}', headers=headers) 
    Copy

    O url é o spcs_url que você forneceu ao programa, e o token é o token OAuth.

    O serviço Echo neste exemplo atende uma página HTML (conforme explicado na seção anterior). Este código de amostra simplesmente imprime o HTML na resposta.

6: Limpeza

Se você não planeja continuar com o Tutorial 2 ou o Tutorial 3, remova os recursos faturáveis que você criou. Para obter mais informações, consulte a Etapa 5 no Tutorial 3.

7: Análise do código de serviço

Esta seção cobre os seguintes tópicos:

Análise do código do tutorial 1

O arquivo zip baixado na etapa 1 inclui os seguintes arquivos:

  • Dockerfile

  • echo_service.py

  • templates/basic_ui.html

Você também usa a especificação de serviço ao criar o serviço. A seção a seguir explica como esses componentes de código funcionam juntos para criar o serviço.

Arquivo echo_service.py

Este arquivo Python contém o código que implementa um servidor HTTP mínimo que retorna (ecoa) o texto de entrada. O código executa principalmente duas tarefas: manipular solicitações de eco das funções de serviço do Snowflake e fornecer uma interface de usuário da web (UI) para enviar solicitações de eco.

from flask import Flask from flask import request from flask import make_response from flask import render_template import logging import os import sys SERVICE_HOST = os.getenv('SERVER_HOST', '0.0.0.0') SERVER_PORT = os.getenv('SERVER_PORT', 8080) CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') def get_logger(logger_name): logger = logging.getLogger(logger_name) logger.setLevel(logging.DEBUG) handler = logging.StreamHandler(sys.stdout) handler.setLevel(logging.DEBUG) handler.setFormatter( logging.Formatter( '%(name)s [%(asctime)s] [%(levelname)s] %(message)s')) logger.addHandler(handler) return logger logger = get_logger('echo-service') app = Flask(__name__) @app.get("/healthcheck") def readiness_probe(): return "I'm ready!" @app.post("/echo") def echo():  '''  Main handler for input data sent by Snowflake.  ''' message = request.json logger.debug(f'Received request: {message}') if message is None or not message['data']: logger.info('Received empty message') return {} # input format: # {"data": [ # [row_index, column_1_value, column_2_value, ...], # ... # ]} input_rows = message['data'] logger.info(f'Received {len(input_rows)} rows') # output format: # {"data": [ # [row_index, column_1_value, column_2_value, ...}], # ... # ]} output_rows = [[row[0], get_echo_response(row[1])] for row in input_rows] logger.info(f'Produced {len(output_rows)} rows') response = make_response({"data": output_rows}) response.headers['Content-type'] = 'application/json' logger.debug(f'Sending response: {response.json}') return response @app.route("/ui", methods=["GET", "POST"]) def ui():  '''  Main handler for providing a web UI.  ''' if request.method == "POST": # getting input in HTML form input_text = request.form.get("input") # display input and output return render_template("basic_ui.html", echo_input=input_text, echo_reponse=get_echo_response(input_text)) return render_template("basic_ui.html") def get_echo_response(input): return f'{CHARACTER_NAME} said {input}' if __name__ == '__main__': app.run(host=SERVICE_HOST, port=SERVER_PORT) 
Copy

No código:

  • A função echo permite que uma função de serviço Snowflake se comunique com o serviço. Esta função especifica a decoração @app.post() conforme mostrado:

    @app.post("/echo") def echo(): 
    Copy

    Quando o servidor de eco recebe sua solicitação HTTP POST com o caminho /echo, o servidor encaminha a solicitação para esta função. A função é executada e ecoa as cadeias de caracteres do corpo da solicitação na resposta.

    Para oferecer suporte à comunicação de uma função de serviço Snowflake, este servidor implementa as funções externas. Ou seja, a implementação do servidor segue um determinado formato de dados de entrada/saída para servir uma função SQL, e este é o mesmo formato de dados de entrada/saída usado pelas funções externas .

  • A seção de função ui do código exibe um formulário da web e lida com solicitações de eco enviadas do formulário da web. Esta função usa o decorador @app.route() para especificar que as solicitações de /ui serão tratadas por esta função:

    @app.route("/ui", methods=["GET", "POST"]) def ui(): 
    Copy

    O serviço Echo expõe o ponto de extremidade echoendpoint publicamente (consulte a especificação do serviço), permitindo a comunicação com o serviço pela web. Quando você carrega o URL do ponto de extremidade público com /ui anexado em seu navegador, o navegador envia uma solicitação HTTP GET para esse caminho e o servidor encaminha a solicitação para essa função. A função é executada e retorna um formulário HTML simples para o usuário inserir uma cadeia de caracteres.

    Depois que o usuário insere uma cadeia de caracteres e envia o formulário, o navegador envia uma solicitação HTTP para esse caminho e o servidor encaminha a solicitação para essa mesma função. A função é executada e retorna uma resposta HTTP contendo a cadeia de caracteres original.

  • A função readiness_probe usa o decorador @app.get() para especificar que as solicitações de /healthcheck serão tratadas por esta função:

    @app.get("/healthcheck") def readiness_probe(): 
    Copy

    Esta função permite que o Snowflake verifique a prontidão do serviço. Quando o contêiner é iniciado, o Snowflake deseja confirmar se o aplicativo está funcionando e se o serviço está pronto para atender às solicitações. O Snowflake envia uma solicitação HTTP GET com esse caminho (como uma análise de integridade, análise de prontidão) para garantir que apenas contêineres íntegros tenham tráfego. A função pode fazer o que você quiser.

  • A função get_logger ajuda a configurar a geração de registros.

Arquivo Docker

Este arquivo contém todos os comandos para criar uma imagem usando Docker.

ARG BASE_IMAGE=python:3.10-slim-buster FROM $BASE_IMAGE COPY echo_service.py ./ COPY templates/ ./templates/ RUN pip install --upgrade pip && \\ pip install flask CMD ["python", "echo_service.py"] 
Copy

O Dockerfile contém instruções para instalar a biblioteca Flask no contêiner Docker. O código em echo_service.py depende da biblioteca Flask para lidar com solicitações HTTP.

/template/basic_ui.html

O serviço Echo expõe o ponto de extremidade echoendpoint publicamente (consulte a especificação do serviço), permitindo a comunicação com o serviço pela web. Quando você carrega o URL do ponto de extremidade público com /ui anexado em seu navegador, o serviço Echo exibe este formulário. Você pode inserir uma cadeia de caracteres no formulário e enviá-lo, e o serviço retornará a cadeia de caracteres em uma resposta HTTP.

<!DOCTYPE html> <html lang="en"> <head> <title>Welcome to echo service!</title> </head> <body> <h1>Welcome to echo service!</h1> <form action="{{ url_for("ui") }}" method="post"> <label for="input">Input:<label><br> <input type="text" id="input" name="input"><br> </form> <h2>Input:</h2> {{ echo_input }} <h2>Output:</h2> {{ echo_reponse }} </body> </html> 
Copy

Especificação de serviço

Snowflake usa as informações fornecidas nesta especificação para configurar e executar seu serviço.

spec:  containers:  - name: echo  image: /tutorial_db/data_schema/tutorial_repository/my_echo_service_image:latest  env:  SERVER_PORT: 8000  CHARACTER_NAME: Bob  readinessProbe:  port: 8000  path: /healthcheck  endpoints:  - name: echoendpoint  port: 8000  public: true 
Copy

Na especificação do serviço:

  • O containers.image especifica a imagem do Snowflake para iniciar um contêiner.

  • O campo opcional endpoints especifica o ponto de extremidade que o serviço expõe.

    • O name especifica um nome amigável para a porta de rede TCP na qual o contêiner está escutando. Você usa esse nome de ponto de extremidade amigável para enviar solicitações para a porta correspondente. Observe que env.SERVER_PORT controla esse número de porta.

    • O ponto de extremidade também está configurado como public. Isto permite o tráfego para este ponto de extremidade a partir da web pública.

  • O campo opcional containers.env é adicionado para ilustrar como você pode substituir variáveis de ambiente que o Snowflake passa para todos os processos em seu contêiner. Por exemplo, o código de serviço (echo_service.py) lê as variáveis de ambiente com valores padrão conforme mostrado:

    CHARACTER_NAME = os.getenv('CHARACTER_NAME', 'I') SERVER_PORT = os.getenv('SERVER_PORT', 8080) 
    Copy

    Funciona da seguinte maneira:

    • Quando o serviço Echo recebe uma solicitação HTTP POST com uma cadeia de caracteres (por exemplo, “Olá”) no corpo da solicitação, o serviço retorna “Eu disse Olá” por padrão. O código usa a variável de ambiente CHARACTER_NAME para determinar a palavra antes de «disse». Por padrão, CHARACTER_NAME é definido como “eu».

      Você pode substituir o valor padrão CHARACTER_NAME na especificação de serviço. Por exemplo, se você definir o valor como «Bob»; o serviço Echo retorna uma resposta «Bob disse Olá».

    • Da mesma forma, a especificação do serviço substitui a porta (SERVER_PORT) que o serviço escuta por 8000, substituindo a porta padrão 8080.

  • O campo readinessProbe identifica port e path que o Snowflake pode usar para enviar uma solicitação HTTP GET à análise de prontidão para verificar se o serviço está pronto para lidar com o tráfego.

    O código de serviço (echo_python.py) implementa a sonda de prontidão da seguinte forma:

    @app.get("/healthcheck") def readiness_probe(): 
    Copy

    Portanto, o arquivo de especificação inclui o campo container.readinessProbe adequadamente.

Para obter mais informações sobre especificações de serviço, consulte Referência de especificação de serviço.

Como entender a função de serviço

Uma função de serviço é um dos métodos de comunicação com seu serviço (consulte Como usar um serviço). Uma função de serviço é uma função definida pelo usuário (UDF) que você associa a um ponto de extremidade de serviço. Quando a função de serviço é executada, ela envia uma solicitação ao ponto de extremidade de serviço associado e recebe uma resposta.

Você cria a seguinte função de serviço executando o comando CREATE FUNCTION com os seguintes parâmetros:

CREATE FUNCTION my_echo_udf (InputText VARCHAR) RETURNS VARCHAR SERVICE=echo_service ENDPOINT=echoendpoint AS '/echo'; 
Copy

Observe o seguinte:

  • A função my_echo_udf usa uma cadeia de caracteres como entrada e retorna uma cadeia de caracteres.

  • A propriedade SERVICE identifica o serviço (echo_service) e a propriedade ENDPOINT identifica o nome do ponto de extremidade amigável (echoendpoint).

  • O AS “/echo” especifica o caminho para o serviço. Em echo_service.py, o decorador @app.post associa este caminho à função echo.

Esta função se conecta ao ENDPOINT específico do SERVICE especificado. Quando você invoca esta função, o Snowflake envia uma solicitação para o caminho /echo dentro do contêiner de serviço.

Criação e teste de uma imagem localmente

Você pode testar a imagem do Docker localmente antes de carregá-la em um repositório em sua conta Snowflake. Nos testes locais, seu contêiner é executado de forma independente (não é um serviço executado pelo Snowflake).

Para testar a imagem do Docker do Tutorial 1:

  1. Para criar uma imagem do Docker, no Docker CLI, execute o seguinte comando:

    docker build --rm -t my_service:local . 
    Copy
  2. Para iniciar seu código, execute o seguinte comando:

    docker run --rm -p 8080:8080 my_service:local 
    Copy
  3. Envie uma solicitação de eco ao serviço usando um dos seguintes métodos:

    • Usando o comando cURL:

      Em outra janela do terminal, usando cURL, envie a seguinte solicitação POST para a porta 8080:

      curl -X POST http://localhost:8080/echo \  -H "Content-Type: application/json" \  -d '{"data":[[0, "Hello friend"], [1, "Hello World"]]}' 
      Copy

      Observe que o corpo da solicitação inclui duas cadeias de caracteres. Este comando cURL envia uma solicitação POST para a porta 8080 na qual o serviço está escutando. O 0 nos dados é o índice da cadeia de caracteres de entrada na lista. O serviço Echo ecoa as cadeias de caracteres de entrada na resposta, conforme mostrado:

      {"data":[[0,"I said Hello Friend"],[1,"I said Hello World"]]} 
    • Usando um navegador da web:

      1. No seu navegador, no mesmo computador, abra http://localhost:8080/ui.

        Isso envia uma solicitação GET para a porta 8080, na qual o serviço está escutando. O serviço executa a função ui(), que renderiza um formulário HTML conforme mostrado:

        Formulário da web para comunicação com o serviço Echo.
      2. Insira a cadeia de caracteres “Olá” na caixa Entrada e pressione Retornar.

        Formulário da Web mostrando a resposta do serviço Echo.

Qual é o próximo passo?

Agora você pode testar o Tutorial 2 que executa um trabalho.