Skip to content

Commit f201cca

Browse files
authored
fix: key value stores emitting an error when multiple write promises ran in parallel (#1460)
1 parent: 99d125e · commit: f201cca

File tree

1 file changed

+32
-21
lines changed

1 file changed

+32
-21
lines changed

packages/memory-storage/src/memory-storage.ts

Lines changed: 32 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,8 @@
22
import type * as storage from '@crawlee/types';
33
import type { Dictionary } from '@crawlee/types';
44
import { s } from '@sapphire/shapeshift';
5-
import { ensureDir, pathExistsSync } from 'fs-extra';
5+
import { ensureDirSync, pathExistsSync } from 'fs-extra';
6+
import { renameSync } from 'node:fs';
67
import { rm, rename, readdir } from 'node:fs/promises';
78
import { resolve } from 'node:path';
89
import { DatasetClient } from './resource-clients/dataset';
@@ -114,6 +115,20 @@ export class MemoryStorage implements storage.StorageClient {
114115
* - local directory containing the default request queue.
115116
*/
116117
async purge(): Promise<void> {
118+
// Key-value stores
119+
const keyValueStores = await readdir(this.keyValueStoresDirectory).catch(() => []);
120+
const keyValueStorePromises: Promise<void>[] = [];
121+
122+
for (const keyValueStoreFolder of keyValueStores) {
123+
if (keyValueStoreFolder.startsWith('__CRAWLEE_TEMPORARY') || keyValueStoreFolder.startsWith('__OLD')) {
124+
keyValueStorePromises.push((await this.batchRemoveFiles(resolve(this.keyValueStoresDirectory, keyValueStoreFolder)))());
125+
} else if (keyValueStoreFolder === 'default') {
126+
keyValueStorePromises.push(this.handleDefaultKeyValueStore(resolve(this.keyValueStoresDirectory, keyValueStoreFolder))());
127+
}
128+
}
129+
130+
void Promise.allSettled(keyValueStorePromises);
131+
117132
// Datasets
118133
const datasets = await readdir(this.datasetsDirectory).catch(() => []);
119134
const datasetPromises: Promise<void>[] = [];
@@ -137,20 +152,6 @@ export class MemoryStorage implements storage.StorageClient {
137152
}
138153

139154
void Promise.allSettled(requestQueuePromises);
140-
141-
// Key-value stores
142-
const keyValueStores = await readdir(this.keyValueStoresDirectory).catch(() => []);
143-
const keyValueStorePromises: Promise<void>[] = [];
144-
145-
for (const keyValueStoreFolder of keyValueStores) {
146-
if (keyValueStoreFolder.startsWith('__CRAWLEE_TEMPORARY')) {
147-
keyValueStorePromises.push((await this.batchRemoveFiles(resolve(this.keyValueStoresDirectory, keyValueStoreFolder)))());
148-
} else if (keyValueStoreFolder === 'default') {
149-
keyValueStorePromises.push((await this.handleDefaultKeyValueStore(resolve(this.keyValueStoresDirectory, keyValueStoreFolder)))());
150-
}
151-
}
152-
153-
void Promise.allSettled(keyValueStorePromises);
154155
}
155156

156157
/**
@@ -162,7 +163,7 @@ export class MemoryStorage implements storage.StorageClient {
162163
await Promise.all(promises);
163164
}
164165

165-
private async handleDefaultKeyValueStore(folder: string): Promise<() => Promise<void>> {
166+
private handleDefaultKeyValueStore(folder: string): () => Promise<void> {
166167
const storagePathExists = pathExistsSync(folder);
167168
const temporaryPath = resolve(folder, '../__CRAWLEE_MIGRATING_KEY_VALUE_STORE__');
168169

@@ -176,26 +177,36 @@ export class MemoryStorage implements storage.StorageClient {
176177

177178
if (storagePathExists) {
178179
// Create temporary folder to save important files in
179-
await ensureDir(temporaryPath);
180+
ensureDirSync(temporaryPath);
180181

181182
// Go through each file and save the ones that are important
182183
for (const entity of possibleInputKeys) {
183184
const originalFilePath = resolve(folder, entity);
184185
const tempFilePath = resolve(temporaryPath, entity);
185186

186187
try {
187-
await rename(originalFilePath, tempFilePath);
188+
renameSync(originalFilePath, tempFilePath);
188189
} catch {
189190
// Ignore
190191
}
191192
}
192193

193194
// Remove the original folder and all its content
194-
const tempPathForOldFolder = resolve(folder, '../__OLD_DEFAULT__');
195-
await rename(folder, tempPathForOldFolder);
195+
let counter = 0;
196+
let tempPathForOldFolder = resolve(folder, `../__OLD_DEFAULT_${counter}__`);
197+
let done = false;
198+
199+
while (!done) {
200+
try {
201+
renameSync(folder, tempPathForOldFolder);
202+
done = true;
203+
} catch {
204+
tempPathForOldFolder = resolve(folder, `../__OLD_DEFAULT_${++counter}__`);
205+
}
206+
}
196207

197208
// Replace the temporary folder with the original folder
198-
await rename(temporaryPath, folder);
209+
renameSync(temporaryPath, folder);
199210

200211
// Remove the old folder
201212
return async () => (await this.batchRemoveFiles(tempPathForOldFolder))();

0 commit comments

Comments
 (0)