Skip to content

Commit 54ccc50

Browse files
fix: FIR-36189 nest js script stuck when running a query with limit 58 and above (#115)
1 parent ace1eb6 commit 54ccc50

File tree

5 files changed

+73
-125
lines changed

5 files changed

+73
-125
lines changed

src/connection/base.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
import { Statement } from "../statement";
88
import { generateUserAgent } from "../common/util";
99
import { ConnectionError, CompositeError } from "../common/errors";
10+
import JSONbig from "json-bigint";
1011

1112
const defaultQuerySettings = {
1213
output_format: OutputFormat.COMPACT
@@ -206,14 +207,14 @@ export abstract class Connection {
206207

207208
try {
208209
const response = await request.ready();
210+
const text = await response.text();
209211
await this.processHeaders(response.headers);
210-
await this.throwErrorIfErrorBody(response);
211-
const statement = new Statement(this.context, {
212+
await this.throwErrorIfErrorBody(text, response);
213+
return new Statement(this.context, {
212214
query: formattedQuery,
213-
request,
215+
text,
214216
executeQueryOptions
215217
});
216-
return statement;
217218
} catch (error) {
218219
// In case it was a set query, remove set parameter if query fails
219220
if (setKey.length > 0) {
@@ -225,14 +226,13 @@ export abstract class Connection {
225226
}
226227
}
227228

228-
private async throwErrorIfErrorBody(response: Response) {
229+
private async throwErrorIfErrorBody(text: string, response: Response) {
229230
// Hack, but looks like this is a limitation of the fetch API
230231
// In order to read the body here and elesewhere, we need to clone the response
231232
// since body can only be read once
232-
const clonedResponse = response.clone();
233233
let json;
234234
try {
235-
json = await clonedResponse.json();
235+
json = JSONbig.parse(text);
236236
} catch (error) {
237237
// If we can't parse the JSON, we'll have to ignore it
238238
if (this.hasJsonContent(response)) {

src/statement/index.ts

Lines changed: 30 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -13,47 +13,50 @@ import {
1313
normalizeResponse,
1414
getNormalizedStatistics
1515
} from "./normalizeResponse";
16+
import { CompositeError } from "../common/errors";
1617

1718
export class Statement {
1819
private context: Context;
1920
private query: string;
2021
private executeQueryOptions: ExecuteQueryOptions;
2122

22-
private request: { ready: () => Promise<any>; abort: () => void };
23+
private text;
2324
private rowStream: RowStream;
2425

2526
constructor(
2627
context: Context,
2728
{
2829
query,
29-
request,
30+
text,
3031
executeQueryOptions
3132
}: {
3233
query: string;
33-
request: { ready: () => Promise<any>; abort: () => void };
34+
text: string;
3435
executeQueryOptions: ExecuteQueryOptions;
3536
}
3637
) {
3738
this.context = context;
38-
this.request = request;
39+
this.text = text;
3940
this.query = query;
4041
this.executeQueryOptions = executeQueryOptions;
4142
this.rowStream = new RowStream();
4243
}
4344

4445
private parseResponse(response: string) {
4546
const parsed = JSONbig.parse(response);
46-
const { data, meta, statistics } = parsed;
47+
const { data, meta, statistics, errors = undefined } = parsed;
4748
return {
4849
data,
4950
meta,
50-
statistics
51+
statistics,
52+
errors
5153
};
5254
}
5355

5456
private handleParseResponse(response: string) {
57+
let errors, json;
5558
try {
56-
return this.parseResponse(response);
59+
({ errors, ...json } = this.parseResponse(response));
5760
} catch (error) {
5861
const isData = isDataQuery(this.query);
5962
if (isData || (response.length && !isData)) {
@@ -65,111 +68,37 @@ export class Statement {
6568
statistics: null
6669
};
6770
}
71+
if (errors !== undefined) throw new CompositeError(errors);
72+
return json;
6873
}
6974

7075
async streamResult(options?: StreamOptions) {
71-
const response = await this.request.ready();
72-
const jsonStream = new JSONStream({
73-
emitter: this.rowStream,
74-
options,
75-
executeQueryOptions: this.executeQueryOptions
76-
});
77-
78-
let resolveMetadata: (metadata: Meta[]) => void;
79-
let rejectMetadata: (reason?: any) => void;
80-
81-
let resolveStatistics: (statistics: Statistics) => void;
82-
let rejectStatistics: (reason?: any) => void;
83-
84-
const metadataPromise = new Promise<Meta[]>((resolve, reject) => {
85-
resolveMetadata = resolve;
86-
rejectMetadata = reject;
87-
});
88-
89-
const statisticsPromise = new Promise<Statistics>((resolve, reject) => {
90-
resolveStatistics = resolve;
91-
rejectStatistics = reject;
92-
});
93-
94-
let str = Buffer.alloc(0);
95-
96-
this.rowStream.on("metadata", (metadata: Meta[]) => {
97-
resolveMetadata(metadata);
98-
});
99-
100-
this.rowStream.on("statistics", (statistics: Statistics) => {
101-
resolveStatistics(statistics);
102-
});
103-
104-
const errorHandler = (error: any) => {
105-
this.rowStream.emit("error", error);
106-
this.rowStream.push(null);
107-
rejectStatistics(error);
108-
rejectMetadata(error);
109-
};
110-
111-
response.body.on("error", errorHandler);
112-
113-
response.body.on("data", (chunk: Buffer) => {
114-
// content type should be application/json?
115-
// maybe in the future it will change
116-
const contentType = response.headers.get("content-type");
117-
118-
if (chunk.lastIndexOf("\n") !== -1 && str) {
119-
// store in buffer anything after
120-
const newLinePosition = chunk.lastIndexOf("\n");
121-
const rest = chunk.slice(newLinePosition + 1);
122-
123-
const lines = Buffer.concat([str, chunk.slice(0, newLinePosition)])
124-
.toString("utf8")
125-
.split("\n");
126-
try {
127-
for (const line of lines) {
128-
jsonStream.processLine(line);
129-
}
130-
} catch (error) {
131-
errorHandler(error);
132-
return;
133-
}
134-
135-
const result = jsonStream.getResult(0);
136-
// for now only supports single statement sql
137-
for (const row of result.rows) {
138-
this.rowStream.push(row);
139-
}
76+
// Streaming is not supported right now in Firebolt
77+
// This is a placeholder for future implementation
78+
const parsed = this.handleParseResponse(this.text);
79+
const normalized = normalizeResponse(parsed, this.executeQueryOptions);
14080

141-
result.rows = [];
142-
str = rest;
143-
}
144-
});
81+
const { data, meta, statistics } = normalized;
14582

146-
response.body.on("end", () => {
147-
try {
148-
const result = jsonStream.getResult(0);
149-
const statistics = getNormalizedStatistics(result.statistics);
150-
this.rowStream.emit("statistics", statistics);
151-
this.rowStream.push(null);
152-
} catch (error) {
153-
errorHandler(error);
154-
}
155-
});
83+
for (const row of data) {
84+
this.rowStream.push(row);
85+
}
86+
this.rowStream.push(null);
87+
this.rowStream.end();
15688

15789
return {
15890
data: this.rowStream,
159-
meta: metadataPromise,
160-
statistics: statisticsPromise
91+
meta: Promise.resolve(meta),
92+
statistics: Promise.resolve(statistics)
16193
};
16294
}
16395

164-
async fetchResult() {
165-
const response = await this.request.ready();
166-
const text = await response.text();
167-
168-
if (this.executeQueryOptions?.settings?.async_execution) {
169-
return JSONbig.parse(text);
170-
}
171-
172-
const parsed = this.handleParseResponse(text);
96+
async fetchResult(): Promise<{
97+
data: any;
98+
meta: Meta[];
99+
statistics: Statistics | null;
100+
}> {
101+
const parsed = this.handleParseResponse(this.text);
173102
const normalized = normalizeResponse(parsed, this.executeQueryOptions);
174103

175104
const { data, meta, statistics } = normalized;

test/integration/v1/index.test.ts

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -39,19 +39,6 @@ describe("integration test", () => {
3939
const row = data[0];
4040
expect(row).toMatchObject({ "?column?": 1 });
4141
});
42-
it("async query", async () => {
43-
const firebolt = Firebolt({
44-
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
45-
});
46-
47-
const connection = await firebolt.connect(connectionParams);
48-
const statement = await connection.execute("SELECT 1", {
49-
settings: { async_execution: true },
50-
response: { normalizeData: false }
51-
});
52-
const result = await statement.fetchResult();
53-
expect(result.query_id).toBeTruthy();
54-
});
5542
it("returns Date type", async () => {
5643
const firebolt = Firebolt({
5744
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
@@ -179,7 +166,8 @@ describe("integration test", () => {
179166
await firebolt.testConnection(connectionParams);
180167
expect(true).toBeTruthy();
181168
});
182-
it("custom parser", async () => {
169+
// Since streaming is currently disabled, custom parser is not supported
170+
it.skip("custom parser", async () => {
183171
const firebolt = Firebolt({
184172
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
185173
});

test/integration/v2/index.test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,8 @@ describe("integration test", () => {
268268
await firebolt.testConnection(connectionParams);
269269
expect(true).toBeTruthy();
270270
});
271-
it("custom parser", async () => {
271+
// Since streaming is currently disabled, custom parser is not supported
272+
it.skip("custom parser", async () => {
272273
const firebolt = Firebolt({
273274
apiEndpoint: process.env.FIREBOLT_API_ENDPOINT as string
274275
});

test/unit/connection.test.ts

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ describe("Connection", () => {
114114
resetServerHandlers(server);
115115
});
116116

117-
it("throws an error when error body is present", async () => {
117+
it("throws an error when error json is present", async () => {
118118
server.use(
119119
rest.post(`https://some_engine.com`, async (req, res, ctx) => {
120120
const body = await req.text();
@@ -166,7 +166,8 @@ describe("Connection", () => {
166166
INFO: SYNTAX_ERROR - Unexpected character at {"failingLine":42,"startOffset":120,"endOffset":135}`
167167
);
168168
});
169-
it("throws an error when error body is present", async () => {
169+
170+
it("throws an error when error json 2 is present", async () => {
170171
server.use(
171172
rest.post(`https://some_engine.com`, async (req, res, ctx) => {
172173
const body = await req.text();
@@ -524,4 +525,33 @@ INFO: SYNTAX_ERROR - Unexpected character at {"failingLine":42,"startOffset":120
524525
expect(searchParamsUsed.get("param")).toEqual("value");
525526
expect(searchParamsUsed2.get("param")).toEqual(null);
526527
});
528+
529+
it("handles large response bodies correctly", async () => {
530+
const largeResponse = {
531+
data: Array(1000).fill({ one: 1, two: 2, three: 3 })
532+
};
533+
server.use(
534+
rest.post(`https://some_engine.com`, async (req, res, ctx) => {
535+
return res(ctx.json(largeResponse));
536+
})
537+
);
538+
539+
const connectionParams: ConnectionOptions = {
540+
auth: {
541+
client_id: "dummy",
542+
client_secret: "dummy"
543+
},
544+
database: "dummy",
545+
engineName: "dummy",
546+
account: "my_account"
547+
};
548+
const firebolt = Firebolt({
549+
apiEndpoint
550+
});
551+
552+
const connection = await firebolt.connect(connectionParams);
553+
const statement = await connection.execute("SELECT 1");
554+
const { data } = await statement.fetchResult();
555+
expect(data.length).toEqual(1000);
556+
}, 10000);
527557
});

0 commit comments

Comments
 (0)