sources/new-document/new-document.mjs
import common from "../common/base.mjs";
import { Timestamp } from "mongodb";

/**
 * Source component that emits an event for each document whose timestamp
 * field is newer than the last checkpoint stored in `this.db`.
 */
export default {
  ...common,
  key: "mongodb-new-document",
  name: "New Document",
  description: "Emit new event when a new document is added to a collection",
  version: "0.0.12",
  type: "source",
  dedupe: "unique",
  props: {
    ...common.props,
    database: {
      propDefinition: [
        common.props.mongodb,
        "database",
      ],
    },
    collection: {
      propDefinition: [
        common.props.mongodb,
        "collection",
        (c) => ({
          database: c.database,
        }),
      ],
    },
    timestampField: {
      type: "string",
      label: "Timestamp Field",
      description: "The key of a timestamp field, such as 'created_at' that is set to the current timestamp when a document is created.",
    },
    timestampFieldType: {
      type: "string",
      label: "Timestamp Field Type",
      description: "The type of the timestamp field",
      default: "Timestamp",
      options: [
        "Timestamp",
        "Integer",
        "ISO8601",
      ],
    },
  },
  hooks: {
    /**
     * Emits up to 25 of the most recent matching documents, then closes the
     * client.
     */
    async deploy() {
      const client = await this.mongodb.getClient();
      try {
        await this.processEvent(client, Date.now(), 25);
      } finally {
        // Release the connection even when processing fails.
        await client.close();
      }
    },
  },
  methods: {
    ...common.methods,
    /**
     * @returns {number|undefined} The checkpoint stored under "lastTs".
     */
    _getLastTs() {
      return this.db.get("lastTs");
    },
    /**
     * Stores the checkpoint under "lastTs".
     * @param {number} lastTs - Checkpoint in the numeric form produced by getTs.
     */
    _setLastTs(lastTs) {
      this.db.set("lastTs", lastTs);
    },
    /**
     * Converts a numeric checkpoint (as produced by getTs) into the value
     * used as the `$gt` bound of the query, according to `timestampFieldType`.
     * @param {number} timestamp - Numeric checkpoint.
     * @returns {number|Timestamp|Date} Query bound for the timestamp field.
     */
    makeTs(timestamp) {
      if (this.timestampFieldType === "Integer") {
        return timestamp;
      }
      if (this.timestampFieldType === "Timestamp") {
        // getTs yields milliseconds (high bits * 1000), whereas
        // convertToTimestamp expects the packed 64-bit form with the seconds
        // in the high 32 bits. Repack first; otherwise the bound collapses to
        // a value near the epoch and the query matches almost everything.
        const seconds = BigInt(Math.floor(timestamp / 1_000));
        return this.convertToTimestamp(seconds << 32n);
      }
      if (this.timestampFieldType === "ISO8601") {
        // NOTE(review): this compares against a Date; presumably the field is
        // stored as a BSON date rather than a string - confirm.
        return new Date(timestamp);
      }
      return timestamp;
    },
    /**
     * Converts a document's timestamp field value into a number that can be
     * compared with the stored checkpoint.
     * @param {number|Timestamp|string|Date} timestamp - Raw field value.
     * @returns {number} Comparable numeric value.
     */
    getTs(timestamp) {
      if (this.timestampFieldType === "Integer") {
        return timestamp;
      }
      if (this.timestampFieldType === "Timestamp") {
        // High 32 bits of a BSON Timestamp are seconds; scale to milliseconds.
        return timestamp.getHighBits() * 1_000;
      }
      if (this.timestampFieldType === "ISO8601") {
        return new Date(timestamp).getTime();
      }
      return timestamp;
    },
    /**
     * Builds a BSON Timestamp from a packed 64-bit integer value: the high
     * 32 bits become `t` and the low 32 bits become `i`.
     * @param {number|bigint|string} timestamp - Packed integer value; anything
     *   whose `toString()` is an integer literal accepted by `BigInt`.
     * @returns {Timestamp} The corresponding BSON Timestamp.
     */
    convertToTimestamp(timestamp) {
      const bigIntValue = BigInt(timestamp.toString());
      const seconds = Number(bigIntValue >> 32n);
      const increment = Number(bigIntValue & 0xFFFFFFFFn);
      return new Timestamp({
        t: seconds,
        i: increment,
      });
    },
    /**
     * Fetches documents newer than the stored checkpoint, emits them oldest
     * first, and advances the checkpoint to the newest timestamp seen.
     * @param {object} client - Connected client passed to getCollection.
     * @param {number} eventTs - Timestamp forwarded to emitEvent.
     * @param {number} [max] - Maximum number of documents to emit; when
     *   falsy, every newer document is emitted.
     */
    async processEvent(client, eventTs, max) {
      const lastTs = this._getLastTs() || 0;
      const collection = this.mongodb.getCollection(client, this.database, this.collection);
      const query = {
        [this.timestampField]: {
          $gt: this.makeTs(lastTs),
        },
      };
      // Newest first, so that a cap keeps the most recent documents.
      const sort = {
        [this.timestampField]: -1,
      };
      let cursor = collection.find(query).sort(sort);
      if (Number.isInteger(max) && max > 0) {
        // The loop below never looks past the first `max` documents, so there
        // is no reason to pull the rest of the collection into memory.
        cursor = cursor.limit(max);
      }
      const documents = await cursor.toArray();

      const docs = [];
      let maxTs = lastTs;
      for (const doc of documents) {
        const ts = this.getTs(doc[this.timestampField]);
        // Results are sorted descending: stop at the first document that is
        // not newer than the checkpoint, or once the cap is reached.
        if (!(ts > lastTs) || (max && docs.length >= max)) {
          break;
        }
        if (ts > maxTs) {
          maxTs = ts;
        }
        docs.push(doc);
      }

      // Emit in chronological order (oldest first).
      docs.reverse().forEach((doc) => this.emitEvent(doc, eventTs));
      this._setLastTs(maxTs);
    },
    /**
     * Builds the event metadata for a document.
     * @param {object} doc - Document; only its `_id` is used.
     * @param {number} ts - Timestamp to attach to the event.
     * @returns {{id: *, summary: string, ts: number}} Event metadata.
     */
    generateMeta({ _id: id }, ts) {
      return {
        id,
        summary: `New Document ID ${JSON.stringify(id)}`,
        ts,
      };
    },
  },
};