sources/bigquery-new-row/bigquery-new-row.mjs
import crypto from "crypto"; import { isString } from "lodash-es"; import googleCloud from "../../google_cloud.app.mjs"; import common from "../common/bigquery.mjs"; export default { ...common, key: "google_cloud-bigquery-new-row", name: "BigQuery - New Row", description: "Emit new events when a new row is added to a table", version: "0.1.9", dedupe: "unique", type: "source", props: { ...common.props, tableId: { propDefinition: [ googleCloud, "tableId", ({ datasetId }) => ({ datasetId, }), ], }, uniqueKey: { type: "string", label: "Unique Key", description: "The name of a column in the table to use for deduplication. See [the docs](https://github.com/PipedreamHQ/pipedream/tree/master/components/google_cloud/sources/bigquery-new-row#technical-details) for more info.", async options(context) { const { page } = context; if (page !== 0) { return []; } const columnNames = await this._getColumnNames(); return columnNames.sort(); }, }, }, hooks: { ...common.hooks, async deploy() { await this._validateColumn(this.uniqueKey); const lastResultId = await this._getIdOfLastRow(this.getInitialEventCount()); this._setLastResultId(lastResultId); }, async activate() { if (this._getLastResultId()) { return; } await this._validateColumn(this.uniqueKey); const lastResultId = await this._getIdOfLastRow(); this._setLastResultId(lastResultId); }, }, methods: { ...common.methods, _getLastResultId() { return this.db.get("lastResultId"); }, _setLastResultId(lastResultId) { this.db.set("lastResultId", lastResultId); console.log(` Next scan of table '${this.tableId}' will start at ${this.uniqueKey}=${lastResultId} `); }, async _validateColumn(columnNameToValidate) { if (!isString(columnNameToValidate)) { throw new Error("columnNameToValidate must be a string"); } const columnNames = await this._getColumnNames(); if (!columnNames.includes(columnNameToValidate)) { throw new Error(`Nonexistent column: ${columnNameToValidate}`); } }, async _getColumnNames() { const table = this.googleCloud .getBigQueryClient() .dataset(this.datasetId) .table(this.tableId); const [ metadata, ] = await table.getMetadata(); const { fields } = metadata.schema; return fields.map(({ name }) => name); }, async _getIdOfLastRow(offset = 0) { const limit = offset + 1; const query = ` SELECT * FROM \`${this.tableId}\` ORDER BY \`${this.uniqueKey}\` DESC LIMIT @limit `; const queryOpts = { query, params: { limit, }, }; const client = this.googleCloud .getBigQueryClient() .dataset(this.datasetId); const [ job, ] = await client.createQueryJob(queryOpts); const [ rows, ] = await job.getQueryResults(); if (rows.length === 0) { console.log(` No records found in the target table, will start scanning from the beginning `); return; } const startingRow = rows.pop(); return startingRow[this.uniqueKey]; }, getQueryOpts() { const lastResultId = this._getLastResultId(); let query = `SELECT * FROM \`${this.tableId}\``; if (lastResultId) { query += ` WHERE \`${this.uniqueKey}\` >= @lastResultId`; } query += ` ORDER BY \`${this.uniqueKey}\` DESC`; const params = lastResultId ? { lastResultId, } : {}; return { query, params, }; }, generateMeta(row, ts) { const id = row[this.uniqueKey]; const summary = `New row: ${id}`; return { id, summary, ts, }; }, generateMetaForCollection(rows, ts) { const hash = crypto.createHash("sha1"); rows .map((i) => i[this.uniqueKey]) .map((i) => i.toString()) .forEach((i) => hash.update(i)); const id = hash.digest("base64"); const rowCount = rows.length; const entity = rowCount === 1 ? "row" : "rows"; const summary = `${rowCount} new ${entity}`; return { id, summary, ts, }; }, }, };