feat: implement generic queue message processing with retry logic

This commit is contained in:
2025-12-17 07:52:48 -05:00
parent 243c279ca9
commit 6f795bdde0

View File

@@ -1,14 +1,13 @@
import { swaggerUI } from "@hono/swagger-ui"; import { swaggerUI } from "@hono/swagger-ui";
import { OpenAPIHono } from "@hono/zod-openapi"; import { OpenAPIHono } from "@hono/zod-openapi";
import { Duration, type DurationLike } from "luxon";
import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt";
import type { QueueName } from "~/libs/tasks/queueName.ts";
import { onNewEpisode } from "~/controllers/internal/new-episode"; import { onNewEpisode } from "~/controllers/internal/new-episode";
import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt";
import { AnilistUpdateType } from "~/libs/anilist/updateType"; import { AnilistUpdateType } from "~/libs/anilist/updateType";
import type { QueueBody } from "~/libs/tasks/queueTask";
import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff"; import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff";
import { Duration, type DurationLike } from "luxon"; import type { QueueName } from "~/libs/tasks/queueName.ts";
import type { QueueBody } from "~/libs/tasks/queueTask";
export const app = new OpenAPIHono<{ Bindings: Env }>(); export const app = new OpenAPIHono<{ Bindings: Env }>();
@@ -75,50 +74,81 @@ app.get("/docs", swaggerUI({ url: "/openapi.json" }));
export default { export default {
fetch: app.fetch, fetch: app.fetch,
async queue(batch) { async queue(batch) {
switch (batch.queue as QueueName) { onMessageQueue(batch, async (message, queueName) => {
case "ANILIST_UPDATES": switch (queueName) {
for (const message of ( case "ANILIST_UPDATES":
batch as MessageBatch<QueueBody["ANILIST_UPDATES"]> const anilistUpdateBody =
).messages) { message.body as QueueBody["ANILIST_UPDATES"];
switch (message.body.updateType) { switch (anilistUpdateBody.updateType) {
case AnilistUpdateType.UpdateWatchStatus: case AnilistUpdateType.UpdateWatchStatus:
if (!message.body[AnilistUpdateType.UpdateWatchStatus]) { if (!anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus]) {
throw new Error( console.error(
`Discarding update, unknown body ${JSON.stringify(message.body)}`, `Discarding update, unknown body ${JSON.stringify(message.body)}`,
); );
return;
} }
const { updateWatchStatusOnAnilist } = const { updateWatchStatusOnAnilist } =
await import("~/controllers/watch-status/anilist"); await import("~/controllers/watch-status/anilist");
const payload = message.body[AnilistUpdateType.UpdateWatchStatus]; const payload =
anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus];
await updateWatchStatusOnAnilist( await updateWatchStatusOnAnilist(
payload.titleId, payload.titleId,
payload.watchStatus, payload.watchStatus,
payload.aniListToken, payload.aniListToken,
); );
break; break;
default:
throw new Error(
`Unhandled update type: ${anilistUpdateBody.updateType}`,
);
} }
break;
message.ack(); case "NEW_EPISODE":
} const newEpisodeBody = message.body as QueueBody["NEW_EPISODE"];
break;
case "NEW_EPISODE":
for (const message of (batch as MessageBatch<QueueBody["NEW_EPISODE"]>)
.messages) {
await onNewEpisode( await onNewEpisode(
message.body.aniListId, newEpisodeBody.aniListId,
message.body.episodeNumber, newEpisodeBody.episodeNumber,
); );
message.ack(); break;
} default:
break; throw new Error(`Unhandled queue name: ${queueName}`);
} }
});
}, },
async scheduled(event, env, ctx) { async scheduled(event, env, ctx) {
const { processDelayedTasks } = const { processDelayedTasks } =
await import("~/libs/tasks/processDelayedTasks"); await import("~/libs/tasks/processDelayedTasks");
await processDelayedTasks(env, ctx); await processDelayedTasks(env);
}, },
} satisfies ExportedHandler<Env>; } satisfies ExportedHandler<Env>;
const retryDelayConfig: Record<QueueName, DurationLike> = {
ANILIST_UPDATES: Duration.fromObject({ minutes: 1 }),
NEW_EPISODE: Duration.fromObject({ hours: 1 }),
};
function onMessageQueue<QN extends QueueName>(
messageBatch: MessageBatch<unknown>,
callback: (message: Message<QueueBody[QN]>, queueName: QN) => void,
) {
for (const message of messageBatch.messages) {
try {
callback(message as Message<QueueBody[QN]>, messageBatch.queue as QN);
message.ack();
} catch (error) {
console.error(
`Failed to process message ${message.id} for queue ${messageBatch.queue} with body ${JSON.stringify(message.body)}`,
);
console.error(error);
message.retry({
delaySeconds: calculateExponentialBackoff({
attempt: message.attempts,
baseMin: retryDelayConfig[messageBatch.queue as QN],
}),
});
}
}
}
export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts"; export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts";