Skip to content

Commit 20be7ec

Browse files
author
utkarsh
committed
function to ingest data from file trigger
1 parent 18c3eb8 commit 20be7ec

File tree

1 file changed

+15
-9
lines changed

1 file changed

+15
-9
lines changed

functions/gcs/index.js

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ function createTable(tableId) {
9292
})
9393
})
9494
}
95-
function insertRowsAsStream(tableId, rows) {
95+
function insertRowsAsStream(tableId, rows, cb, event) {
9696

9797
createTable(tableId).then(function () {
9898
const bigquery = new BigQuery({
@@ -106,7 +106,8 @@ function insertRowsAsStream(tableId, rows) {
106106
.insert(rows)
107107
.then(() => {
108108
console.log(`Inserted ${rows.length} rows`);
109-
// cb(null, "done");
109+
110+
110111
})
111112
.catch(err => {
112113
if (err && err.name === 'PartialFailureError') {
@@ -116,9 +117,11 @@ function insertRowsAsStream(tableId, rows) {
116117
}
117118
} else {
118119
console.error('ERROR:', err);
119-
// cb(err, "error");
120+
cb(err, "error");
120121
}
121122
});
123+
}).catch(function (error) {
124+
cb(error, "error");
122125
})
123126

124127
// [END bigquery_table_insert_rows]
@@ -155,7 +158,7 @@ function getCDCCalorieRequirements() {
155158
});
156159
}
157160

158-
function mapValueWithRecommendation(data, cdcRecoArray) {
161+
function mapValueWithRecommendation(data, cdcRecoArray, cb, event) {
159162
let sleepRecommendations = cdcRecoArray[1];
160163
let calorieRecommendations = cdcRecoArray[0];
161164
let sleepReco = null;
@@ -184,8 +187,10 @@ function mapValueWithRecommendation(data, cdcRecoArray) {
184187
});
185188
let groupedRows = _.chain(result).groupBy('Member_ID').map(function (value, key) {
186189
let tableId = 'Member_' + key;
187-
insertRowsAsStream(tableId, value);
190+
insertRowsAsStream(tableId, value, cb);
188191
});
192+
stotrage.bucket(event.data.bucket).file(event.data.name).delete();
193+
cb(null, "DOne");
189194

190195
return result;
191196
}
@@ -202,7 +207,7 @@ function mapValueWithRecommendation(data, cdcRecoArray) {
202207
* @param {string} event.data.name Name of a file in the Cloud Storage bucket.
203208
* @param {function} callback The callback function.
204209
*/
205-
exports.cdcRecommendation = (event, googleCb) => {
210+
exports.cdcRecommendation = (event, cb) => {
206211
// const file = event.data;
207212

208213
if (file.resourceState === 'not_exists') {
@@ -220,13 +225,14 @@ exports.cdcRecommendation = (event, googleCb) => {
220225
lookupPromise.then(function (resultArray) {
221226
let parser = parse({ columns: true, cast: true }, function (err, data) {
222227
// console.log(data);
223-
googleCb(null, mapValueWithRecommendation(data, resultArray));
224-
stotrage.bucket(event.data.bucket).file(event.data.name).delete();
228+
let result = mapValueWithRecommendation(data, resultArray, cb, event);
229+
230+
225231
});
226232
uploadFile.pipe(parser);
227233

228234
}).catch(function (error) {
229-
googleCb(error, 'error');
235+
cb(error, 'error');
230236
// throw new Error(error);
231237
});
232238
};

0 commit comments

Comments
 (0)