sources/new-scheduled-tasks/new-scheduled-tasks.mjs
import pipedream from "../../pipedream.app.mjs"; import sampleEmit from "./test-event.mjs"; import { uuid } from "uuidv4"; export default { key: "pipedream-new-scheduled-tasks", name: "New Scheduled Tasks", type: "source", description: "Exposes an HTTP API for scheduling messages to be emitted at a future time", version: "0.3.1", dedupe: "unique", props: { pipedream, secret: { type: "string", secret: true, label: "Secret", optional: true, description: "**Optional but recommended**: if you enter a secret here, you must pass this value in the `x-pd-secret` HTTP header when making requests", }, http: { label: "Endpoint", description: "The endpoint where you'll send task scheduler requests", type: "$.interface.http", customResponse: true, }, db: "$.service.db", }, methods: { selfChannel() { return "self"; }, queuedEventsChannel() { return "$in"; }, httpRespond({ status, body, }) { this.http.respond({ headers: { "content-type": "application/json", }, status, body, }); }, async selfSubscribe() { const isSubscribedToSelf = this.db.get("isSubscribedToSelf"); if (!isSubscribedToSelf) { const componentId = process.env.PD_COMPONENT; const selfChannel = this.selfChannel(); console.log(`Subscribing to ${selfChannel} channel for event source`); console.log( await this.pipedream.subscribe(componentId, componentId, selfChannel), ); this.db.set("isSubscribedToSelf", true); } }, validateEventBody(event, operation) { const errors = []; if (this.secret && event.headers["x-pd-secret"] !== this.secret) { errors.push( "Secret on incoming request doesn't match the configured secret", ); } if (operation === "schedule") { const { timestamp, message, } = event.body; const epoch = Date.parse(timestamp); if (!timestamp) { errors.push( "No timestamp included in payload. Please provide an ISO8601 timestamp in the 'timestamp' field", ); } if (timestamp && !epoch) { errors.push("Timestamp isn't a valid ISO 8601 string"); } if (!message) { errors.push("No message passed in payload"); } } return errors; }, scheduleTask(event) { const errors = this.validateEventBody(event, "schedule"); let status, body; if (errors.length) { console.log(errors); status = 400; body = { errors, }; } else { const id = this.emitScheduleEvent(event.body, event.body.timestamp); status = 200; body = { msg: "Successfully scheduled task", id, }; } this.httpRespond({ status, body, }); }, emitScheduleEvent(event, timestamp) { const selfChannel = this.selfChannel(); const epoch = Date.parse(timestamp); const $id = uuid(); console.log(`Scheduled event to emit on: ${new Date(epoch)}`); this.$emit( { ...event, $channel: selfChannel, $id, }, { name: selfChannel, id: $id, delivery_ts: epoch, }, ); return $id; }, async cancelTask(event) { const errors = this.validateEventBody(event, "cancel"); let status, msg; if (errors.length) { console.log(errors); status = 400; msg = "Secret on incoming request doesn't match the configured secret"; } else { try { const id = event.body.id; const isCanceled = await this.deleteEvent(event); if (isCanceled) { status = 200; msg = `Cancelled scheduled task for event ${id}`; } else { status = 404; msg = `No event with ${id} found`; } } catch (error) { console.log(error); status = 500; msg = "Failed to schedule task. Please see the logs"; } } this.httpRespond({ status, body: { msg, }, }); }, async deleteEvent(event) { const componentId = process.env.PD_COMPONENT; const inChannel = this.queuedEventsChannel(); const { id } = event.body; const events = await this.pipedream.listEvents( componentId, inChannel, ); console.log("Events: ", events); const eventToCancel = events.data.find((e) => { const { metadata } = e; return metadata.id === id; }); console.log("Event to cancel: ", eventToCancel); if (!eventToCancel) { console.log(`No event with ${id} found`); return false; } await this.pipedream.deleteEvent( componentId, eventToCancel.id, inChannel, ); return true; }, emitEvent(event, summary) { const id = event.$id; delete event.$channel; delete event.$id; this.$emit(event, { summary: summary ?? JSON.stringify(event), id, ts: +new Date(), }); }, }, async run(event) { await this.selfSubscribe(); const { path } = event; if (path === "/schedule") { this.scheduleTask(event); } else if (path === "/cancel") { await this.cancelTask(event); } else if (event.$channel === this.selfChannel()) { this.emitEvent(event); } }, sampleEmit, };