연속 데이터 파이프라인 예시¶
이 항목에서는 데이터 파이프라인과 관련된 사용 사례의 실제 예를 제공합니다.
이 항목의 내용:
전제 조건¶
이 예에서 SQL 문을 실행하기 위해 사용되는 역할에는 다음과 같은 액세스 제어 권한이 필요합니다.
EXECUTE TASK
작업을 실행하기 위한 전역 EXECUTE TASK 권한
USAGE
SQL 문이 실행되는 데이터베이스 및 스키마에 대한 USAGE 권한 및 이러한 예시에서 태스크를 실행하는 웨어하우스에 대한 권한.
CREATE object
테이블, 스트림 및 태스크와 같은 오브젝트를 생성하기 위해 SQL 문이 실행되는 스키마에 대한 다양한
CREATE object
권한.
Snowflake에서의 액세스 제어에 대한 자세한 내용은 액세스 제어의 개요 를 참조하십시오.
일정에 따라 로드된 JSON 데이터 변환하기¶
다음 예에서는 원시 JSON 데이터를 이름이 raw
인 단일 랜딩 테이블에 로드합니다. 두 개의 작업은 raw
테이블에 생성된 테이블 스트림을 쿼리하고 행의 하위 세트를 여러 테이블에 삽입합니다. 각 작업은 테이블 스트림의 변경 데이터 캡처 레코드를 사용하므로 여러 스트림이 필요합니다.
-- Create a landing table to store raw JSON data. -- Snowpipe could load data into this table. create or replace table raw (var variant); -- Create a stream to capture inserts to the landing table. -- A task will consume a set of columns from this stream. create or replace stream rawstream1 on table raw; -- Create a second stream to capture inserts to the landing table. -- A second task will consume another set of columns from this stream. create or replace stream rawstream2 on table raw; -- Create a table that stores the names of office visitors identified in the raw data. create or replace table names (id int, first_name string, last_name string); -- Create a table that stores the visitation dates of office visitors identified in the raw data. create or replace table visits (id int, dt date); -- Create a task that inserts new name records from the rawstream1 stream into the names table -- every minute when the stream contains records. -- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on. create or replace task raw_to_names warehouse = mywh schedule = '1 minute' when system$stream_has_data('rawstream1') as merge into names n using (select var:id id, var:fname fname, var:lname lname from rawstream1) r1 on n.id = to_number(r1.id) when matched then update set n.first_name = r1.fname, n.last_name = r1.lname when not matched then insert (id, first_name, last_name) values (r1.id, r1.fname, r1.lname) ; -- Create another task that merges visitation records from the rawstream2 stream into the visits table -- every minute when the stream contains records. -- Records with new IDs are inserted into the visits table; -- Records with IDs that exist in the visits table update the DT column in the table. -- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on. create or replace task raw_to_visits warehouse = mywh schedule = '1 minute' when system$stream_has_data('rawstream2') as merge into visits v using (select var:id id, var:visit_dt visit_dt from rawstream2) r2 on v.id = to_number(r2.id) when matched then update set v.dt = r2.visit_dt when not matched then insert (id, dt) values (r2.id, r2.visit_dt) ; -- Resume both tasks. alter task raw_to_names resume; alter task raw_to_visits resume; -- Insert a set of records into the landing table. insert into raw select parse_json(column1) from values ('{"id": "123","fname": "Jane","lname": "Smith","visit_dt": "2019-09-17"}'), ('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-17"}'); -- Query the change data capture record in the table streams select * from rawstream1; select * from rawstream2; -- Wait for the tasks to run. -- A tiny buffer is added to the wait time -- because absolute precision in task scheduling is not guaranteed. call system$wait(70); -- Query the table streams again. -- Records should be consumed and no longer visible in streams. -- Verify the records were inserted into the target tables. select * from names; select * from visits; -- Insert another set of records into the landing table. -- The records include both new and existing IDs in the target tables. insert into raw select parse_json(column1) from values ('{"id": "456","fname": "Peter","lname": "Williams","visit_dt": "2019-09-25"}'), ('{"id": "789","fname": "Ana","lname": "Glass","visit_dt": "2019-09-25"}'); -- Wait for the tasks to run. call system$wait(70); -- Records should be consumed and no longer visible in streams. select * from rawstream1; select * from rawstream2; -- Verify the records were inserted into the target tables. select * from names; select * from visits;
일정에 따라 데이터 언로드하기¶
다음 예는 스트림의 변경 데이터 캡처 레코드를 내부(예: Snowflake) 스테이지로 언로드합니다.
-- Use the landing table from the previous example. -- Alternatively, create a landing table. -- Snowpipe could load data into this table. create or replace table raw (id int, type string); -- Create a stream on the table. We will use this stream to feed the unload command. create or replace stream rawstream on table raw; -- Create a task that executes the COPY statement every minute. -- The COPY statement reads from the stream and loads into the table stage for the landing table. -- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on. create or replace task unloadtask warehouse = mywh schedule = '1 minute' when system$stream_has_data('RAWSTREAM') as copy into @%raw/rawstream from rawstream overwrite=true; ; -- Resume the task. alter task unloadtask resume; -- Insert raw data into the landing table. insert into raw values (3,'processed'); -- Query the change data capture record in the table stream select * from rawstream; -- Wait for the tasks to run. -- A tiny buffer is added to the wait time -- because absolute precision in task scheduling is not guaranteed. call system$wait(70); -- Records should be consumed and no longer visible in the stream. select * from rawstream; -- Verify the COPY statement unloaded a data file into the table stage. ls @%raw;
일정에 따라 외부 테이블 메타데이터 새로 고치기¶
다음 예에서는 일정에 따라 이름이 mydb.myschema.exttable
인 외부 테이블의 메타데이터를 새로 고칩니다(ALTER EXTERNAL TABLE … REFRESH 사용).
참고
외부 테이블이 생성될 때 AUTO_REFRESH 매개 변수는 기본적으로 TRUE
로 설정됩니다. Amazon S3 또는 Microsoft Azure 스테이지에서 데이터 파일을 참조하는 외부 테이블에 대해 이 기본값을 수락하는 것이 좋습니다. 그러나 현재 Google Cloud Storage 스테이지를 참조하는 외부 테이블에서는 자동 새로 고침 옵션을 사용할 수 없습니다. 이러한 외부 테이블의 경우 일정에 따라 수동으로 메타데이터를 새로 고치는 것이 유용할 수 있습니다.
-- Create a task that executes an ALTER EXTERNAL TABLE ... REFRESH statement every 5 minutes. -- Replace the 'mywh' warehouse with a warehouse that your role has USAGE privilege on. CREATE TASK exttable_refresh_task WAREHOUSE=mywh SCHEDULE='5 minutes' AS ALTER EXTERNAL TABLE mydb.myschema.exttable REFRESH;