diff --git a/src/index.js b/src/index.js index dcd77cf..27e164b 100644 --- a/src/index.js +++ b/src/index.js @@ -14,6 +14,61 @@ import { logger, runStep } from "./logger.js"; const AVATAR_PREFIX = "avatar"; const THUMBNAIL_PREFIX = "lg"; const DRY_RUN = process.env.DRY_RUN === "true"; +const S3_MAX_ATTEMPTS = Number(process.env.S3_MAX_ATTEMPTS ?? 10); +const S3_RETRY_MODE = process.env.S3_RETRY_MODE ?? "adaptive"; +const S3_REQUEST_DELAY_MS = Number(process.env.S3_REQUEST_DELAY_MS ?? 200); +const S3_RATE_LIMIT_MAX_RETRIES = Number( + process.env.S3_RATE_LIMIT_MAX_RETRIES ?? 5, +); +const S3_RATE_LIMIT_BASE_DELAY_MS = Number( + process.env.S3_RATE_LIMIT_BASE_DELAY_MS ?? 1000, +); + +function sleep(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +function getHttpStatusCode(error) { + if ( + typeof error === "object" && + error !== null && + "$metadata" in error && + typeof error.$metadata === "object" && + error.$metadata !== null && + "httpStatusCode" in error.$metadata + ) { + return error.$metadata.httpStatusCode; + } + + return undefined; +} + +async function withRateLimitRetry(fn, step) { + for (let attempt = 0; ; attempt++) { + try { + return await fn(); + } catch (error) { + const statusCode = getHttpStatusCode(error); + if (statusCode !== 429 || attempt >= S3_RATE_LIMIT_MAX_RETRIES) { + throw error; + } + + const delayMs = + S3_RATE_LIMIT_BASE_DELAY_MS * 2 ** attempt + + Math.floor(Math.random() * 500); + logger.warn(step, "rate limited (429), retrying", { + attempt: attempt + 1, + maxRetries: S3_RATE_LIMIT_MAX_RETRIES, + delayMs, + }); + await sleep(delayMs); + } + } +} + +async function sendS3Command(client, command, step) { + return withRateLimitRetry(() => client.send(command), step); +} function resolveParsPackBucket(endpoint, bucket) { const match = endpoint?.match(/https?:\/\/(c\d+)\.parspack\.net/i); @@ -77,6 +132,8 @@ function createS3Client(config, label) { endpoint: config.endpoint, region: config.region, bucket: config.bucket, + maxAttempts: S3_MAX_ATTEMPTS, + retryMode: S3_RETRY_MODE, }); try { @@ -84,6 +141,8 @@ function createS3Client(config, label) { region: config.region, endpoint: config.endpoint, forcePathStyle: true, + maxAttempts: S3_MAX_ATTEMPTS, + retryMode: S3_RETRY_MODE, credentials: { accessKeyId: config.accessKeyId, secretAccessKey: config.secretAccessKey, @@ -176,19 +235,15 @@ function resolveAssetSourceKey(asset) { async function objectExists(client, bucket, key) { const step = `objectExists:${bucket}/${key}`; try { - await client.send(new HeadObjectCommand({ Bucket: bucket, Key: key })); + await sendS3Command( + client, + new HeadObjectCommand({ Bucket: bucket, Key: key }), + step, + ); logger.debug(step, "object exists"); return true; } catch (error) { - const statusCode = - typeof error === "object" && - error !== null && - "$metadata" in error && - typeof error.$metadata === "object" && - error.$metadata !== null && - "httpStatusCode" in error.$metadata - ? error.$metadata.httpStatusCode - : undefined; + const statusCode = getHttpStatusCode(error); if (statusCode === 404) { logger.debug(step, "object not found"); @@ -211,11 +266,13 @@ async function copyFileCrossClient( const step = `copyFileCrossClient:${sourceBucket}/${sourceKey}->${destBucket}/${destKey}`; try { logger.debug(step, "fetching source object"); - const response = await sourceClient.send( + const response = await sendS3Command( + sourceClient, new GetObjectCommand({ Bucket: sourceBucket, Key: sourceKey, }), + step, ); if (!response.Body) { @@ -227,7 +284,8 @@ async function copyFileCrossClient( contentLength: response.ContentLength, }); - await destClient.send( + await sendS3Command( + destClient, new PutObjectCommand({ Bucket: destBucket, Key: destKey, @@ -238,6 +296,7 @@ async function copyFileCrossClient( : {}), ...(response.Metadata ? { Metadata: response.Metadata } : {}), }), + step, ); logger.info(step, "copy completed"); @@ -299,11 +358,13 @@ async function copyIfNeeded( ); } else { logger.debug(step, "creating directory marker"); - await destClient.send( + await sendS3Command( + destClient, new PutObjectCommand({ Bucket: destBucket, Key: destKey, }), + step, ); } @@ -434,6 +495,10 @@ async function copyBrandAssets(sourceClient, destClient, brands, stats) { stats.errors++; logger.error(assetStep, "asset copy failed", logger.formatError(error)); } + + if (S3_REQUEST_DELAY_MS > 0) { + await sleep(S3_REQUEST_DELAY_MS); + } } logger.info(brandStep, "brand assets completed"); @@ -490,7 +555,14 @@ async function destroyDataSource() { async function main() { const step = "main"; - logger.info(step, "migration started", { dryRun: DRY_RUN }); + logger.info(step, "migration started", { + dryRun: DRY_RUN, + s3MaxAttempts: S3_MAX_ATTEMPTS, + s3RetryMode: S3_RETRY_MODE, + s3RequestDelayMs: S3_REQUEST_DELAY_MS, + s3RateLimitMaxRetries: S3_RATE_LIMIT_MAX_RETRIES, + s3RateLimitBaseDelayMs: S3_RATE_LIMIT_BASE_DELAY_MS, + }); const stats = { foldersCreated: 0,