sources/changes-to-collection/changes-to-collection.mjs
import fauna from "../../faunadb.app.mjs"; import _ from "lodash"; import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform"; export default { type: "source", key: "faunadb-changes-to-collection", name: "New or Removed Documents in a Collection", description: "Emit new event each time you add or remove a document from a specific collection, with the details of the document.", version: "0.0.8", dedupe: "unique", props: { timer: { type: "$.interface.timer", default: { intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL, }, }, db: "$.service.db", fauna, collection: { propDefinition: [ fauna, "collections", ], }, emitEventsInBatch: { type: "boolean", label: "Emit changes as a single event", description: "If `true`, all events are emitted as an array, within a single Pipedream event. Defaults to `false`, emitting each event in Fauna as its own event in Pipedream", optional: true, default: false, }, }, async run() { const ts = +new Date() * 1000; const cursor = this.db.get("cursor") || ts; const events = await this.fauna.getEventsInCollectionAfterTs( this.collection, cursor, ); if (!events.length) { console.log(`No new events in collection ${this.collection}`); this.db.set("cursor", ts); return; } console.log(`${events.length} new events in collection ${this.collection}`); if (this.emitEventsInBatch) { this.$emit({ events, }, { summary: `${events.length} new event${events.length > 1 ? "s" : ""}`, id: cursor, }); } else { for (const event of events) { this.$emit(event, { summary: `${event.action.toUpperCase()} - ${event.document.id}`, id: `${event.action}-${event.document.id}`, }); } } const maxEventTs = _.maxBy(events, (event) => event.ts).ts + 1; this.db.set("cursor", maxEventTs); }, };