Compare commits
6 Commits
c527a6eac5
...
6570c25617
| Author | SHA1 | Date | |
|---|---|---|---|
| 6570c25617 | |||
| 6f795bdde0 | |||
| 243c279ca9 | |||
| 286824e3a1 | |||
| b26d22ad91 | |||
| 3c5685dbdb |
@@ -84,7 +84,7 @@ app.openapi(route, async (c) => {
|
|||||||
isComplete,
|
isComplete,
|
||||||
);
|
);
|
||||||
if (isComplete) {
|
if (isComplete) {
|
||||||
await updateWatchStatus(c.req, deviceId, aniListId, "COMPLETED");
|
await updateWatchStatus(deviceId, aniListId, "COMPLETED");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!user) {
|
if (!user) {
|
||||||
|
|||||||
@@ -64,7 +64,6 @@ const route = createRoute({
|
|||||||
});
|
});
|
||||||
|
|
||||||
export async function updateWatchStatus(
|
export async function updateWatchStatus(
|
||||||
req: HonoRequest,
|
|
||||||
deviceId: string,
|
deviceId: string,
|
||||||
titleId: number,
|
titleId: number,
|
||||||
watchStatus: WatchStatus | null,
|
watchStatus: WatchStatus | null,
|
||||||
@@ -88,8 +87,6 @@ app.openapi(route, async (c) => {
|
|||||||
titleId,
|
titleId,
|
||||||
isRetrying = false,
|
isRetrying = false,
|
||||||
} = await c.req.json<typeof UpdateWatchStatusRequest._type>();
|
} = await c.req.json<typeof UpdateWatchStatusRequest._type>();
|
||||||
const aniListToken = c.req.header("X-AniList-Token");
|
|
||||||
|
|
||||||
// Check if we should use mock data
|
// Check if we should use mock data
|
||||||
const { useMockData } = await import("~/libs/useMockData");
|
const { useMockData } = await import("~/libs/useMockData");
|
||||||
if (useMockData()) {
|
if (useMockData()) {
|
||||||
@@ -99,7 +96,7 @@ app.openapi(route, async (c) => {
|
|||||||
|
|
||||||
if (!isRetrying) {
|
if (!isRetrying) {
|
||||||
try {
|
try {
|
||||||
await updateWatchStatus(c.req, deviceId, titleId, watchStatus);
|
await updateWatchStatus(deviceId, titleId, watchStatus);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Error setting watch status");
|
console.error("Error setting watch status");
|
||||||
console.error(error);
|
console.error(error);
|
||||||
@@ -107,16 +104,21 @@ app.openapi(route, async (c) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const aniListToken = c.req.header("X-AniList-Token");
|
||||||
|
if (aniListToken) {
|
||||||
await queueTask(
|
await queueTask(
|
||||||
"ANILIST_UPDATES",
|
"ANILIST_UPDATES",
|
||||||
{
|
{
|
||||||
deviceId,
|
[AnilistUpdateType.UpdateWatchStatus]: {
|
||||||
watchStatus,
|
aniListToken,
|
||||||
titleId,
|
titleId,
|
||||||
|
watchStatus,
|
||||||
|
},
|
||||||
updateType: AnilistUpdateType.UpdateWatchStatus,
|
updateType: AnilistUpdateType.UpdateWatchStatus,
|
||||||
},
|
},
|
||||||
{ req: c.req, scheduleConfig: { delay: { minute: 1 } } },
|
{ req: c.req, scheduleConfig: { delay: { minute: 1 } } },
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
return c.json(SuccessResponse, { status: 200 });
|
return c.json(SuccessResponse, { status: 200 });
|
||||||
});
|
});
|
||||||
|
|||||||
81
src/index.ts
81
src/index.ts
@@ -1,12 +1,13 @@
|
|||||||
import { swaggerUI } from "@hono/swagger-ui";
|
import { swaggerUI } from "@hono/swagger-ui";
|
||||||
import { OpenAPIHono } from "@hono/zod-openapi";
|
import { OpenAPIHono } from "@hono/zod-openapi";
|
||||||
|
import { Duration, type DurationLike } from "luxon";
|
||||||
|
|
||||||
|
import { onNewEpisode } from "~/controllers/internal/new-episode";
|
||||||
import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt";
|
import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt";
|
||||||
|
import { AnilistUpdateType } from "~/libs/anilist/updateType";
|
||||||
|
import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff";
|
||||||
import type { QueueName } from "~/libs/tasks/queueName.ts";
|
import type { QueueName } from "~/libs/tasks/queueName.ts";
|
||||||
|
import type { QueueBody } from "~/libs/tasks/queueTask";
|
||||||
import { onNewEpisode } from "./controllers/internal/new-episode";
|
|
||||||
import { AnilistUpdateType } from "./libs/anilist/updateType";
|
|
||||||
import type { QueueBody } from "./libs/tasks/queueTask";
|
|
||||||
|
|
||||||
export const app = new OpenAPIHono<{ Bindings: Env }>();
|
export const app = new OpenAPIHono<{ Bindings: Env }>();
|
||||||
|
|
||||||
@@ -73,50 +74,86 @@ app.get("/docs", swaggerUI({ url: "/openapi.json" }));
|
|||||||
export default {
|
export default {
|
||||||
fetch: app.fetch,
|
fetch: app.fetch,
|
||||||
async queue(batch) {
|
async queue(batch) {
|
||||||
switch (batch.queue as QueueName) {
|
onMessageQueue(batch, async (message, queueName) => {
|
||||||
|
switch (queueName) {
|
||||||
case "ANILIST_UPDATES":
|
case "ANILIST_UPDATES":
|
||||||
for (const message of (
|
const anilistUpdateBody =
|
||||||
batch as MessageBatch<QueueBody["ANILIST_UPDATES"]>
|
message.body as QueueBody["ANILIST_UPDATES"];
|
||||||
).messages) {
|
switch (anilistUpdateBody.updateType) {
|
||||||
switch (message.body.updateType) {
|
|
||||||
case AnilistUpdateType.UpdateWatchStatus:
|
case AnilistUpdateType.UpdateWatchStatus:
|
||||||
if (!message.body[AnilistUpdateType.UpdateWatchStatus]) {
|
if (!anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus]) {
|
||||||
throw new Error(
|
console.error(
|
||||||
`Discarding update, unknown body ${JSON.stringify(message.body)}`,
|
`Discarding update, unknown body ${JSON.stringify(message.body)}`,
|
||||||
);
|
);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const { updateWatchStatusOnAnilist } =
|
const { updateWatchStatusOnAnilist } =
|
||||||
await import("~/controllers/watch-status/anilist");
|
await import("~/controllers/watch-status/anilist");
|
||||||
const payload = message.body[AnilistUpdateType.UpdateWatchStatus];
|
const payload =
|
||||||
|
anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus];
|
||||||
await updateWatchStatusOnAnilist(
|
await updateWatchStatusOnAnilist(
|
||||||
payload.titleId,
|
payload.titleId,
|
||||||
payload.watchStatus,
|
payload.watchStatus,
|
||||||
payload.aniListToken,
|
payload.aniListToken,
|
||||||
);
|
);
|
||||||
break;
|
break;
|
||||||
}
|
default:
|
||||||
|
throw new Error(
|
||||||
message.ack();
|
`Unhandled update type: ${anilistUpdateBody.updateType}`,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case "NEW_EPISODE":
|
case "NEW_EPISODE":
|
||||||
for (const message of (batch as MessageBatch<QueueBody["NEW_EPISODE"]>)
|
const newEpisodeBody = message.body as QueueBody["NEW_EPISODE"];
|
||||||
.messages) {
|
|
||||||
await onNewEpisode(
|
await onNewEpisode(
|
||||||
message.body.aniListId,
|
newEpisodeBody.aniListId,
|
||||||
message.body.episodeNumber,
|
newEpisodeBody.episodeNumber,
|
||||||
);
|
);
|
||||||
message.ack();
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
|
default:
|
||||||
|
throw new Error(`Unhandled queue name: ${queueName}`);
|
||||||
}
|
}
|
||||||
|
});
|
||||||
},
|
},
|
||||||
async scheduled(event, env, ctx) {
|
async scheduled(event, env, ctx) {
|
||||||
const { processDelayedTasks } =
|
const { processDelayedTasks } =
|
||||||
await import("~/libs/tasks/processDelayedTasks");
|
await import("~/libs/tasks/processDelayedTasks");
|
||||||
await processDelayedTasks(env, ctx);
|
await processDelayedTasks(env);
|
||||||
},
|
},
|
||||||
} satisfies ExportedHandler<Env>;
|
} satisfies ExportedHandler<Env>;
|
||||||
|
|
||||||
|
const retryDelayConfig: Partial<
|
||||||
|
Record<QueueName, { min: DurationLike; max: DurationLike }>
|
||||||
|
> = {
|
||||||
|
NEW_EPISODE: {
|
||||||
|
min: Duration.fromObject({ hours: 1 }),
|
||||||
|
max: Duration.fromObject({ hours: 12 }),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
function onMessageQueue<QN extends QueueName>(
|
||||||
|
messageBatch: MessageBatch<unknown>,
|
||||||
|
callback: (message: Message<QueueBody[QN]>, queueName: QN) => void,
|
||||||
|
) {
|
||||||
|
for (const message of messageBatch.messages) {
|
||||||
|
try {
|
||||||
|
callback(message as Message<QueueBody[QN]>, messageBatch.queue as QN);
|
||||||
|
message.ack();
|
||||||
|
} catch (error) {
|
||||||
|
console.error(
|
||||||
|
`Failed to process message ${message.id} for queue ${messageBatch.queue} with body ${JSON.stringify(message.body)}`,
|
||||||
|
);
|
||||||
|
console.error(error);
|
||||||
|
message.retry({
|
||||||
|
delaySeconds: calculateExponentialBackoff({
|
||||||
|
attempt: message.attempts,
|
||||||
|
baseMin: retryDelayConfig[messageBatch.queue as QN]?.min,
|
||||||
|
absCap: retryDelayConfig[messageBatch.queue as QN]?.max,
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts";
|
export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts";
|
||||||
|
|||||||
53
src/libs/calculateExponentialBackoff.ts
Normal file
53
src/libs/calculateExponentialBackoff.ts
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
import { Duration, type DurationLike } from "luxon";
|
||||||
|
|
||||||
|
interface CalculateExponentialBackoffOptions {
|
||||||
|
attempt: number;
|
||||||
|
baseMin?: DurationLike;
|
||||||
|
absCap?: DurationLike;
|
||||||
|
fuzzFactor?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generates a backoff time where both the Minimum floor and Maximum ceiling
|
||||||
|
* are "fuzzed" with jitter to prevent clustering at the edges.
|
||||||
|
*
|
||||||
|
* @param attempt - The current retry attempt (0-indexed).
|
||||||
|
* @param baseMin - The nominal minimum wait time (default: 1s).
|
||||||
|
* @param absCap - The absolute maximum wait time (default: 60s).
|
||||||
|
* @param fuzzFactor - How much to wobble the edges (0.1 = +/- 10%).
|
||||||
|
*
|
||||||
|
* @returns A random duration between the nominal minimum and maximum, in seconds.
|
||||||
|
*/
|
||||||
|
export function calculateExponentialBackoff({
|
||||||
|
attempt,
|
||||||
|
baseMin: baseMinDuration = Duration.fromObject({ minutes: 1 }),
|
||||||
|
absCap: absCapDuration = Duration.fromObject({ hours: 1 }),
|
||||||
|
fuzzFactor = 0.2,
|
||||||
|
}: CalculateExponentialBackoffOptions): number {
|
||||||
|
const baseMin = Duration.fromDurationLike(baseMinDuration).as("seconds");
|
||||||
|
const absCap = Duration.fromDurationLike(absCapDuration).as("seconds");
|
||||||
|
|
||||||
|
// 1. Calculate nominal boundaries
|
||||||
|
// Example: If baseMin is 1s, the nominal boundaries are 1s, 2s, 4s, 8s... (The 'ceiling' grows exponentially)
|
||||||
|
const nominalMin = baseMin;
|
||||||
|
const nominalCeiling = Math.min(baseMin * Math.pow(2, attempt), absCap);
|
||||||
|
|
||||||
|
// 2. Fuzz the Min (The Floor)
|
||||||
|
// Example: If min is 1s and fuzz is 0.2, the floor becomes random between 0.8s and 1.2s
|
||||||
|
const minFuzz = nominalMin * fuzzFactor;
|
||||||
|
const fuzzedMin = nominalMin + (Math.random() * 2 * minFuzz - minFuzz);
|
||||||
|
|
||||||
|
// 3. Fuzz the Max (The Ceiling)
|
||||||
|
// Example: If ceiling is 4s (and fuzz is 0.2), it becomes random between 3.2s and 4.8s
|
||||||
|
const maxFuzz = nominalCeiling * fuzzFactor;
|
||||||
|
const fuzzedCeiling =
|
||||||
|
nominalCeiling + (Math.random() * 2 * maxFuzz - maxFuzz);
|
||||||
|
|
||||||
|
// Safety: Ensure we don't return a negative number or cross boundaries weirdly
|
||||||
|
// (e.g. if fuzz makes min > max, we swap or clamp)
|
||||||
|
const safeMin = Math.max(0, fuzzedMin);
|
||||||
|
const safeMax = Math.max(safeMin, fuzzedCeiling);
|
||||||
|
|
||||||
|
// 4. Return random value in the new fuzzy range
|
||||||
|
return safeMin + Math.random() * (safeMax - safeMin);
|
||||||
|
}
|
||||||
@@ -132,6 +132,7 @@ function buildTask(
|
|||||||
scheduleTime = Duration.fromDurationLike(delay).as("second");
|
scheduleTime = Duration.fromDurationLike(delay).as("second");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
const authorizationHeader = headers?.["X-Anilist-Token"] ? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` } : {};
|
||||||
|
|
||||||
switch (queueName) {
|
switch (queueName) {
|
||||||
case "ANILIST_UPDATES":
|
case "ANILIST_UPDATES":
|
||||||
@@ -140,8 +141,8 @@ function buildTask(
|
|||||||
body,
|
body,
|
||||||
scheduleTime,
|
scheduleTime,
|
||||||
headers: {
|
headers: {
|
||||||
|
...authorizationHeader,
|
||||||
"Content-Type": "application/json",
|
"Content-Type": "application/json",
|
||||||
"X-Anilist-Token": headers?.["X-Anilist-Token"],
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
default:
|
default:
|
||||||
|
|||||||
Reference in New Issue
Block a user