diff --git a/src/index.ts b/src/index.ts index 9fe99a4..7f6bd47 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,14 +1,13 @@ import { swaggerUI } from "@hono/swagger-ui"; import { OpenAPIHono } from "@hono/zod-openapi"; - -import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt"; -import type { QueueName } from "~/libs/tasks/queueName.ts"; +import { Duration, type DurationLike } from "luxon"; import { onNewEpisode } from "~/controllers/internal/new-episode"; +import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt"; import { AnilistUpdateType } from "~/libs/anilist/updateType"; -import type { QueueBody } from "~/libs/tasks/queueTask"; import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff"; -import { Duration, type DurationLike } from "luxon"; +import type { QueueName } from "~/libs/tasks/queueName.ts"; +import type { QueueBody } from "~/libs/tasks/queueTask"; export const app = new OpenAPIHono<{ Bindings: Env }>(); @@ -75,50 +74,81 @@ app.get("/docs", swaggerUI({ url: "/openapi.json" })); export default { fetch: app.fetch, async queue(batch) { - switch (batch.queue as QueueName) { - case "ANILIST_UPDATES": - for (const message of ( - batch as MessageBatch - ).messages) { - switch (message.body.updateType) { + onMessageQueue(batch, async (message, queueName) => { + switch (queueName) { + case "ANILIST_UPDATES": + const anilistUpdateBody = + message.body as QueueBody["ANILIST_UPDATES"]; + switch (anilistUpdateBody.updateType) { case AnilistUpdateType.UpdateWatchStatus: - if (!message.body[AnilistUpdateType.UpdateWatchStatus]) { - throw new Error( + if (!anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus]) { + console.error( `Discarding update, unknown body ${JSON.stringify(message.body)}`, ); + return; } const { updateWatchStatusOnAnilist } = await import("~/controllers/watch-status/anilist"); - const payload = message.body[AnilistUpdateType.UpdateWatchStatus]; + const payload = + anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus]; await updateWatchStatusOnAnilist( payload.titleId, payload.watchStatus, payload.aniListToken, ); break; + default: + throw new Error( + `Unhandled update type: ${anilistUpdateBody.updateType}`, + ); } - - message.ack(); - } - break; - case "NEW_EPISODE": - for (const message of (batch as MessageBatch) - .messages) { + break; + case "NEW_EPISODE": + const newEpisodeBody = message.body as QueueBody["NEW_EPISODE"]; await onNewEpisode( - message.body.aniListId, - message.body.episodeNumber, + newEpisodeBody.aniListId, + newEpisodeBody.episodeNumber, ); - message.ack(); - } - break; - } + break; + default: + throw new Error(`Unhandled queue name: ${queueName}`); + } + }); }, async scheduled(event, env, ctx) { const { processDelayedTasks } = await import("~/libs/tasks/processDelayedTasks"); - await processDelayedTasks(env, ctx); + await processDelayedTasks(env); }, } satisfies ExportedHandler; +const retryDelayConfig: Record = { + ANILIST_UPDATES: Duration.fromObject({ minutes: 1 }), + NEW_EPISODE: Duration.fromObject({ hours: 1 }), +}; + +function onMessageQueue( + messageBatch: MessageBatch, + callback: (message: Message, queueName: QN) => void, +) { + for (const message of messageBatch.messages) { + try { + callback(message as Message, messageBatch.queue as QN); + message.ack(); + } catch (error) { + console.error( + `Failed to process message ${message.id} for queue ${messageBatch.queue} with body ${JSON.stringify(message.body)}`, + ); + console.error(error); + message.retry({ + delaySeconds: calculateExponentialBackoff({ + attempt: message.attempts, + baseMin: retryDelayConfig[messageBatch.queue as QN], + }), + }); + } + } +} + export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts";