sources/new-records-returned-by-cloudwatch-logs-insights-query/new-records-returned-by-cloudwatch-logs-insights-query.mjs
import common from "../../common/common-cloudwatch-logs.mjs";
import { DEFAULT_POLLING_SOURCE_TIMER_INTERVAL } from "@pipedream/platform";

// How far back the very first run looks when no previous run time is stored.
const INITIAL_LOOKBACK_MS = 1000 * 60 * 60;

// Pause between two consecutive polls of the query status.
const QUERY_POLL_INTERVAL_MS = 1000;

// Query statuses that mean "results are not final yet, keep polling".
const PENDING_QUERY_STATUSES = [
  "Running",
  "Scheduled",
];

/**
 * Resolves after the given delay.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Polling source that runs a CloudWatch Logs Insights query on a timer and
 * emits the returned rows, either as one batched event or one event per row.
 *
 * `startQuery` and `getQueryResults` are not defined in this file; they are
 * presumably supplied by the shared `common` module spread into this object.
 */
export default {
  ...common,
  key: "aws-new-records-returned-by-cloudwatch-logs-insights-query",
  name: "New Records Returned by CloudWatch Logs Insights Query",
  description: "Executes a CloudWatch Logs Insights query on a schedule, and emits the records as individual events (default) or in batch",
  version: "0.2.3",
  type: "source",
  props: {
    aws: common.props.aws,
    region: common.props.region,
    db: "$.service.db",
    logGroupNames: {
      ...common.props.logGroupName,
      type: "string[]",
      description: "The log groups you'd like to query",
    },
    queryString: {
      label: "Logs Insights Query",
      description: "The query you'd like to run. See [this AWS doc](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html) for help with query syntax",
      type: "string",
    },
    emitResultsInBatch: {
      type: "boolean",
      label: "Emit query results as a single event",
      // The line break inside this text is written as an escape: a raw newline
      // inside a double-quoted string literal is a syntax error.
      description: "If `true`, all events are emitted as an array, within a single Pipedream event. If `false`, each row of results is emitted as its own event. \nDefaults to `true`",
      optional: true,
      default: true,
    },
    timer: {
      label: "Polling schedule",
      description: "How often you want to query CloudWatch Logs Insights for results",
      type: "$.interface.timer",
      default: {
        intervalSeconds: DEFAULT_POLLING_SOURCE_TIMER_INTERVAL,
      },
    },
  },
  /**
   * Runs the configured query over the window since the previous run, waits
   * for it to finish, and emits its results.
   *
   * The window starts at the `startTime` value stored in `db` (or one hour
   * ago when nothing is stored) and ends at the current time. After a
   * completed query the current time is stored as the next `startTime`,
   * whether or not any rows were returned.
   *
   * @returns {Promise<void>}
   * @throws {Error} When the query ends in any status other than "Complete".
   *   In that case `startTime` is not advanced.
   */
  async run() {
    // NOTE(review): these timestamps are epoch milliseconds. The AWS
    // StartQuery reference describes startTime/endTime as epoch seconds;
    // confirm that the helper or the API accepts milliseconds.
    const now = Date.now();
    const startTime = this.db.get("startTime") || now - INITIAL_LOOKBACK_MS;

    const { queryId } = await this.startQuery({
      queryString: this.queryString,
      startTime,
      endTime: now,
      logGroupNames: this.logGroupNames,
    });

    // Wait first, then poll: the loop ends as soon as the query leaves the
    // pending statuses.
    let res;
    do {
      await sleep(QUERY_POLL_INTERVAL_MS);
      res = await this.getQueryResults({
        queryId,
      });
    } while (PENDING_QUERY_STATUSES.includes(res.status));

    if (res.status !== "Complete") {
      throw new Error(`Query ${queryId} failed with status ${res.status}`);
    }

    console.log(JSON.stringify(res, null, 2));

    const { results } = res;
    if (!results?.length) {
      console.log("No results, exiting");
      this.db.set("startTime", now);
      return;
    }

    if (this.emitResultsInBatch === true) {
      // One event carrying the whole result set.
      this.$emit(results, {
        summary: JSON.stringify(results),
      });
    } else {
      // One event per result row.
      for (const item of results) {
        this.$emit(item, {
          summary: JSON.stringify(item),
        });
      }
    }

    this.db.set("startTime", now);
  },
};