diff --git a/src/index.ts b/src/index.ts index 7a4082e..60dd6c3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -7,7 +7,10 @@ import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnect import { AnilistUpdateType } from "~/libs/anilist/updateType"; import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff"; import type { QueueName } from "~/libs/tasks/queueName.ts"; -import type { QueueBody } from "~/libs/tasks/queueTask"; +import { + MAX_QUEUE_DELAY_SECONDS, + type QueueBody, +} from "~/libs/tasks/queueTask"; export const app = new OpenAPIHono<{ Bindings: Env }>(); @@ -146,11 +149,14 @@ function onMessageQueue( ); console.error(error); message.retry({ - delaySeconds: calculateExponentialBackoff({ - attempt: message.attempts, - baseMin: retryDelayConfig[messageBatch.queue as QN]?.min, - absCap: retryDelayConfig[messageBatch.queue as QN]?.max, - }), + delaySeconds: Math.min( + calculateExponentialBackoff({ + attempt: message.attempts, + baseMin: retryDelayConfig[messageBatch.queue as QN]?.min, + absCap: retryDelayConfig[messageBatch.queue as QN]?.max, + }), + MAX_QUEUE_DELAY_SECONDS, + ), }); } } diff --git a/src/libs/tasks/processDelayedTasks.ts b/src/libs/tasks/processDelayedTasks.ts index a928928..9a66331 100644 --- a/src/libs/tasks/processDelayedTasks.ts +++ b/src/libs/tasks/processDelayedTasks.ts @@ -2,7 +2,7 @@ import { DateTime } from "luxon"; import type { DelayedTaskMetadata } from "./delayedTask"; import { deserializeDelayedTask } from "./delayedTask"; -import { MAX_DELAY_SECONDS, queueTask } from "./queueTask"; +import { MAX_QUEUE_DELAY_SECONDS, queueTask } from "./queueTask"; const RETRY_ALERT_THRESHOLD = 3; @@ -27,7 +27,7 @@ export async function processDelayedTasks(env: Cloudflare.Env): Promise { console.log(`Found ${keys.length} delayed tasks to check`); const currentTime = Math.floor(Date.now() / 1000); - const maxQueueTime = currentTime + MAX_DELAY_SECONDS; + const maxQueueTime = currentTime + MAX_QUEUE_DELAY_SECONDS; let processedCount = 0; let queuedCount = 0; diff --git a/src/libs/tasks/queueTask.spec.ts b/src/libs/tasks/queueTask.spec.ts index f893af6..da3cc87 100644 --- a/src/libs/tasks/queueTask.spec.ts +++ b/src/libs/tasks/queueTask.spec.ts @@ -81,8 +81,8 @@ describe("queueTask - delayed task handling", () => { }); }); - describe("tasks with delay > 9 hours", () => { - it("stores task in KV when delay exceeds 9 hours", async () => { + describe("tasks with delay > 12 hours", () => { + it("stores task in KV when delay exceeds 12 hours", async () => { await queueTask( "NEW_EPISODE", { aniListId: 111, episodeNumber: 4 }, @@ -98,12 +98,12 @@ describe("queueTask - delayed task handling", () => { expect(queueSendSpy).not.toHaveBeenCalled(); }); - it("stores task in KV when delay is 9 hours + 1 second", async () => { + it("stores task in KV when delay is 12 hours + 1 second", async () => { await queueTask( "NEW_EPISODE", { aniListId: 222, episodeNumber: 5 }, { - scheduleConfig: { delay: { hours: 9, seconds: 1 } }, + scheduleConfig: { delay: { hours: 12, seconds: 1 } }, env: mockEnv, }, ); @@ -176,7 +176,7 @@ describe("queueTask - delayed task handling", () => { }); describe("epoch time scheduling", () => { - it("queues directly when epoch time is within 9 hours", async () => { + it("queues directly when epoch time is within 12 hours", async () => { const futureTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now await queueTask( @@ -192,7 +192,7 @@ describe("queueTask - delayed task handling", () => { expect(kvPutSpy).not.toHaveBeenCalled(); }); - it("stores in KV when epoch time is beyond 9 hours", async () => { + it("stores in KV when epoch time is beyond 12 hours", async () => { const futureTime = Math.floor(Date.now() / 1000) + 24 * 3600; // 24 hours from now await queueTask( diff --git a/src/libs/tasks/queueTask.ts b/src/libs/tasks/queueTask.ts index 1a6baca..cb362b6 100644 --- a/src/libs/tasks/queueTask.ts +++ b/src/libs/tasks/queueTask.ts @@ -30,7 +30,7 @@ interface QueueTaskOptionalArgs { env?: Cloudflare.Env; } -export const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as( +export const MAX_QUEUE_DELAY_SECONDS = Duration.fromObject({ hours: 12 }).as( "seconds", ); @@ -46,8 +46,8 @@ export async function queueTask( req?.header(), ); - // If delay exceeds 9 hours, store in KV for later processing - if (scheduleTime > MAX_DELAY_SECONDS) { + // If delay exceeds 12 hours, store in KV for later processing + if (scheduleTime > MAX_QUEUE_DELAY_SECONDS) { if (!env || !env.DELAYED_TASKS) { throw new Error("DELAYED_TASKS KV namespace not available"); } @@ -132,7 +132,9 @@ function buildTask( scheduleTime = Duration.fromDurationLike(delay).as("second"); } } - const authorizationHeader = headers?.["X-Anilist-Token"] ? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` } : {}; + const authorizationHeader = headers?.["X-Anilist-Token"] + ? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` } + : {}; switch (queueName) { case "ANILIST_UPDATES": diff --git a/wrangler.toml b/wrangler.toml index 36ca112..305999e 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -67,7 +67,7 @@ id = "c8db249d8ee7462b91f9c374321776e4" preview_id = "ff38240eb2aa4b1388c705f4974f5aec" [triggers] -crons = ["0 */9 * * *"] +crons = ["0 */12 * * *"] [[d1_databases]] binding = "DB"