Stream Examples

This topic provides practical examples of use cases for streams on objects.

Streams on Tables

Basic Example

The following example shows how the contents of a stream change as DML statements execute on the source table:

-- Create a table to store the names and fees paid by members of a gym CREATE OR REPLACE TABLE members ( id number(8) NOT NULL, name varchar(255) default NULL, fee number(3) NULL ); -- Create a stream to track changes to date in the MEMBERS table CREATE OR REPLACE STREAM member_check ON TABLE members; -- Create a table to store the dates when gym members joined CREATE OR REPLACE TABLE signup ( id number(8), dt DATE ); INSERT INTO members (id,name,fee) VALUES (1,'Joe',0), (2,'Jane',0), (3,'George',0), (4,'Betty',0), (5,'Sally',0); INSERT INTO signup VALUES (1,'2018-01-01'), (2,'2018-02-15'), (3,'2018-05-01'), (4,'2018-07-16'), (5,'2018-08-21'); -- The stream records the inserted rows SELECT * FROM member_check; +----+--------+-----+-----------------+-------------------+------------------------------------------+ | ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+--------+-----+-----------------+-------------------+------------------------------------------| | 1 | Joe | 0 | INSERT | False | d200504bf3049a7d515214408d9a804fd03b46cd | | 2 | Jane | 0 | INSERT | False | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e | | 3 | George | 0 | INSERT | False | b98ad609fffdd6f00369485a896c52ca93b92b1f | | 4 | Betty | 0 | INSERT | False | e554e6e68293a51d8e69d68e9b6be991453cc901 | | 5 | Sally | 0 | INSERT | False | c94366cf8a4270cf299b049af68a04401c13976d | +----+--------+-----+-----------------+-------------------+------------------------------------------+ -- Apply a $90 fee to members who joined the gym after a free trial period ended: MERGE INTO members m USING ( SELECT id, dt FROM signup s WHERE DATEDIFF(day, '2018-08-15'::date, s.dt::DATE) < -30) s ON m.id = s.id WHEN MATCHED THEN UPDATE SET m.fee = 90; SELECT * FROM members; +----+--------+-----+ | ID | NAME | FEE | |----+--------+-----| | 1 | Joe | 90 | | 2 | Jane | 90 | | 3 | George | 90 | | 4 | Betty | 0 | | 5 | Sally | 0 | +----+--------+-----+ -- The stream records the updated FEE column as a set of inserts -- rather than deletes and inserts because the stream contents -- have not been consumed yet SELECT * FROM member_check; +----+--------+-----+-----------------+-------------------+------------------------------------------+ | ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+--------+-----+-----------------+-------------------+------------------------------------------| | 1 | Joe | 90 | INSERT | False | 957e84b34ef0f3d957470e02bddccb027810892c | | 2 | Jane | 90 | INSERT | False | b00168a4edb9fb399dd5cc015e5f78cbea158956 | | 3 | George | 90 | INSERT | False | 75206259362a7c89126b7cb039371a39d821f76a | | 4 | Betty | 0 | INSERT | False | 9b225bc2612d5e57b775feea01dd04a32ce2ad18 | | 5 | Sally | 0 | INSERT | False | 5a68f6296c975980fbbc569ce01033c192168eca | +----+--------+-----+-----------------+-------------------+------------------------------------------+ -- Create a table to store member details in production CREATE OR REPLACE TABLE members_prod ( id number(8) NOT NULL, name varchar(255) default NULL, fee number(3) NULL ); -- Insert the first batch of stream data into the production table INSERT INTO members_prod(id,name,fee) SELECT id, name, fee FROM member_check WHERE METADATA$ACTION = 'INSERT'; -- The stream position is advanced select * from member_check; +----+------+-----+-----------------+-------------------+-----------------+ | ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+------+-----+-----------------+-------------------+-----------------| +----+------+-----+-----------------+-------------------+-----------------+ -- Access and lock the stream BEGIN; -- Increase the fee paid by paying members UPDATE members SET fee = fee + 15 where fee > 0; +------------------------+-------------------------------------+ | number of rows updated | number of multi-joined rows updated | |------------------------+-------------------------------------| | 3 | 0 | +------------------------+-------------------------------------+ -- These changes are not visible because the change interval of the stream object starts at the current offset and ends at the current -- transactional time point, which is the beginning time of the transaction SELECT * FROM member_check; +----+------+-----+-----------------+-------------------+-----------------+ | ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+------+-----+-----------------+-------------------+-----------------| +----+------+-----+-----------------+-------------------+-----------------+ -- Commit changes COMMIT; -- The changes surface now because the stream object uses the current transactional time as the end point of the change interval that now -- includes the changes in the source table SELECT * FROM member_check; +----+--------+-----+-----------------+-------------------+------------------------------------------+ | ID | NAME | FEE | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+--------+-----+-----------------+-------------------+------------------------------------------| | 1 | Joe | 105 | INSERT | True | 123a45b67cd0e8f012345g01abcdef012345678a | | 2 | Jane | 105 | INSERT | True | 456b45b67cd1e8f123456g01ghijkl123456779b | | 3 | George | 105 | INSERT | True | 567890c89de2f9g765438j20jklmn0234567890d | | 1 | Joe | 90 | DELETE | True | 123a45b67cd0e8f012345g01abcdef012345678a | | 2 | Jane | 90 | DELETE | True | 456b45b67cd1e8f123456g01ghijkl123456779b | | 3 | George | 90 | DELETE | True | 567890c89de2f9g765438j20jklmn0234567890d | +----+--------+-----+-----------------+-------------------+------------------------------------------+ 
Copy

Differences Between Standard and Append-only Streams

The following example shows the differences in behavior between standard (delta) and append-only streams:

-- Create a source table. create or replace table t(id int, name string); -- Create a standard stream on the source table. create or replace stream delta_s on table t; -- Create an append-only stream on the source table. create or replace stream append_only_s on table t append_only=true; -- Insert 3 rows into the source table. insert into t values (0, 'charlie brown'); insert into t values (1, 'lucy'); insert into t values (2, 'linus'); -- Delete 1 of the 3 rows. delete from t where id = '0'; -- The standard stream removes the deleted row. select * from delta_s order by id; +----+-------+-----------------+-------------------+------------------------------------------+ | ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+-------+-----------------+-------------------+------------------------------------------| | 1 | lucy | INSERT | False | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 | | 2 | linus | INSERT | False | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 | +----+-------+-----------------+-------------------+------------------------------------------+ -- The append-only stream does not remove the deleted row. select * from append_only_s order by id; +----+---------------+-----------------+-------------------+------------------------------------------+ | ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+---------------+-----------------+-------------------+------------------------------------------| | 0 | charlie brown | INSERT | False | e83abf629af50ccf94d1e78c547bfd8079e68d00 | | 1 | lucy | INSERT | False | 7b12c9ee7af9245497a27ac4909e4aa97f126b50 | | 2 | linus | INSERT | False | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 | +----+---------------+-----------------+-------------------+------------------------------------------+ -- Create a table to store the change data capture records in each of the streams. create or replace table t2(id int, name string, stream_type string default NULL); -- Insert the records from the streams into the new table, advancing the offset of each stream. insert into t2(id,name,stream_type) select id, name, 'delta stream' from delta_s; insert into t2(id,name,stream_type) select id, name, 'append_only stream' from append_only_s; -- Update a row in the source table. update t set name = 'sally' where name = 'linus'; -- The standard stream records the update operation. select * from delta_s order by id; +----+-------+-----------------+-------------------+------------------------------------------+ | ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+-------+-----------------+-------------------+------------------------------------------| | 2 | sally | INSERT | True | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 | | 2 | linus | DELETE | True | 461cd468d8cc2b0bd11e1e3c0d5f1133ac763d39 | +----+-------+-----------------+-------------------+------------------------------------------+ -- The append-only stream does not record the update operation. select * from append_only_s order by id; +----+------+-----------------+-------------------+-----------------+ | ID | NAME | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |----+------+-----------------+-------------------+-----------------| +----+------+-----------------+-------------------+-----------------+ 
Copy

The following example shows how streams can be used in ELT (extract, load, transform) processes. In this example, new data inserted into a staging table is tracked by a stream. A set of SQL statements transform and insert the stream contents into a set of production tables:

DML Operations in Explicit Transactions

-- Create a staging table that stores raw JSON data CREATE OR REPLACE TABLE data_staging ( raw variant); -- Create a stream on the staging table CREATE OR REPLACE STREAM data_check ON TABLE data_staging; -- Create 2 production tables to store transformed -- JSON data in relational columns CREATE OR REPLACE TABLE data_prod1 ( id number(8), ts TIMESTAMP_TZ ); CREATE OR REPLACE TABLE data_prod2 ( id number(8), color VARCHAR, num NUMBER ); -- Load JSON data into staging table -- using COPY statement, Snowpipe, -- or inserts SELECT * FROM data_staging; +--------------------------------------+ | RAW | |--------------------------------------| | { | | "id": 7077, | | "x1": "2018-08-14T20:57:01-07:00", | | "x2": [ | | { | | "y1": "green", | | "y2": "35" | | } | | ] | | } | | { | | "id": 7078, | | "x1": "2018-08-14T21:07:26-07:00", | | "x2": [ | | { | | "y1": "cyan", | | "y2": "107" | | } | | ] | | } | +--------------------------------------+ -- Stream table shows inserted data SELECT * FROM data_check; +--------------------------------------+-----------------+-------------------+------------------------------------------+ | RAW | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID | |--------------------------------------+-----------------+-------------------|------------------------------------------| | { | INSERT | False | 789012e01ef4j3k890123k35mnopqr567890124j | | "id": 7077, | | | | | "x1": "2018-08-14T20:57:01-07:00", | | | | | "x2": [ | | | | | { | | | | | "y1": "green", | | | | | "y2": "35" | | | | | } | | | | | ] | | | | | } | | | | | { | INSERT | False | 765432u89tk3l6y456789012rst7vx678912456k | | "id": 7078, | | | | | "x1": "2018-08-14T21:07:26-07:00", | | | | | "x2": [ | | | | | { | | | | | "y1": "cyan", | | | | | "y2": "107" | | | | | } | | | | | ] | | | | | } | | | | +--------------------------------------+-----------------+-------------------+------------------------------------------+ -- Access and lock the stream BEGIN; -- Transform and copy JSON elements into relational columns -- in the production tables INSERT INTO data_prod1 (id, ts) SELECT t.raw:id, to_timestamp_tz(t.raw:x1) FROM data_check t WHERE METADATA$ACTION = 'INSERT'; INSERT INTO data_prod2 (id, color, num) SELECT t.raw:id, f.value:y1, f.value:y2 FROM data_check t , lateral flatten(input => raw:x2) f WHERE METADATA$ACTION = 'INSERT'; -- Commit changes in the stream objects participating in the transaction COMMIT; SELECT * FROM data_prod1; +------+---------------------------+ | ID | TS | |------+---------------------------| | 7077 | 2018-08-14 20:57:01 -0700 | | 7078 | 2018-08-14 21:07:26 -0700 | +------+---------------------------+ SELECT * FROM data_prod2; +------+-------+-----+ | ID | COLOR | NUM | |------+-------+-----| | 7077 | green | 35 | | 7078 | cyan | 107 | +------+-------+-----+ SELECT * FROM data_check; +-----+-----------------+-------------------+ | RAW | METADATA$ACTION | METADATA$ISUPDATE | |-----+-----------------+-------------------| +-----+-----------------+-------------------+ 
Copy

Streams on Views

Stream on a View with Multi-table Joins

-- Create multiple tables with matching column values. CREATE TABLE birds ( id number, common varchar(100), class varchar(100) ); CREATE TABLE sightings ( d date, loc varchar(100), b_id number, c number ); -- Create a view that queries the tables with a join. CREATE VIEW bird_sightings AS SELECT b.id AS id, b.common AS common_name, b.class AS classification, s.d AS date, s.loc AS location, s.c AS count FROM birds b INNER JOIN sightings s ON b.id = s.b_id; -- Create a stream on the view. CREATE STREAM bird_sightings_s ON VIEW bird_sightings; -- Insert values into the tables. INSERT INTO birds VALUES (1,'Scarlet Tanager','P. olivacea'), (14,'Mallard','A. platyrhynchos'), (48,'Spotted Sandpiper','A. macularius'), (92,'Great Blue Heron','A. herodias'); INSERT INTO sightings VALUES (current_date(),'Gibson Island',1,4), (current_date(),'Lake Los Pajaro',14,12), (current_date(),'Lake Los Pajaro',92,12), (current_date(),'Gibson Island',14,21), (current_date(),'Gibson Island',92,5); -- Query the stream. -- The stream displays a record for each row added to the view. SELECT * FROM bird_sightings_s; +----+------------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------+ | ID | COMMON_NAME | CLASSIFICATION | DATE | LOCATION | COUNT | METADATA$ROW_ID | METADATA$ACTION | METADATA$ISUPDATE | |----+------------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------| | 1 | Scarlet Tanager | P. olivacea | 2021-09-07 | Gibson Island | 4 | a2522b47726ac2a922104c8e2f668d065ff6fcd0 | INSERT | False | | 14 | Mallard | A. platyrhynchos | 2021-09-07 | Lake Los Pajaro | 12 | fceb4ad5cb6d2df2865d0f572b8a2aa98f240b70 | INSERT | False | | 92 | Great Blue Heron | A. herodias | 2021-09-07 | Lake Los Pajaro | 12 | 0db99176fe8bd50749b2b48fb2befab416ff9272 | INSERT | False | | 14 | Mallard | A. platyrhynchos | 2021-09-07 | Gibson Island | 21 | 2e94ef3a33e52ba5de5d816dc41c60fedf9cb1eb | INSERT | False | | 92 | Great Blue Heron | A. herodias | 2021-09-07 | Gibson Island | 5 | a1df477ac8e388e1cf0ada77e9097c6effa346a7 | INSERT | False | +----+------------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------+ -- Consume the stream records in a DML statement (INSERT, MERGE, etc.). -- Query the stream. -- The stream is empty. +----+-------------+----------------+------+----------+-------+-----------------+-----------------+-------------------+ | ID | COMMON_NAME | CLASSIFICATION | DATE | LOCATION | COUNT | METADATA$ROW_ID | METADATA$ACTION | METADATA$ISUPDATE | |----+-------------+----------------+------+----------+-------+-----------------+-----------------+-------------------| +----+-------------+----------------+------+----------+-------+-----------------+-----------------+-------------------+ -- Delete a row from the birds table. DELETE FROM birds WHERE id = 14; -- Query the stream. -- The stream displays two records for the single DELETE operation. SELECT * FROM bird_sightings_s; +----+-------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------+ | ID | COMMON_NAME | CLASSIFICATION | DATE | LOCATION | COUNT | METADATA$ROW_ID | METADATA$ACTION | METADATA$ISUPDATE | |----+-------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------| | 14 | Mallard | A. platyrhynchos | 2021-09-07 | Lake Los Pajaro | 12 | 83c22ff4be80d65a2e9776df0e35b22079cb4430 | DELETE | False | | 14 | Mallard | A. platyrhynchos | 2021-09-07 | Gibson Island | 21 | e29cfae8c3c7d261ed903c2303f61e4d49c01ba1 | DELETE | False | +----+-------------+------------------+------------+-----------------+-------+------------------------------------------+-----------------+-------------------+ 
Copy

Stream on a View That Calls a Non-deterministic SQL Function

-- Create a table. CREATE TABLE ndf ( c1 number ); -- Create a view that queries the table and -- also returns the CURRENT_USER and CURRENT_TIMESTAMP values -- for the query transaction. CREATE VIEW ndf_v AS SELECT CURRENT_USER() AS u, CURRENT_TIMESTAMP() AS ts, c1 AS num FROM ndf; -- Create a stream on the view. CREATE STREAM ndf_s ON VIEW ndf_v; -- User peter inserts rows into table ndf. INSERT INTO ndf VALUES (1), (2), (3); -- User marie inserts rows into table ndf. INSERT INTO ndf VALUES (4), (5), (6); -- User PETER queries the stream. -- The stream returns the username for the user. -- The stream also returns the current timestamp for the query transaction in each row, -- NOT the timestamp when each row was inserted. SELECT * FROM ndf_s; +-------+-------------------------------+-----+-----------------+------------------------------------------+ | U | TS | NUM | METADATA$ACTION | METADATA$ROW_ID | |-------+-------------------------------+-----+-----------------+------------------------------------------| | PETER | 2021-08-16 11:56:33.778 -0700 | 1 | INSERT | d200504bf3049a7d515214408d9a804fd03b46cd | | PETER | 2021-08-16 11:56:33.778 -0700 | 2 | INSERT | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e | | PETER | 2021-08-16 11:56:33.778 -0700 | 3 | INSERT | b98ad609fffdd6f00369485a896c52ca93b92b1f | | PETER | 2021-08-16 11:56:33.778 -0700 | 4 | INSERT | 62d34abc3fac85c037fb9f47f7758f08d025d9ed | | PETER | 2021-08-16 11:56:33.778 -0700 | 5 | INSERT | e554e6e68293a51d8e69d68e9b6be991453cc901 | | PETER | 2021-08-16 11:56:33.778 -0700 | 6 | INSERT | f6fa32c498a28b2349d2c6f6be55c30eb1d5310f | +-------+-------------------------------+-----+-----------------+------------------------------------------+ -- User MARIE queries the stream. -- The stream returns the username for the user -- and the current timestamp for the query transaction in each row. SELECT * FROM ndf_s; +-------+-------------------------------+-----+-----------------+------------------------------------------+ | U | TS | NUM | METADATA$ACTION | METADATA$ROW_ID | |-------+-------------------------------+-----+-----------------+------------------------------------------| | MARIE | 2021-08-16 12:04:21.768 -0700 | 1 | INSERT | d200504bf3049a7d515214408d9a804fd03b46cd | | MARIE | 2021-08-16 12:04:21.768 -0700 | 2 | INSERT | d0a551cecbee0f9ad2b8a9e81bcc33b15a525a1e | | MARIE | 2021-08-16 12:04:21.768 -0700 | 3 | INSERT | b98ad609fffdd6f00369485a896c52ca93b92b1f | | MARIE | 2021-08-16 12:04:21.768 -0700 | 4 | INSERT | 62d34abc3fac85c037fb9f47f7758f08d025d9ed | | MARIE | 2021-08-16 12:04:21.768 -0700 | 5 | INSERT | e554e6e68293a51d8e69d68e9b6be991453cc901 | | MARIE | 2021-08-16 12:04:21.768 -0700 | 6 | INSERT | f6fa32c498a28b2349d2c6f6be55c30eb1d5310f | +-------+-------------------------------+-----+-----------------+------------------------------------------+ 
Copy