sources/redshift-updated-row/redshift-updated-row.mjs
import common from "../common/redshift-polling.mjs"; export default { ...common, key: "aws-redshift-updated-row", name: "Redshift - Updated Row", description: "Emit new event when a row is updated, based on a selected timestamp column. [See the documentation](https://docs.aws.amazon.com/redshift/latest/APIReference/API_ExecuteStatement.html)", type: "source", version: "0.0.1", dedupe: "unique", props: { ...common.props, info: { propDefinition: [ common.props.app, "info", ], }, timestampColumn: { label: "Timestamp Column", description: "A column with a timestamp that is updated when a row is modified.", propDefinition: [ common.props.app, "uniqueColumn", ({ region, database, workgroupName, schema, table, }) => ({ region, database, workgroupName, schema, table, filter: (column) => column.typeName === "timestamp", }), ], }, }, async run() { const { region, workgroupName, database, schema, table, timestampColumn, getValue, } = this; const lastTimestamp = this.db.get("lastTimestamp"); let sql = `SELECT * FROM ${schema}.${table}`; if (lastTimestamp) { sql += ` WHERE ${timestampColumn} > :lastTimestamp`; } sql += ` ORDER BY ${timestampColumn} DESC LIMIT 100`; const parameters = lastTimestamp ? [ { name: "lastTimestamp", value: lastTimestamp, }, ] : undefined; const response = await this.app.executeStatement({ region, workgroupName, database, sql, parameters, }); if (response?.Records?.length > 0) { const columnNames = response.ColumnMetadata.map((c) => c.name); const records = response.Records.map((record) => record.reduce((row, field, index) => ({ ...row, [columnNames[index]]: getValue(field), }), {})); records.reverse().forEach((record) => { const id = record[timestampColumn]; const ts = new Date(record[timestampColumn]).getTime(); this.$emit(record, { id, ts, summary: `Row Updated ${id}`, }); }); if (records?.length > 0) { const [ newLastTimestamp, ] = records.map((record) => record[timestampColumn]).reverse(); this.db.set("lastTimestamp", newLastTimestamp); } } }, };