Compare commits
6 Commits
c527a6eac5
...
6570c25617
| Author | SHA1 | Date | |
|---|---|---|---|
| 6570c25617 | |||
| 6f795bdde0 | |||
| 243c279ca9 | |||
| 286824e3a1 | |||
| b26d22ad91 | |||
| 3c5685dbdb |
@@ -84,7 +84,7 @@ app.openapi(route, async (c) => {
|
||||
isComplete,
|
||||
);
|
||||
if (isComplete) {
|
||||
await updateWatchStatus(c.req, deviceId, aniListId, "COMPLETED");
|
||||
await updateWatchStatus(deviceId, aniListId, "COMPLETED");
|
||||
}
|
||||
|
||||
if (!user) {
|
||||
|
||||
@@ -64,7 +64,6 @@ const route = createRoute({
|
||||
});
|
||||
|
||||
export async function updateWatchStatus(
|
||||
req: HonoRequest,
|
||||
deviceId: string,
|
||||
titleId: number,
|
||||
watchStatus: WatchStatus | null,
|
||||
@@ -88,8 +87,6 @@ app.openapi(route, async (c) => {
|
||||
titleId,
|
||||
isRetrying = false,
|
||||
} = await c.req.json<typeof UpdateWatchStatusRequest._type>();
|
||||
const aniListToken = c.req.header("X-AniList-Token");
|
||||
|
||||
// Check if we should use mock data
|
||||
const { useMockData } = await import("~/libs/useMockData");
|
||||
if (useMockData()) {
|
||||
@@ -99,7 +96,7 @@ app.openapi(route, async (c) => {
|
||||
|
||||
if (!isRetrying) {
|
||||
try {
|
||||
await updateWatchStatus(c.req, deviceId, titleId, watchStatus);
|
||||
await updateWatchStatus(deviceId, titleId, watchStatus);
|
||||
} catch (error) {
|
||||
console.error("Error setting watch status");
|
||||
console.error(error);
|
||||
@@ -107,16 +104,21 @@ app.openapi(route, async (c) => {
|
||||
}
|
||||
}
|
||||
|
||||
await queueTask(
|
||||
"ANILIST_UPDATES",
|
||||
{
|
||||
deviceId,
|
||||
watchStatus,
|
||||
titleId,
|
||||
updateType: AnilistUpdateType.UpdateWatchStatus,
|
||||
},
|
||||
{ req: c.req, scheduleConfig: { delay: { minute: 1 } } },
|
||||
);
|
||||
const aniListToken = c.req.header("X-AniList-Token");
|
||||
if (aniListToken) {
|
||||
await queueTask(
|
||||
"ANILIST_UPDATES",
|
||||
{
|
||||
[AnilistUpdateType.UpdateWatchStatus]: {
|
||||
aniListToken,
|
||||
titleId,
|
||||
watchStatus,
|
||||
},
|
||||
updateType: AnilistUpdateType.UpdateWatchStatus,
|
||||
},
|
||||
{ req: c.req, scheduleConfig: { delay: { minute: 1 } } },
|
||||
);
|
||||
}
|
||||
|
||||
return c.json(SuccessResponse, { status: 200 });
|
||||
});
|
||||
|
||||
91
src/index.ts
91
src/index.ts
@@ -1,12 +1,13 @@
|
||||
import { swaggerUI } from "@hono/swagger-ui";
|
||||
import { OpenAPIHono } from "@hono/zod-openapi";
|
||||
import { Duration, type DurationLike } from "luxon";
|
||||
|
||||
import { onNewEpisode } from "~/controllers/internal/new-episode";
|
||||
import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt";
|
||||
import { AnilistUpdateType } from "~/libs/anilist/updateType";
|
||||
import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff";
|
||||
import type { QueueName } from "~/libs/tasks/queueName.ts";
|
||||
|
||||
import { onNewEpisode } from "./controllers/internal/new-episode";
|
||||
import { AnilistUpdateType } from "./libs/anilist/updateType";
|
||||
import type { QueueBody } from "./libs/tasks/queueTask";
|
||||
import type { QueueBody } from "~/libs/tasks/queueTask";
|
||||
|
||||
export const app = new OpenAPIHono<{ Bindings: Env }>();
|
||||
|
||||
@@ -73,50 +74,86 @@ app.get("/docs", swaggerUI({ url: "/openapi.json" }));
|
||||
export default {
|
||||
fetch: app.fetch,
|
||||
async queue(batch) {
|
||||
switch (batch.queue as QueueName) {
|
||||
case "ANILIST_UPDATES":
|
||||
for (const message of (
|
||||
batch as MessageBatch<QueueBody["ANILIST_UPDATES"]>
|
||||
).messages) {
|
||||
switch (message.body.updateType) {
|
||||
onMessageQueue(batch, async (message, queueName) => {
|
||||
switch (queueName) {
|
||||
case "ANILIST_UPDATES":
|
||||
const anilistUpdateBody =
|
||||
message.body as QueueBody["ANILIST_UPDATES"];
|
||||
switch (anilistUpdateBody.updateType) {
|
||||
case AnilistUpdateType.UpdateWatchStatus:
|
||||
if (!message.body[AnilistUpdateType.UpdateWatchStatus]) {
|
||||
throw new Error(
|
||||
if (!anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus]) {
|
||||
console.error(
|
||||
`Discarding update, unknown body ${JSON.stringify(message.body)}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const { updateWatchStatusOnAnilist } =
|
||||
await import("~/controllers/watch-status/anilist");
|
||||
const payload = message.body[AnilistUpdateType.UpdateWatchStatus];
|
||||
const payload =
|
||||
anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus];
|
||||
await updateWatchStatusOnAnilist(
|
||||
payload.titleId,
|
||||
payload.watchStatus,
|
||||
payload.aniListToken,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
throw new Error(
|
||||
`Unhandled update type: ${anilistUpdateBody.updateType}`,
|
||||
);
|
||||
}
|
||||
|
||||
message.ack();
|
||||
}
|
||||
break;
|
||||
case "NEW_EPISODE":
|
||||
for (const message of (batch as MessageBatch<QueueBody["NEW_EPISODE"]>)
|
||||
.messages) {
|
||||
break;
|
||||
case "NEW_EPISODE":
|
||||
const newEpisodeBody = message.body as QueueBody["NEW_EPISODE"];
|
||||
await onNewEpisode(
|
||||
message.body.aniListId,
|
||||
message.body.episodeNumber,
|
||||
newEpisodeBody.aniListId,
|
||||
newEpisodeBody.episodeNumber,
|
||||
);
|
||||
message.ack();
|
||||
}
|
||||
break;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new Error(`Unhandled queue name: ${queueName}`);
|
||||
}
|
||||
});
|
||||
},
|
||||
async scheduled(event, env, ctx) {
|
||||
const { processDelayedTasks } =
|
||||
await import("~/libs/tasks/processDelayedTasks");
|
||||
await processDelayedTasks(env, ctx);
|
||||
await processDelayedTasks(env);
|
||||
},
|
||||
} satisfies ExportedHandler<Env>;
|
||||
|
||||
const retryDelayConfig: Partial<
|
||||
Record<QueueName, { min: DurationLike; max: DurationLike }>
|
||||
> = {
|
||||
NEW_EPISODE: {
|
||||
min: Duration.fromObject({ hours: 1 }),
|
||||
max: Duration.fromObject({ hours: 12 }),
|
||||
},
|
||||
};
|
||||
|
||||
function onMessageQueue<QN extends QueueName>(
|
||||
messageBatch: MessageBatch<unknown>,
|
||||
callback: (message: Message<QueueBody[QN]>, queueName: QN) => void,
|
||||
) {
|
||||
for (const message of messageBatch.messages) {
|
||||
try {
|
||||
callback(message as Message<QueueBody[QN]>, messageBatch.queue as QN);
|
||||
message.ack();
|
||||
} catch (error) {
|
||||
console.error(
|
||||
`Failed to process message ${message.id} for queue ${messageBatch.queue} with body ${JSON.stringify(message.body)}`,
|
||||
);
|
||||
console.error(error);
|
||||
message.retry({
|
||||
delaySeconds: calculateExponentialBackoff({
|
||||
attempt: message.attempts,
|
||||
baseMin: retryDelayConfig[messageBatch.queue as QN]?.min,
|
||||
absCap: retryDelayConfig[messageBatch.queue as QN]?.max,
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts";
|
||||
|
||||
53
src/libs/calculateExponentialBackoff.ts
Normal file
53
src/libs/calculateExponentialBackoff.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
import { Duration, type DurationLike } from "luxon";
|
||||
|
||||
interface CalculateExponentialBackoffOptions {
|
||||
attempt: number;
|
||||
baseMin?: DurationLike;
|
||||
absCap?: DurationLike;
|
||||
fuzzFactor?: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates a backoff time where both the Minimum floor and Maximum ceiling
|
||||
* are "fuzzed" with jitter to prevent clustering at the edges.
|
||||
*
|
||||
* @param attempt - The current retry attempt (0-indexed).
|
||||
* @param baseMin - The nominal minimum wait time (default: 1s).
|
||||
* @param absCap - The absolute maximum wait time (default: 60s).
|
||||
* @param fuzzFactor - How much to wobble the edges (0.1 = +/- 10%).
|
||||
*
|
||||
* @returns A random duration between the nominal minimum and maximum, in seconds.
|
||||
*/
|
||||
export function calculateExponentialBackoff({
|
||||
attempt,
|
||||
baseMin: baseMinDuration = Duration.fromObject({ minutes: 1 }),
|
||||
absCap: absCapDuration = Duration.fromObject({ hours: 1 }),
|
||||
fuzzFactor = 0.2,
|
||||
}: CalculateExponentialBackoffOptions): number {
|
||||
const baseMin = Duration.fromDurationLike(baseMinDuration).as("seconds");
|
||||
const absCap = Duration.fromDurationLike(absCapDuration).as("seconds");
|
||||
|
||||
// 1. Calculate nominal boundaries
|
||||
// Example: If baseMin is 1s, the nominal boundaries are 1s, 2s, 4s, 8s... (The 'ceiling' grows exponentially)
|
||||
const nominalMin = baseMin;
|
||||
const nominalCeiling = Math.min(baseMin * Math.pow(2, attempt), absCap);
|
||||
|
||||
// 2. Fuzz the Min (The Floor)
|
||||
// Example: If min is 1s and fuzz is 0.2, the floor becomes random between 0.8s and 1.2s
|
||||
const minFuzz = nominalMin * fuzzFactor;
|
||||
const fuzzedMin = nominalMin + (Math.random() * 2 * minFuzz - minFuzz);
|
||||
|
||||
// 3. Fuzz the Max (The Ceiling)
|
||||
// Example: If ceiling is 4s (and fuzz is 0.2), it becomes random between 3.2s and 4.8s
|
||||
const maxFuzz = nominalCeiling * fuzzFactor;
|
||||
const fuzzedCeiling =
|
||||
nominalCeiling + (Math.random() * 2 * maxFuzz - maxFuzz);
|
||||
|
||||
// Safety: Ensure we don't return a negative number or cross boundaries weirdly
|
||||
// (e.g. if fuzz makes min > max, we swap or clamp)
|
||||
const safeMin = Math.max(0, fuzzedMin);
|
||||
const safeMax = Math.max(safeMin, fuzzedCeiling);
|
||||
|
||||
// 4. Return random value in the new fuzzy range
|
||||
return safeMin + Math.random() * (safeMax - safeMin);
|
||||
}
|
||||
@@ -132,6 +132,7 @@ function buildTask(
|
||||
scheduleTime = Duration.fromDurationLike(delay).as("second");
|
||||
}
|
||||
}
|
||||
const authorizationHeader = headers?.["X-Anilist-Token"] ? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` } : {};
|
||||
|
||||
switch (queueName) {
|
||||
case "ANILIST_UPDATES":
|
||||
@@ -140,8 +141,8 @@ function buildTask(
|
||||
body,
|
||||
scheduleTime,
|
||||
headers: {
|
||||
...authorizationHeader,
|
||||
"Content-Type": "application/json",
|
||||
"X-Anilist-Token": headers?.["X-Anilist-Token"],
|
||||
},
|
||||
};
|
||||
default:
|
||||
|
||||
Reference in New Issue
Block a user