Compare commits

...

6 Commits

5 changed files with 136 additions and 43 deletions

View File

@@ -84,7 +84,7 @@ app.openapi(route, async (c) => {
isComplete, isComplete,
); );
if (isComplete) { if (isComplete) {
await updateWatchStatus(c.req, deviceId, aniListId, "COMPLETED"); await updateWatchStatus(deviceId, aniListId, "COMPLETED");
} }
if (!user) { if (!user) {

View File

@@ -64,7 +64,6 @@ const route = createRoute({
}); });
export async function updateWatchStatus( export async function updateWatchStatus(
req: HonoRequest,
deviceId: string, deviceId: string,
titleId: number, titleId: number,
watchStatus: WatchStatus | null, watchStatus: WatchStatus | null,
@@ -88,8 +87,6 @@ app.openapi(route, async (c) => {
titleId, titleId,
isRetrying = false, isRetrying = false,
} = await c.req.json<typeof UpdateWatchStatusRequest._type>(); } = await c.req.json<typeof UpdateWatchStatusRequest._type>();
const aniListToken = c.req.header("X-AniList-Token");
// Check if we should use mock data // Check if we should use mock data
const { useMockData } = await import("~/libs/useMockData"); const { useMockData } = await import("~/libs/useMockData");
if (useMockData()) { if (useMockData()) {
@@ -99,7 +96,7 @@ app.openapi(route, async (c) => {
if (!isRetrying) { if (!isRetrying) {
try { try {
await updateWatchStatus(c.req, deviceId, titleId, watchStatus); await updateWatchStatus(deviceId, titleId, watchStatus);
} catch (error) { } catch (error) {
console.error("Error setting watch status"); console.error("Error setting watch status");
console.error(error); console.error(error);
@@ -107,16 +104,21 @@ app.openapi(route, async (c) => {
} }
} }
const aniListToken = c.req.header("X-AniList-Token");
if (aniListToken) {
await queueTask( await queueTask(
"ANILIST_UPDATES", "ANILIST_UPDATES",
{ {
deviceId, [AnilistUpdateType.UpdateWatchStatus]: {
watchStatus, aniListToken,
titleId, titleId,
watchStatus,
},
updateType: AnilistUpdateType.UpdateWatchStatus, updateType: AnilistUpdateType.UpdateWatchStatus,
}, },
{ req: c.req, scheduleConfig: { delay: { minute: 1 } } }, { req: c.req, scheduleConfig: { delay: { minute: 1 } } },
); );
}
return c.json(SuccessResponse, { status: 200 }); return c.json(SuccessResponse, { status: 200 });
}); });

View File

@@ -1,12 +1,13 @@
import { swaggerUI } from "@hono/swagger-ui"; import { swaggerUI } from "@hono/swagger-ui";
import { OpenAPIHono } from "@hono/zod-openapi"; import { OpenAPIHono } from "@hono/zod-openapi";
import { Duration, type DurationLike } from "luxon";
import { onNewEpisode } from "~/controllers/internal/new-episode";
import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt"; import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt";
import { AnilistUpdateType } from "~/libs/anilist/updateType";
import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff";
import type { QueueName } from "~/libs/tasks/queueName.ts"; import type { QueueName } from "~/libs/tasks/queueName.ts";
import type { QueueBody } from "~/libs/tasks/queueTask";
import { onNewEpisode } from "./controllers/internal/new-episode";
import { AnilistUpdateType } from "./libs/anilist/updateType";
import type { QueueBody } from "./libs/tasks/queueTask";
export const app = new OpenAPIHono<{ Bindings: Env }>(); export const app = new OpenAPIHono<{ Bindings: Env }>();
@@ -73,50 +74,86 @@ app.get("/docs", swaggerUI({ url: "/openapi.json" }));
export default { export default {
fetch: app.fetch, fetch: app.fetch,
async queue(batch) { async queue(batch) {
switch (batch.queue as QueueName) { onMessageQueue(batch, async (message, queueName) => {
switch (queueName) {
case "ANILIST_UPDATES": case "ANILIST_UPDATES":
for (const message of ( const anilistUpdateBody =
batch as MessageBatch<QueueBody["ANILIST_UPDATES"]> message.body as QueueBody["ANILIST_UPDATES"];
).messages) { switch (anilistUpdateBody.updateType) {
switch (message.body.updateType) {
case AnilistUpdateType.UpdateWatchStatus: case AnilistUpdateType.UpdateWatchStatus:
if (!message.body[AnilistUpdateType.UpdateWatchStatus]) { if (!anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus]) {
throw new Error( console.error(
`Discarding update, unknown body ${JSON.stringify(message.body)}`, `Discarding update, unknown body ${JSON.stringify(message.body)}`,
); );
return;
} }
const { updateWatchStatusOnAnilist } = const { updateWatchStatusOnAnilist } =
await import("~/controllers/watch-status/anilist"); await import("~/controllers/watch-status/anilist");
const payload = message.body[AnilistUpdateType.UpdateWatchStatus]; const payload =
anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus];
await updateWatchStatusOnAnilist( await updateWatchStatusOnAnilist(
payload.titleId, payload.titleId,
payload.watchStatus, payload.watchStatus,
payload.aniListToken, payload.aniListToken,
); );
break; break;
} default:
throw new Error(
message.ack(); `Unhandled update type: ${anilistUpdateBody.updateType}`,
);
} }
break; break;
case "NEW_EPISODE": case "NEW_EPISODE":
for (const message of (batch as MessageBatch<QueueBody["NEW_EPISODE"]>) const newEpisodeBody = message.body as QueueBody["NEW_EPISODE"];
.messages) {
await onNewEpisode( await onNewEpisode(
message.body.aniListId, newEpisodeBody.aniListId,
message.body.episodeNumber, newEpisodeBody.episodeNumber,
); );
message.ack();
}
break; break;
default:
throw new Error(`Unhandled queue name: ${queueName}`);
} }
});
}, },
async scheduled(event, env, ctx) { async scheduled(event, env, ctx) {
const { processDelayedTasks } = const { processDelayedTasks } =
await import("~/libs/tasks/processDelayedTasks"); await import("~/libs/tasks/processDelayedTasks");
await processDelayedTasks(env, ctx); await processDelayedTasks(env);
}, },
} satisfies ExportedHandler<Env>; } satisfies ExportedHandler<Env>;
const retryDelayConfig: Partial<
Record<QueueName, { min: DurationLike; max: DurationLike }>
> = {
NEW_EPISODE: {
min: Duration.fromObject({ hours: 1 }),
max: Duration.fromObject({ hours: 12 }),
},
};
function onMessageQueue<QN extends QueueName>(
messageBatch: MessageBatch<unknown>,
callback: (message: Message<QueueBody[QN]>, queueName: QN) => void,
) {
for (const message of messageBatch.messages) {
try {
callback(message as Message<QueueBody[QN]>, messageBatch.queue as QN);
message.ack();
} catch (error) {
console.error(
`Failed to process message ${message.id} for queue ${messageBatch.queue} with body ${JSON.stringify(message.body)}`,
);
console.error(error);
message.retry({
delaySeconds: calculateExponentialBackoff({
attempt: message.attempts,
baseMin: retryDelayConfig[messageBatch.queue as QN]?.min,
absCap: retryDelayConfig[messageBatch.queue as QN]?.max,
}),
});
}
}
}
export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts"; export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts";

View File

@@ -0,0 +1,53 @@
import { Duration, type DurationLike } from "luxon";
interface CalculateExponentialBackoffOptions {
attempt: number;
baseMin?: DurationLike;
absCap?: DurationLike;
fuzzFactor?: number;
}
/**
* Generates a backoff time where both the Minimum floor and Maximum ceiling
* are "fuzzed" with jitter to prevent clustering at the edges.
*
* @param attempt - The current retry attempt (0-indexed).
* @param baseMin - The nominal minimum wait time (default: 1s).
* @param absCap - The absolute maximum wait time (default: 60s).
* @param fuzzFactor - How much to wobble the edges (0.1 = +/- 10%).
*
* @returns A random duration between the nominal minimum and maximum, in seconds.
*/
export function calculateExponentialBackoff({
attempt,
baseMin: baseMinDuration = Duration.fromObject({ minutes: 1 }),
absCap: absCapDuration = Duration.fromObject({ hours: 1 }),
fuzzFactor = 0.2,
}: CalculateExponentialBackoffOptions): number {
const baseMin = Duration.fromDurationLike(baseMinDuration).as("seconds");
const absCap = Duration.fromDurationLike(absCapDuration).as("seconds");
// 1. Calculate nominal boundaries
// Example: If baseMin is 1s, the nominal boundaries are 1s, 2s, 4s, 8s... (The 'ceiling' grows exponentially)
const nominalMin = baseMin;
const nominalCeiling = Math.min(baseMin * Math.pow(2, attempt), absCap);
// 2. Fuzz the Min (The Floor)
// Example: If min is 1s and fuzz is 0.2, the floor becomes random between 0.8s and 1.2s
const minFuzz = nominalMin * fuzzFactor;
const fuzzedMin = nominalMin + (Math.random() * 2 * minFuzz - minFuzz);
// 3. Fuzz the Max (The Ceiling)
// Example: If ceiling is 4s (and fuzz is 0.2), it becomes random between 3.2s and 4.8s
const maxFuzz = nominalCeiling * fuzzFactor;
const fuzzedCeiling =
nominalCeiling + (Math.random() * 2 * maxFuzz - maxFuzz);
// Safety: Ensure we don't return a negative number or cross boundaries weirdly
// (e.g. if fuzz makes min > max, we swap or clamp)
const safeMin = Math.max(0, fuzzedMin);
const safeMax = Math.max(safeMin, fuzzedCeiling);
// 4. Return random value in the new fuzzy range
return safeMin + Math.random() * (safeMax - safeMin);
}

View File

@@ -132,6 +132,7 @@ function buildTask(
scheduleTime = Duration.fromDurationLike(delay).as("second"); scheduleTime = Duration.fromDurationLike(delay).as("second");
} }
} }
const authorizationHeader = headers?.["X-Anilist-Token"] ? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` } : {};
switch (queueName) { switch (queueName) {
case "ANILIST_UPDATES": case "ANILIST_UPDATES":
@@ -140,8 +141,8 @@ function buildTask(
body, body,
scheduleTime, scheduleTime,
headers: { headers: {
...authorizationHeader,
"Content-Type": "application/json", "Content-Type": "application/json",
"X-Anilist-Token": headers?.["X-Anilist-Token"],
}, },
}; };
default: default: