Commit 6da86a8
perf: optimize adding large amounts of requests via crawler.addRequests() (#2456)

This PR resolves three main issues with adding large amounts of requests to the queue:

- Every request added to the queue was automatically added to the LRU requests cache, which holds up to 1 million items. This makes sense when enqueuing a few items, but when adding more than the limit, we were overloading the LRU cache for no reason. Now only the first 1000 requests are added to the cache (plus any requests added via separate calls, e.g. when doing `enqueueLinks` from inside a request handler, again limited to the first 1000 links).
- We used to validate the whole requests array via `ow`, and since the shape can vary, it was very slow (e.g. 20 s just for the `ow` validation). Now we use a tailored validation for the array that does the same but resolves within roughly 100 ms.
- We always created `Request` objects out of everything, which had a significant impact on memory usage. Now we skip this completely and let the objects be created later, when needed, i.e. when calling `RQ.addRequests()`, which receives only the actual batch and not the whole array.

Related: https://apify.slack.com/archives/C0L33UM7Z/p1715109984834079
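To ground the numbers above, here is a minimal sketch of the workload this commit targets, assuming a `CheerioCrawler` setup with placeholder URLs (the handler body is illustrative):

```ts
import { CheerioCrawler } from 'crawlee';

// Hypothetical input: a million URLs, the scale this commit optimizes for.
const urls = Array.from({ length: 1_000_000 }, (_, i) => `https://example.com/page/${i}`);

const crawler = new CheerioCrawler({
    async requestHandler({ enqueueLinks }) {
        // Links enqueued from inside a handler are a separate, small call,
        // so they still benefit from the LRU cache (first 1000 links).
        await enqueueLinks();
    },
});

// Before this commit: every URL was validated via `ow`, eagerly wrapped
// in a `Request` instance, and pushed into the LRU cache. After: tailored
// validation, lazy `Request` creation, and only the first 1000 cached.
await crawler.addRequests(urls);
await crawler.run();
```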
1 parent: 3a847f6

1 file changed

packages/core/src/storages/request_provider.ts (+51 -15)
```diff
@@ -1,3 +1,5 @@
+import { inspect } from 'node:util';
+
 import { ListDictionary, LruCache } from '@apify/datastructures';
 import type { Log } from '@apify/log';
 import { cryptoRandomObjectId } from '@apify/utilities';
```
```diff
@@ -180,9 +182,10 @@ export abstract class RequestProvider implements IStorage {
         ow(requestsLike, ow.array);
         ow(options, ow.object.exactShape({
             forefront: ow.optional.boolean,
+            cache: ow.optional.boolean,
         }));
 
-        const { forefront = false } = options;
+        const { forefront = false, cache = true } = options;
 
         const uniqueKeyToCacheKey = new Map<string, string>();
         const getCachedRequestId = (uniqueKey: string) => {
```
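For context, `ow.object.exactShape()` rejects unknown keys at runtime, which is why the new `cache` key has to be declared in the shape before `addRequests()` will accept it. A quick sketch of the failure mode without that line (the error text is approximate):

```ts
import ow from 'ow';

// Without `cache` declared in the shape, this throws an ArgumentError,
// roughly: "Did not expect property `cache` to exist, got `false`".
ow({ forefront: false, cache: false }, ow.object.exactShape({
    forefront: ow.optional.boolean,
}));
```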
```diff
@@ -253,7 +256,10 @@ export abstract class RequestProvider implements IStorage {
             const cacheKey = getCachedRequestId(newRequest.uniqueKey);
 
             const { requestId, wasAlreadyPresent } = newRequest;
-            this._cacheRequest(cacheKey, newRequest);
+
+            if (cache) {
+                this._cacheRequest(cacheKey, newRequest);
+            }
 
             if (!wasAlreadyPresent && !this.inProgress.has(requestId) && !this.recentlyHandledRequestsCache.get(requestId)) {
                 this.assumedTotalCount++;
```
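Note that the flag only guards the `_cacheRequest()` call; the "cache the first 1000 only" policy lives in the caller, as the `addRequestsBatched()` changes below show. A standalone sketch of that calling pattern (the `addInBatches` helper is hypothetical, and `cache` is an internal option):

```ts
import { RequestQueue } from 'crawlee';

// Hypothetical helper: only the first batch of `batchSize` requests is
// admitted into the LRU cache; every later batch passes `cache: false`.
async function addInBatches(queue: RequestQueue, urls: string[], batchSize = 1000) {
    for (let i = 0; i < urls.length; i += batchSize) {
        const batch = urls.slice(i, i + batchSize).map((url) => ({ url }));
        await queue.addRequests(batch, { cache: i === 0 });
    }
}
```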
```diff
@@ -278,55 +284,79 @@ export abstract class RequestProvider implements IStorage {
     async addRequestsBatched(requests: (string | Source)[], options: AddRequestsBatchedOptions = {}): Promise<AddRequestsBatchedResult> {
         checkStorageAccess();
 
-        ow(requests, ow.array.ofType(ow.any(
-            ow.string,
-            ow.object.partialShape({ url: ow.string, id: ow.undefined }),
-            ow.object.partialShape({ requestsFromUrl: ow.string, regex: ow.optional.regExp }),
-        )));
         ow(options, ow.object.exactShape({
             forefront: ow.optional.boolean,
             waitForAllRequestsToBeAdded: ow.optional.boolean,
             batchSize: ow.optional.number,
             waitBetweenBatchesMillis: ow.optional.number,
         }));
 
+        // The `requests` array can be huge, and `ow` is very slow for anything more complex.
+        // This explicit iteration takes a few milliseconds, while the ow check can take tens of seconds.
+
+        // ow(requests, ow.array.ofType(ow.any(
+        //     ow.string,
+        //     ow.object.partialShape({ url: ow.string, id: ow.undefined }),
+        //     ow.object.partialShape({ requestsFromUrl: ow.string, regex: ow.optional.regExp }),
+        // )));
+
+        for (const request of requests) {
+            if (typeof request === 'string') {
+                continue;
+            }
+
+            if (typeof request === 'object' && request !== null) {
+                if (typeof request.url === 'string' && typeof request.id === 'undefined') {
+                    continue;
+                }
+
+                if (typeof (request as any).requestsFromUrl === 'string') {
+                    continue;
+                }
+            }
+
+            // eslint-disable-next-line max-len
+            throw new Error(`Request options are not valid, provide either a URL or an object with 'url' property (but without 'id' property), or an object with 'requestsFromUrl' property. Input: ${inspect(request)}`);
+        }
+
         const {
             batchSize = 1000,
             waitBetweenBatchesMillis = 1000,
         } = options;
-        const builtRequests: Request[] = [];
+        const sources: Source[] = [];
 
         for (const opts of requests) {
             if (opts && typeof opts === 'object' && 'requestsFromUrl' in opts) {
                 await this.addRequest(opts, { forefront: options.forefront });
             } else {
-                builtRequests.push(new Request(typeof opts === 'string' ? { url: opts } : opts as RequestOptions));
+                sources.push(typeof opts === 'string' ? { url: opts } : opts as RequestOptions);
             }
         }
 
-        const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Request[]) => {
+        const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Source[], cache = true) => {
             const resultsToReturn: ProcessedRequest[] = [];
-            const apiResult = await this.addRequests(providedRequests, { forefront: options.forefront });
+            const apiResult = await this.addRequests(providedRequests, { forefront: options.forefront, cache });
             resultsToReturn.push(...apiResult.processedRequests);
 
             if (apiResult.unprocessedRequests.length) {
                 await sleep(waitBetweenBatchesMillis);
 
                 resultsToReturn.push(...await attemptToAddToQueueAndAddAnyUnprocessed(
                     providedRequests.filter((r) => !apiResult.processedRequests.some((pr) => pr.uniqueKey === r.uniqueKey)),
+                    false,
                 ));
             }
 
             return resultsToReturn;
         };
 
-        const initialChunk = builtRequests.splice(0, batchSize);
+        const initialChunk = sources.splice(0, batchSize);
 
         // Add initial batch of `batchSize` to process them right away
         const addedRequests = await attemptToAddToQueueAndAddAnyUnprocessed(initialChunk);
 
         // If we have no more requests to add, return early
-        if (!builtRequests.length) {
+        if (!sources.length) {
             return {
                 addedRequests,
                 waitForAllRequestsToBeAdded: Promise.resolve([]),
```
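The replacement check is simple enough to restate as a standalone predicate; this sketch mirrors the validation loop above (the names `RequestLike` and `isValidRequestLike` are illustrative, not from the diff):

```ts
type RequestLike = string | { url?: unknown; id?: unknown; requestsFromUrl?: unknown };

// Accepts the same three shapes as the loop above: a plain URL string,
// an object with `url` (but no `id`), or an object with `requestsFromUrl`.
function isValidRequestLike(request: RequestLike): boolean {
    if (typeof request === 'string') return true;
    if (typeof request === 'object' && request !== null) {
        if (typeof request.url === 'string' && typeof request.id === 'undefined') return true;
        if (typeof request.requestsFromUrl === 'string') return true;
    }
    return false;
}
```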
```diff
@@ -335,11 +365,11 @@ export abstract class RequestProvider implements IStorage {
 
         // eslint-disable-next-line no-async-promise-executor
         const promise = new Promise<ProcessedRequest[]>(async (resolve) => {
-            const chunks = chunk(builtRequests, batchSize);
+            const chunks = chunk(sources, batchSize);
             const finalAddedRequests: ProcessedRequest[] = [];
 
             for (const requestChunk of chunks) {
-                finalAddedRequests.push(...await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk));
+                finalAddedRequests.push(...await attemptToAddToQueueAndAddAnyUnprocessed(requestChunk, false));
 
                 await sleep(waitBetweenBatchesMillis);
             }
```
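From the consumer side, the contract of `addRequestsBatched()` stays the same: the first batch is added before the call resolves, and the rest continues in the background behind the returned promise. A usage sketch (`queue` and `urls` assumed from context):

```ts
const { addedRequests, waitForAllRequestsToBeAdded } = await queue.addRequestsBatched(urls, {
    batchSize: 1000,
    waitBetweenBatchesMillis: 1000,
});
console.log(`added synchronously: ${addedRequests.length}`);

// Remaining chunks are pushed in the background; await them when needed.
const rest = await waitForAllRequestsToBeAdded;
console.log(`added in background: ${rest.length}`);
```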
```diff
@@ -707,6 +737,12 @@ export interface RequestQueueOperationOptions {
      * @default false
      */
     forefront?: boolean;
+    /**
+     * Should the requests be added to the local LRU cache?
+     * @default true
+     * @internal
+     */
+    cache?: boolean;
 }
 
 /**
```
