Skip to content

Commit 9cc6583

Browse files
authored
feat(FIR-42859): async query execution (#130)
1 parent 71bd51c commit 9cc6583

File tree

12 files changed

+600
-40
lines changed

12 files changed

+600
-40
lines changed

README.md

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ console.log(rows)
7878
* <a href="#fetch-result">Fetch result</a>
7979
* <a href="#stream-result">Stream result</a>
8080
* <a href="#result-hydration">Result hydration</a>
81+
* <a href="#server-side-async-query-execution">Server-side async queries</a>
82+
* <a href="#execute-async-query">Execute Async Query</a>
83+
* <a href="#check-async-query-status">Check Async Query Status</a>
84+
* <a href="#cancel-async-query">Cancel Async Query</a>
8185
* <a href="#engine-management">Engine management</a>
8286
* <a href="#getbyname">getByName</a>
8387
* <a href="#engine">Engine</a>
@@ -340,6 +344,44 @@ firebolt-sdk maps SQL data types to their corresponding JavaScript equivalents.
340344
| | STRING | String | |
341345
| Date & Time | DATE | Date | |
342346

347+
<a id="Server-side async queries"></a>
348+
## Server-side async query execution
349+
350+
Firebolt supports server-side asynchronous query execution. This feature allows you to run
351+
queries in the background and fetch the results later. This is especially useful for long-running
352+
queries that you don't want to wait for or maintain a persistent connection to the server.
353+
354+
<a id="Execute Async Query"></a>
355+
### Execute Async Query
356+
357+
Executes a query asynchronously. This is useful for long-running queries that you don't want to block the main thread. The resulting statement does not contain data and should only be used to receive an async query token. Token can be saved elsewhere and reused, even on a new connection to check on this query.
358+
359+
```typescript
360+
const statement = await connection.executeAsync(query, executeQueryOptions);
361+
const token = statement.asyncQueryToken; // used to check query status and cancel it
362+
// statement.fetchResult() -- not allowed as there's no result to fetch
363+
```
364+
365+
<a id="Check Async Query Status"></a>
366+
### Check Async Query Status
367+
368+
Checks the status of an asynchronous query. Use this to determine if the query is still running or has completed. `isAsyncQueryRunning` woudl return true or false if the query is running or has finished. `isAsyncQuerySuccessful` would return true if the query has completed successfully, false if it has failed and `undefined` if the query is still running.
369+
370+
```typescript
371+
const token = statement.asyncQueryToken; // can only be fetched for async query
372+
const isRunning = await connection.isAsyncQueryRunning(token);
373+
const isSuccessful = await connection.isAsyncQuerySuccessful(token);
374+
```
375+
376+
<a id="Cancel Async Query"></a>
377+
### Cancel Async Query
378+
379+
Cancels a running asynchronous query. Use this if you need to stop a long-running query, if its execution is no longer needed.
380+
381+
```typescript
382+
const token = statement.asyncQueryToken; // can only be fetched for async query
383+
await connection.cancelAsyncQuery(token);
384+
```
343385

344386
<a id="engine-management"></a>
345387
### Engine management

src/connection/base.ts

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ import { generateUserAgent } from "../common/util";
99
import { CompositeError } from "../common/errors";
1010
import JSONbig from "json-bigint";
1111
import { QueryFormatter } from "../formatter/base";
12+
import { AsyncStatement } from "../statement/async";
1213

1314
const defaultQuerySettings = {
1415
output_format: OutputFormat.COMPACT
1516
};
1617

17-
const defaultResponseSettings = {
18+
export const defaultResponseSettings = {
1819
normalizeData: false
1920
};
2021

@@ -160,10 +161,21 @@ export abstract class Connection {
160161
}
161162
}
162163

163-
async execute(
164+
abstract executeAsync(
164165
query: string,
165-
executeQueryOptions: ExecuteQueryOptions = {}
166-
): Promise<Statement> {
166+
executeQueryOptions?: ExecuteQueryOptions
167+
): Promise<AsyncStatement>;
168+
169+
abstract isAsyncQueryRunning(token: string): Promise<boolean>;
170+
171+
abstract isAsyncQuerySuccessful(token: string): Promise<boolean | undefined>;
172+
173+
abstract cancelAsyncQuery(token: string): Promise<void>;
174+
175+
protected async prepareAndExecuteQuery(
176+
query: string,
177+
executeQueryOptions: ExecuteQueryOptions
178+
): Promise<{ formattedQuery: string; response: Response }> {
167179
const { httpClient } = this.context;
168180

169181
executeQueryOptions.response = {
@@ -201,14 +213,8 @@ export abstract class Connection {
201213

202214
try {
203215
const response = await request.ready();
204-
const text = await response.text();
205216
await this.processHeaders(response.headers);
206-
await this.throwErrorIfErrorBody(text, response);
207-
return new Statement(this.context, {
208-
query: formattedQuery,
209-
text,
210-
executeQueryOptions
211-
});
217+
return { formattedQuery, response };
212218
} catch (error) {
213219
// In case it was a set query, remove set parameter if query fails
214220
if (setKey.length > 0) {
@@ -220,7 +226,25 @@ export abstract class Connection {
220226
}
221227
}
222228

223-
private async throwErrorIfErrorBody(text: string, response: Response) {
229+
async execute(
230+
query: string,
231+
executeQueryOptions: ExecuteQueryOptions = {}
232+
): Promise<Statement> {
233+
const { formattedQuery, response } = await this.prepareAndExecuteQuery(
234+
query,
235+
executeQueryOptions
236+
);
237+
238+
const text = await response.text();
239+
await this.throwErrorIfErrorBody(text, response);
240+
return new Statement(this.context, {
241+
query: formattedQuery,
242+
text,
243+
executeQueryOptions
244+
});
245+
}
246+
247+
protected async throwErrorIfErrorBody(text: string, response: Response) {
224248
// Hack, but looks like this is a limitation of the fetch API
225249
// In order to read the body here and elesewhere, we need to clone the response
226250
// since body can only be read once

src/connection/connection_v1.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { ACCOUNT, ACCOUNT_BY_NAME } from "../common/api";
22
import { Connection as BaseConnection } from "./base";
33
import { ResourceManager } from "../service";
4+
import { ExecuteQueryOptions } from "../types";
5+
import { AsyncStatement } from "../statement/async";
46

57
export interface AccountInfo {
68
id: string;
@@ -63,4 +65,32 @@ export class ConnectionV1 extends BaseConnection {
6365
async testConnection() {
6466
await this.execute("select 1");
6567
}
68+
69+
// Async methods
70+
async isAsyncQueryRunning(token: string): Promise<boolean> {
71+
throw new Error(
72+
"Asynchronous query running check is not supported in this Firebolt version."
73+
);
74+
}
75+
76+
async isAsyncQuerySuccessful(token: string): Promise<boolean | undefined> {
77+
throw new Error(
78+
"Asynchronous query success check is not supported in this Firebolt version."
79+
);
80+
}
81+
82+
async cancelAsyncQuery(token: string): Promise<void> {
83+
throw new Error(
84+
"Asynchronous query cancellation is not supported in this Firebolt version."
85+
);
86+
}
87+
88+
async executeAsync(
89+
query: string,
90+
executeQueryOptions?: ExecuteQueryOptions
91+
): Promise<AsyncStatement> {
92+
throw new Error(
93+
"Asynchronous execution is not supported in this Firebolt version."
94+
);
95+
}
6696
}

src/connection/connection_v2.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ import { ACCOUNT_SYSTEM_ENGINE, QUERY_URL } from "../common/api";
33

44
import { Connection as BaseConnection } from "./base";
55
import { Cache, inMemoryCache, noneCache } from "../common/tokenCache";
6+
import { ExecuteQueryOptions } from "../types";
7+
import { AsyncStatement } from "../statement/async";
68

79
export class ConnectionV2 extends BaseConnection {
810
private get account(): string {
@@ -70,6 +72,72 @@ export class ConnectionV2 extends BaseConnection {
7072
return this.engineEndpoint;
7173
}
7274

75+
// Async methods
76+
async executeAsync(
77+
query: string,
78+
executeQueryOptions: ExecuteQueryOptions = {}
79+
): Promise<AsyncStatement> {
80+
const asyncExecuteQueryOptions = {
81+
...executeQueryOptions,
82+
settings: {
83+
...executeQueryOptions.settings,
84+
async: true
85+
}
86+
};
87+
88+
if (this.queryFormatter.isSetStatement(query)) {
89+
// can't have an async set query
90+
throw new Error("SET statements cannot be executed asynchronously.");
91+
}
92+
const { formattedQuery, response } = await this.prepareAndExecuteQuery(
93+
query,
94+
asyncExecuteQueryOptions
95+
);
96+
97+
const text = await response.text();
98+
await this.throwErrorIfErrorBody(text, response);
99+
return new AsyncStatement(this.context, {
100+
query: formattedQuery,
101+
text,
102+
executeQueryOptions: asyncExecuteQueryOptions
103+
});
104+
}
105+
106+
private async getAsyncQueryInfo(token: string) {
107+
const query = `CALL fb_GetAsyncStatus('${token}')`;
108+
109+
const statement = await this.execute(query);
110+
const { data, meta } = await statement.fetchResult();
111+
const result: Record<string, any> = {};
112+
if (data.length > 0) {
113+
meta.forEach((field, index) => {
114+
result[field.name] = data[0][index];
115+
});
116+
} else {
117+
throw new Error("No data returned from fb_GetAsyncStatus");
118+
}
119+
return result;
120+
}
121+
122+
async isAsyncQueryRunning(token: string): Promise<boolean> {
123+
const info = await this.getAsyncQueryInfo(token);
124+
return info["status"] === "RUNNING";
125+
}
126+
127+
async isAsyncQuerySuccessful(token: string): Promise<boolean | undefined> {
128+
const info = await this.getAsyncQueryInfo(token);
129+
if (info["status"] === "RUNNING") {
130+
return undefined;
131+
}
132+
return info["status"] === "ENDED_SUCCESSFULLY";
133+
}
134+
135+
async cancelAsyncQuery(token: string): Promise<void> {
136+
const info = await this.getAsyncQueryInfo(token);
137+
const async_query_id = info["query_id"];
138+
this.execute(`CANCEL QUERY WHERE query_id='${async_query_id}'`);
139+
}
140+
73141
async testConnection() {
74142
const settings = { internal: [{ auto_start_stop_control: "ignore" }] };
75143
await this.execute("select 1", { settings });

src/http/node.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const DEFAULT_ERROR = "Server error";
2626
const DEFAULT_USER_AGENT = systemInfoString();
2727

2828
const PROTOCOL_VERSION_HEADER = "Firebolt-Protocol-Version";
29-
const PROTOCOL_VERSION = "2.1";
29+
const PROTOCOL_VERSION = "2.3";
3030
const createSocket = HttpsAgent.prototype.createSocket;
3131

3232
const agentOptions = {

src/statement/async.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import JSONbig from "json-bigint";
2+
import {
3+
ExecuteQueryOptions,
4+
StreamOptions,
5+
Context,
6+
Statistics
7+
} from "../types";
8+
import { Meta } from "../meta";
9+
10+
export class AsyncStatement {
11+
private readonly asyncToken: string;
12+
private readonly context: Context;
13+
private readonly query: string;
14+
private readonly executeQueryOptions: ExecuteQueryOptions;
15+
private readonly text: string;
16+
17+
constructor(
18+
context: Context,
19+
{
20+
query,
21+
text,
22+
executeQueryOptions
23+
}: {
24+
query: string;
25+
text: string;
26+
executeQueryOptions: ExecuteQueryOptions;
27+
}
28+
) {
29+
this.context = context;
30+
this.text = text;
31+
this.query = query;
32+
this.executeQueryOptions = executeQueryOptions;
33+
this.asyncToken = this.parseResponse(this.text).token;
34+
}
35+
36+
private parseResponse(response: string) {
37+
const parsed = JSONbig.parse(response);
38+
const { token, message, monitorSql } = parsed;
39+
return {
40+
token,
41+
message,
42+
monitorSql
43+
};
44+
}
45+
get asyncQueryToken(): string {
46+
return this.asyncToken;
47+
}
48+
49+
async streamResult(options?: StreamOptions) {
50+
throw new Error("Method not allowed for async statements");
51+
}
52+
53+
async fetchResult(): Promise<{
54+
data: any;
55+
meta: Meta[];
56+
statistics: Statistics | null;
57+
}> {
58+
throw new Error("Method not allowed for async statements");
59+
}
60+
}

src/statement/index.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,15 @@ import {
88
import { Meta } from "../meta";
99
import { isDataQuery } from "../common/util";
1010
import { RowStream } from "./stream/rowStream";
11-
import { JSONStream } from "./stream/jsonStream";
12-
import {
13-
normalizeResponse,
14-
getNormalizedStatistics
15-
} from "./normalizeResponse";
11+
import { normalizeResponse } from "./normalizeResponse";
1612
import { CompositeError } from "../common/errors";
1713

1814
export class Statement {
15+
private rowStream: RowStream;
1916
private context: Context;
2017
private query: string;
2118
private executeQueryOptions: ExecuteQueryOptions;
22-
23-
private text;
24-
private rowStream: RowStream;
19+
private readonly text: string;
2520

2621
constructor(
2722
context: Context,

src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export type QuerySettings = Record<
3535
> & {
3636
output_format?: OutputFormat;
3737
internal?: Record<string, string | number>[];
38+
async?: boolean;
3839
};
3940

4041
export type RowParser = (row: string, isLastRow: boolean) => any;

0 commit comments

Comments
 (0)