sources/new-contact/new-contact.mjs
import orderBy from "lodash/orderBy.js";
import common from "../common/timer-based.mjs";

/**
 * Timer-based source that emits one event per newly created SendGrid contact.
 *
 * A small state object is persisted under the "state" key of `this.db`:
 *   - processedItems: `{ id, created_at }` records of contacts already emitted
 *   - lowerTimestamp / upperTimestamp: bounds (epoch milliseconds) of the
 *     creation-time window that the next search will cover
 *
 * `this.sendgrid`, `this.toISOString` and `this.slugifyEmail` are not defined
 * in this file; they are expected to come from the shared `common` module.
 */
export default {
  ...common,
  key: "sendgrid-new-contact",
  name: "New Contact",
  description: "Emit new event when a new contact is created",
  version: "0.0.8",
  type: "source",
  dedupe: "unique",
  hooks: {
    /**
     * Seeds the persisted state with an empty list of processed contacts and
     * a search window whose two bounds are both the current time.
     */
    async activate() {
      const now = Date.now();
      this.db.set("state", {
        processedItems: [],
        lowerTimestamp: now,
        upperTimestamp: now,
      });
    },
  },
  methods: {
    ...common.methods,

    /**
     * @returns {number} The look-back offset: 30 minutes, in milliseconds.
     */
    _maxDelayTime() {
      const minutes = 30;
      return minutes * 60 * 1000;
    },

    /**
     * Moves a timestamp back in time by the look-back offset.
     *
     * @param {number} timestamp - Epoch time in milliseconds.
     * @returns {number} `timestamp` minus `_maxDelayTime()`.
     */
    _addDelayOffset(timestamp) {
      return timestamp - this._maxDelayTime();
    },

    /**
     * Drops processed-contact records that are too old to matter and strips
     * the remaining ones down to `{ id, created_at }`.
     *
     * A record is kept only when its `created_at` parses to a time strictly
     * later than `currentTimestamp` shifted back by the look-back offset.
     *
     * @param {Array<object>} processedItems - Records carrying `id` and `created_at`.
     * @param {number} currentTimestamp - Reference time, epoch milliseconds.
     * @returns {Array<{id: *, created_at: *}>} The trimmed records.
     */
    _cleanupOldProcessedItems(processedItems, currentTimestamp) {
      // The cutoff does not depend on the individual record, so compute it once.
      const cutoff = this._addDelayOffset(currentTimestamp);
      return processedItems
        .filter((entry) => Date.parse(entry.created_at) > cutoff)
        .map(({ id, created_at: createdAt }) => ({
          id,
          created_at: createdAt,
        }));
    },

    /**
     * Builds the query string passed to `this.sendgrid.searchContacts`.
     *
     * The query excludes the ids of already-processed contacts and restricts
     * `created_at` to the range from `lowerTimestamp` (shifted back by the
     * look-back offset) to `upperTimestamp`.
     *
     * @param {Array<object>} processedItems - Records whose `id` must be excluded.
     * @param {number} lowerTimestamp - Window start, epoch milliseconds.
     * @param {number} upperTimestamp - Window end, epoch milliseconds.
     * @returns {string} The query text.
     */
    _makeSearchQuery(processedItems, lowerTimestamp, upperTimestamp) {
      const quotedIds = processedItems.map(({ id }) => `'${id}'`);
      // With nothing to exclude, fall back to a single empty-string literal so
      // the NOT IN list is never empty.
      const idList = quotedIds.length > 0
        ? quotedIds.join(", ")
        : "''";

      const fromDate = this.toISOString(this._addDelayOffset(lowerTimestamp));
      const toDate = this.toISOString(upperTimestamp);

      // Whitespace inside the template is kept exactly as in the original query.
      return ` contact_id NOT IN (${idList}) AND created_at BETWEEN TIMESTAMP '${fromDate}' AND TIMESTAMP '${toDate}' `;
    },

    /**
     * Produces the metadata that accompanies an emitted contact.
     *
     * @param {object} data
     * @param {object} data.item - Contact record; `id` and `email` are read.
     * @param {number} data.eventTimestamp - Used as the event's `ts`.
     * @returns {{id: *, summary: string, ts: number}} Event metadata.
     */
    generateMeta(data) {
      const {
        item: {
          id,
          email,
        },
        eventTimestamp: ts,
      } = data;
      return {
        id,
        summary: `New contact: ${this.slugifyEmail(email)}`,
        ts,
      };
    },

    /**
     * Handles one timer tick: searches for contacts created inside the stored
     * window, emits each of them, and then persists the updated state.
     *
     * @param {object} event - Timer event; `event.timestamp` is multiplied by
     *   1000 (presumably seconds to milliseconds - confirm against the timer
     *   interface).
     * @returns {Promise<void>}
     */
    async processEvent(event) {
      const eventTimestamp = event.timestamp * 1000;
      const state = this.db.get("state");
      const {
        processedItems,
        lowerTimestamp,
        upperTimestamp,
      } = state;

      const response = await this.sendgrid.searchContacts(
        this._makeSearchQuery(processedItems, lowerTimestamp, upperTimestamp),
      );
      const {
        result: items,
        contact_count: contactCount,
      } = response;

      if (contactCount === 0) {
        // Nothing found in this window: slide it forward so that the next run
        // covers the span from the old upper bound up to this tick.
        this.db.set("state", {
          processedItems: this._cleanupOldProcessedItems(processedItems, lowerTimestamp),
          lowerTimestamp: upperTimestamp,
          upperTimestamp: eventTimestamp,
        });
        return;
      }

      // Emit in ascending creation order.
      const itemsToProcess = orderBy(items, "created_at");
      for (const item of itemsToProcess) {
        this.$emit(item, this.generateMeta({
          item,
          eventTimestamp,
        }));
      }

      // The lower bound never moves backwards; it advances to the creation
      // time of the earliest contact seen in this batch when that is later.
      const earliestCreatedAt = Date.parse(itemsToProcess[0].created_at);
      const newLowerTimestamp = Math.max(lowerTimestamp, earliestCreatedAt);

      // NOTE(review): 100 is presumably the point at which the search result
      // may be truncated - confirm against the SendGrid API. Below it the upper
      // bound advances to this tick; otherwise the same upper bound is kept so
      // the next run searches the window again.
      let newUpperTimestamp;
      if (contactCount < 100) {
        newUpperTimestamp = eventTimestamp;
      } else {
        newUpperTimestamp = upperTimestamp;
      }

      const seenSoFar = [
        ...processedItems,
        ...itemsToProcess,
      ];
      this.db.set("state", {
        processedItems: this._cleanupOldProcessedItems(seenSoFar, newLowerTimestamp),
        lowerTimestamp: newLowerTimestamp,
        upperTimestamp: newUpperTimestamp,
      });
    },
  },
};