refactor: adding sendWithDelay for fixing rate limitation
This commit is contained in:
100
src/index.js
100
src/index.js
@@ -14,6 +14,61 @@ import { logger, runStep } from "./logger.js";
|
||||
const AVATAR_PREFIX = "avatar";
|
||||
const THUMBNAIL_PREFIX = "lg";
|
||||
const DRY_RUN = process.env.DRY_RUN === "true";
|
||||
const S3_MAX_ATTEMPTS = Number(process.env.S3_MAX_ATTEMPTS ?? 10);
|
||||
const S3_RETRY_MODE = process.env.S3_RETRY_MODE ?? "adaptive";
|
||||
const S3_REQUEST_DELAY_MS = Number(process.env.S3_REQUEST_DELAY_MS ?? 200);
|
||||
const S3_RATE_LIMIT_MAX_RETRIES = Number(
|
||||
process.env.S3_RATE_LIMIT_MAX_RETRIES ?? 5,
|
||||
);
|
||||
const S3_RATE_LIMIT_BASE_DELAY_MS = Number(
|
||||
process.env.S3_RATE_LIMIT_BASE_DELAY_MS ?? 1000,
|
||||
);
|
||||
|
||||
function sleep(ms) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function getHttpStatusCode(error) {
|
||||
if (
|
||||
typeof error === "object" &&
|
||||
error !== null &&
|
||||
"$metadata" in error &&
|
||||
typeof error.$metadata === "object" &&
|
||||
error.$metadata !== null &&
|
||||
"httpStatusCode" in error.$metadata
|
||||
) {
|
||||
return error.$metadata.httpStatusCode;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
}
|
||||
|
||||
async function withRateLimitRetry(fn, step) {
|
||||
for (let attempt = 0; ; attempt++) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (error) {
|
||||
const statusCode = getHttpStatusCode(error);
|
||||
if (statusCode !== 429 || attempt >= S3_RATE_LIMIT_MAX_RETRIES) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
const delayMs =
|
||||
S3_RATE_LIMIT_BASE_DELAY_MS * 2 ** attempt +
|
||||
Math.floor(Math.random() * 500);
|
||||
logger.warn(step, "rate limited (429), retrying", {
|
||||
attempt: attempt + 1,
|
||||
maxRetries: S3_RATE_LIMIT_MAX_RETRIES,
|
||||
delayMs,
|
||||
});
|
||||
await sleep(delayMs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function sendS3Command(client, command, step) {
|
||||
return withRateLimitRetry(() => client.send(command), step);
|
||||
}
|
||||
|
||||
function resolveParsPackBucket(endpoint, bucket) {
|
||||
const match = endpoint?.match(/https?:\/\/(c\d+)\.parspack\.net/i);
|
||||
@@ -77,6 +132,8 @@ function createS3Client(config, label) {
|
||||
endpoint: config.endpoint,
|
||||
region: config.region,
|
||||
bucket: config.bucket,
|
||||
maxAttempts: S3_MAX_ATTEMPTS,
|
||||
retryMode: S3_RETRY_MODE,
|
||||
});
|
||||
|
||||
try {
|
||||
@@ -84,6 +141,8 @@ function createS3Client(config, label) {
|
||||
region: config.region,
|
||||
endpoint: config.endpoint,
|
||||
forcePathStyle: true,
|
||||
maxAttempts: S3_MAX_ATTEMPTS,
|
||||
retryMode: S3_RETRY_MODE,
|
||||
credentials: {
|
||||
accessKeyId: config.accessKeyId,
|
||||
secretAccessKey: config.secretAccessKey,
|
||||
@@ -176,19 +235,15 @@ function resolveAssetSourceKey(asset) {
|
||||
async function objectExists(client, bucket, key) {
|
||||
const step = `objectExists:${bucket}/${key}`;
|
||||
try {
|
||||
await client.send(new HeadObjectCommand({ Bucket: bucket, Key: key }));
|
||||
await sendS3Command(
|
||||
client,
|
||||
new HeadObjectCommand({ Bucket: bucket, Key: key }),
|
||||
step,
|
||||
);
|
||||
logger.debug(step, "object exists");
|
||||
return true;
|
||||
} catch (error) {
|
||||
const statusCode =
|
||||
typeof error === "object" &&
|
||||
error !== null &&
|
||||
"$metadata" in error &&
|
||||
typeof error.$metadata === "object" &&
|
||||
error.$metadata !== null &&
|
||||
"httpStatusCode" in error.$metadata
|
||||
? error.$metadata.httpStatusCode
|
||||
: undefined;
|
||||
const statusCode = getHttpStatusCode(error);
|
||||
|
||||
if (statusCode === 404) {
|
||||
logger.debug(step, "object not found");
|
||||
@@ -211,11 +266,13 @@ async function copyFileCrossClient(
|
||||
const step = `copyFileCrossClient:${sourceBucket}/${sourceKey}->${destBucket}/${destKey}`;
|
||||
try {
|
||||
logger.debug(step, "fetching source object");
|
||||
const response = await sourceClient.send(
|
||||
const response = await sendS3Command(
|
||||
sourceClient,
|
||||
new GetObjectCommand({
|
||||
Bucket: sourceBucket,
|
||||
Key: sourceKey,
|
||||
}),
|
||||
step,
|
||||
);
|
||||
|
||||
if (!response.Body) {
|
||||
@@ -227,7 +284,8 @@ async function copyFileCrossClient(
|
||||
contentLength: response.ContentLength,
|
||||
});
|
||||
|
||||
await destClient.send(
|
||||
await sendS3Command(
|
||||
destClient,
|
||||
new PutObjectCommand({
|
||||
Bucket: destBucket,
|
||||
Key: destKey,
|
||||
@@ -238,6 +296,7 @@ async function copyFileCrossClient(
|
||||
: {}),
|
||||
...(response.Metadata ? { Metadata: response.Metadata } : {}),
|
||||
}),
|
||||
step,
|
||||
);
|
||||
|
||||
logger.info(step, "copy completed");
|
||||
@@ -299,11 +358,13 @@ async function copyIfNeeded(
|
||||
);
|
||||
} else {
|
||||
logger.debug(step, "creating directory marker");
|
||||
await destClient.send(
|
||||
await sendS3Command(
|
||||
destClient,
|
||||
new PutObjectCommand({
|
||||
Bucket: destBucket,
|
||||
Key: destKey,
|
||||
}),
|
||||
step,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -434,6 +495,10 @@ async function copyBrandAssets(sourceClient, destClient, brands, stats) {
|
||||
stats.errors++;
|
||||
logger.error(assetStep, "asset copy failed", logger.formatError(error));
|
||||
}
|
||||
|
||||
if (S3_REQUEST_DELAY_MS > 0) {
|
||||
await sleep(S3_REQUEST_DELAY_MS);
|
||||
}
|
||||
}
|
||||
|
||||
logger.info(brandStep, "brand assets completed");
|
||||
@@ -490,7 +555,14 @@ async function destroyDataSource() {
|
||||
|
||||
async function main() {
|
||||
const step = "main";
|
||||
logger.info(step, "migration started", { dryRun: DRY_RUN });
|
||||
logger.info(step, "migration started", {
|
||||
dryRun: DRY_RUN,
|
||||
s3MaxAttempts: S3_MAX_ATTEMPTS,
|
||||
s3RetryMode: S3_RETRY_MODE,
|
||||
s3RequestDelayMs: S3_REQUEST_DELAY_MS,
|
||||
s3RateLimitMaxRetries: S3_RATE_LIMIT_MAX_RETRIES,
|
||||
s3RateLimitBaseDelayMs: S3_RATE_LIMIT_BASE_DELAY_MS,
|
||||
});
|
||||
|
||||
const stats = {
|
||||
foldersCreated: 0,
|
||||
|
||||
Reference in New Issue
Block a user