/*
For processing data sent to Firehose by CloudWatch Logs subscription filters.

CloudWatch Logs sends to Firehose records that look like this:

{
    "messageType": "DATA_MESSAGE",
    "owner": "123456789012",
    "logGroup": "log_group_name",
    "logStream": "log_stream_name",
    "subscriptionFilters": [
        "subscription_filter_name"
    ],
    "logEvents": [
        {
            "id": "01234567890123456789012345678901234567890123456789012345",
            "timestamp": 1510109208016,
            "message": "log message 1"
        },
        {
            "id": "01234567890123456789012345678901234567890123456789012345",
            "timestamp": 1510109208017,
            "message": "log message 2"
        }
        ...
    ]
}

The data is additionally compressed with GZIP.

The code below will:

1) Gunzip the data
2) Parse the JSON
3) Set the result to Dropped for any record whose messageType is CONTROL_MESSAGE (keep-alive checks sent by
   CloudWatch Logs that contain no log events), and to ProcessingFailed for any other messageType that is not
   DATA_MESSAGE, thus redirecting such records to the processing error output. You can modify the code to set the
   result to Dropped instead to discard them entirely.
4) For records whose messageType is DATA_MESSAGE, extract the individual log events from the logEvents field, and pass
   each one to the transformLogEvent method. You can modify the transformLogEvent method to perform custom
   transformations on the log events.
5) Concatenate the results from (4) together and set the concatenation as the data of the record returned to Firehose.
   Note that this step will not add any delimiters. Delimiters should be appended by the logic within the
   transformLogEvent method.
6) Re-ingest back into the source stream (Kinesis or Firehose) any transformed records whose combined size would push
   the response payload over the 6 MB Lambda limit, and mark them as Dropped in the response so they are not
   delivered twice.
*/
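
/*
For reference, a sketch of the per-record shape this function returns to Firehose (the standard
Firehose data-transformation response contract); the values are illustrative:

{
    "recordId": "<same recordId as the corresponding input record>",
    "result": "Ok" | "Dropped" | "ProcessingFailed",
    "data": "<base64-encoded transformed payload, present when result is Ok>"
}
*/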
const zlib = require('zlib');
const AWS = require('aws-sdk');

/**
 * logEvent has this format:
 *
 * {
 *     "id": "01234567890123456789012345678901234567890123456789012345",
 *     "timestamp": 1510109208016,
 *     "message": "log message 1"
 * }
 *
 * The default implementation below just extracts the message and appends a newline to it.
 *
 * The result must be returned in a Promise.
 */
function transformLogEvent(logEvent) {
    return Promise.resolve(`${logEvent.message}\n`);
}

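/*
An illustrative variant of transformLogEvent (not used by default): emit each event as a JSON line that
keeps the CloudWatch id and timestamp alongside the message. The output field names are arbitrary
choices for this sketch.

function transformLogEvent(logEvent) {
    return Promise.resolve(`${JSON.stringify({
        id: logEvent.id,
        timestamp: logEvent.timestamp,
        message: logEvent.message,
    })}\n`);
}
*/
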
function putRecordsToFirehoseStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
    client.putRecordBatch({
        DeliveryStreamName: streamName,
        Records: records,
    }, (err, data) => {
        const codes = [];
        let failed = [];
        let errMsg = err;

        if (err) {
            failed = records;
        } else {
            for (let i = 0; i < data.RequestResponses.length; i++) {
                const code = data.RequestResponses[i].ErrorCode;
                if (code) {
                    codes.push(code);
                    failed.push(records[i]);
                }
            }
            errMsg = `Individual error codes: ${codes}`;
        }

        if (failed.length > 0) {
            if (attemptsMade + 1 < maxAttempts) {
                console.log('Some records failed while calling PutRecordBatch, retrying. %s', errMsg);
                putRecordsToFirehoseStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
            } else {
                reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
            }
        } else {
            resolve('');
        }
    });
}

function putRecordsToKinesisStream(streamName, records, client, resolve, reject, attemptsMade, maxAttempts) {
    client.putRecords({
        StreamName: streamName,
        Records: records,
    }, (err, data) => {
        const codes = [];
        let failed = [];
        let errMsg = err;

        if (err) {
            failed = records;
        } else {
            for (let i = 0; i < data.Records.length; i++) {
                const code = data.Records[i].ErrorCode;
                if (code) {
                    codes.push(code);
                    failed.push(records[i]);
                }
            }
            errMsg = `Individual error codes: ${codes}`;
        }

        if (failed.length > 0) {
            if (attemptsMade + 1 < maxAttempts) {
                console.log('Some records failed while calling PutRecords, retrying. %s', errMsg);
                putRecordsToKinesisStream(streamName, failed, client, resolve, reject, attemptsMade + 1, maxAttempts);
            } else {
                reject(`Could not put records after ${maxAttempts} attempts. ${errMsg}`);
            }
        } else {
            resolve('');
        }
    });
}

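/*
Note: both APIs above accept at most 500 records per call (PutRecordBatch is also limited to roughly
4 MB per call, PutRecords to roughly 5 MB), which is why the handler below splits re-ingestion into
batches of at most 500 records, and why partial failures are retried per record rather than per call.
*/
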
function createReingestionRecord(isSas, originalRecord) {
    if (isSas) {
        return {
            Data: Buffer.from(originalRecord.data, 'base64'),
            PartitionKey: originalRecord.kinesisRecordMetadata.partitionKey,
        };
    } else {
        return {
            Data: Buffer.from(originalRecord.data, 'base64'),
        };
    }
}


function getReingestionRecord(isSas, reIngestionRecord) {
    if (isSas) {
        return {
            Data: reIngestionRecord.Data,
            PartitionKey: reIngestionRecord.PartitionKey,
        };
    } else {
        return {
            Data: reIngestionRecord.Data,
        };
    }
}

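/*
For reference, a sketch of an input record when the delivery stream is sourced from a Kinesis data
stream (the isSas case above). Such records carry a kinesisRecordMetadata object; only partitionKey
is used here, and the other fields shown are illustrative:

{
    "recordId": "...",
    "data": "<base64-encoded, GZIP-compressed payload>",
    "kinesisRecordMetadata": {
        "partitionKey": "...",
        "shardId": "...",
        "sequenceNumber": "...",
        "approximateArrivalTimestamp": 1510109208016
    }
}
*/
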
exports.handler = (event, context, callback) => {
    Promise.all(event.records.map(r => {
        const buffer = Buffer.from(r.data, 'base64');

        let decompressed;
        try {
            decompressed = zlib.gunzipSync(buffer);
        } catch (e) {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'ProcessingFailed',
            });
        }

        const data = JSON.parse(decompressed);
        // CONTROL_MESSAGE records are sent by CloudWatch Logs to check if the subscription is reachable.
        // They do not contain actual data.
        if (data.messageType === 'CONTROL_MESSAGE') {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'Dropped',
            });
        } else if (data.messageType === 'DATA_MESSAGE') {
            const promises = data.logEvents.map(transformLogEvent);
            return Promise.all(promises)
                .then(transformed => {
                    const payload = transformed.reduce((a, v) => a + v, '');
                    const encoded = Buffer.from(payload).toString('base64');
                    return {
                        recordId: r.recordId,
                        result: 'Ok',
                        data: encoded,
                    };
                });
        } else {
            return Promise.resolve({
                recordId: r.recordId,
                result: 'ProcessingFailed',
            });
        }
    })).then(recs => {
        const isSas = Object.prototype.hasOwnProperty.call(event, 'sourceKinesisStreamArn');
        const streamARN = isSas ? event.sourceKinesisStreamArn : event.deliveryStreamArn;
        const region = streamARN.split(':')[3];
        const streamName = streamARN.split('/')[1];
        const result = { records: recs };
        let recordsToReingest = [];
        const putRecordBatches = [];
        let totalRecordsToBeReingested = 0;
        const inputDataByRecId = {};
        event.records.forEach(r => inputDataByRecId[r.recordId] = createReingestionRecord(isSas, r));

        let projectedSize = recs.filter(rec => rec.result === 'Ok')
            .map(r => r.recordId.length + r.data.length)
            .reduce((a, b) => a + b, 0);
        // 6000000 instead of 6291456 to leave ample headroom for the stuff we didn't account for
        for (let idx = 0; idx < event.records.length && projectedSize > 6000000; idx++) {
            const rec = result.records[idx];
            if (rec.result === 'Ok') {
                totalRecordsToBeReingested++;
                recordsToReingest.push(getReingestionRecord(isSas, inputDataByRecId[rec.recordId]));
                projectedSize -= rec.data.length;
                delete rec.data;
                result.records[idx].result = 'Dropped';

                // split out the record batches into multiple groups, 500 records at max per group
                if (recordsToReingest.length === 500) {
                    putRecordBatches.push(recordsToReingest);
                    recordsToReingest = [];
                }
            }
        }

        if (recordsToReingest.length > 0) {
            // add the last batch
            putRecordBatches.push(recordsToReingest);
        }

        if (putRecordBatches.length > 0) {
            // Re-ingest every batch and wait for all of them to finish before returning to Firehose.
            let recordsReingestedSoFar = 0;
            Promise.all(putRecordBatches.map(recordBatch => new Promise((resolve, reject) => {
                if (isSas) {
                    const client = new AWS.Kinesis({ region: region });
                    putRecordsToKinesisStream(streamName, recordBatch, client, resolve, reject, 0, 20);
                } else {
                    const client = new AWS.Firehose({ region: region });
                    putRecordsToFirehoseStream(streamName, recordBatch, client, resolve, reject, 0, 20);
                }
                recordsReingestedSoFar += recordBatch.length;
                console.log('Reingesting %s/%s records out of %s into %s stream', recordsReingestedSoFar, totalRecordsToBeReingested, event.records.length, streamName);
            }))).then(
                () => {
                    console.log('Reingested all %s records out of %s into %s stream', totalRecordsToBeReingested, event.records.length, streamName);
                    callback(null, result);
                },
                failed => {
                    console.log('Failed to reingest records. %s', failed);
                    callback(failed, null);
                });
        } else {
            console.log('No records needed to be reingested.');
            callback(null, result);
        }
    }).catch(ex => {
        console.log('Error: ', ex);
        callback(ex, null);
    });
};
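
/*
A minimal local smoke test (illustrative only; the ARN and payload below are made up). It gzips and
base64-encodes a CloudWatch Logs-style message and invokes the handler directly:

const sample = {
    messageType: 'DATA_MESSAGE',
    owner: '123456789012',
    logGroup: 'log_group_name',
    logStream: 'log_stream_name',
    subscriptionFilters: ['subscription_filter_name'],
    logEvents: [{ id: '0', timestamp: Date.now(), message: 'hello world' }],
};
const data = zlib.gzipSync(Buffer.from(JSON.stringify(sample))).toString('base64');
exports.handler({
    deliveryStreamArn: 'arn:aws:firehose:us-east-1:123456789012:deliverystream/example-stream',
    records: [{ recordId: '1', data }],
}, {}, (err, res) => console.log(err || JSON.stringify(res, null, 2)));
*/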