Compare commits

..

8 Commits

17 changed files with 202 additions and 113 deletions

View File

@@ -31,6 +31,7 @@
},
"devDependencies": {
"@cloudflare/vitest-pool-workers": "^0.10.15",
"@graphql-typed-document-node/core": "^3.2.0",
"@trivago/prettier-plugin-sort-imports": "^4.3.0",
"@types/lodash.mapkeys": "^4.6.9",
"@types/luxon": "^3.6.2",

3
pnpm-lock.yaml generated
View File

@@ -47,6 +47,9 @@ importers:
"@cloudflare/vitest-pool-workers":
specifier: ^0.10.15
version: 0.10.15(@vitest/runner@3.2.4)(@vitest/snapshot@3.2.4)(vitest@3.2.4)
"@graphql-typed-document-node/core":
specifier: ^3.2.0
version: 3.2.0(graphql@16.12.0)
"@trivago/prettier-plugin-sort-imports":
specifier: ^4.3.0
version: 4.3.0(prettier@3.7.4)

View File

@@ -30,7 +30,7 @@ export async function fetchPopularTitlesFromAnilist(
);
break;
case "upcoming":
data = await stub.nextSeasonPopular(next.season, next.year, limit);
data = await stub.nextSeasonPopular(next.season, next.year, page, limit);
break;
default:
throw new Error(`Unknown category: ${category}`);

View File

@@ -2,6 +2,7 @@ import { OpenAPIHono, createRoute, z } from "@hono/zod-openapi";
import { fetchTitleFromAnilist } from "~/libs/anilist/getTitle";
import { fetchFromMultipleSources } from "~/libs/fetchFromMultipleSources";
import { userProfileMiddleware } from "~/middleware/userProfile";
import {
AniListIdQuerySchema,
ErrorResponse,
@@ -9,6 +10,7 @@ import {
SuccessResponseSchema,
} from "~/types/schema";
import { Title } from "~/types/title";
import type { User } from "~/types/user";
const app = new OpenAPIHono();
@@ -40,6 +42,7 @@ const route = createRoute({
description: "Title could not be found",
},
},
middleware: [userProfileMiddleware],
});
app.openapi(route, async (c) => {
@@ -55,7 +58,12 @@ app.openapi(route, async (c) => {
}
const { result: title, errorOccurred } = await fetchFromMultipleSources([
() => fetchTitleFromAnilist(aniListId, aniListToken ?? undefined),
() =>
fetchTitleFromAnilist(
aniListId,
(c.get("user") as User)?.id,
aniListToken ?? undefined,
),
]);
if (errorOccurred) {

View File

@@ -1,5 +1,4 @@
import { OpenAPIHono, createRoute, z } from "@hono/zod-openapi";
import type { HonoRequest } from "hono";
import { AnilistUpdateType } from "~/libs/anilist/updateType.ts";
import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode";
@@ -22,7 +21,6 @@ const UpdateWatchStatusRequest = z.object({
deviceId: z.string(),
watchStatus: WatchStatus.nullable(),
titleId: AniListIdSchema,
isRetrying: z.boolean().optional().default(false),
});
const route = createRoute({
@@ -81,12 +79,8 @@ export async function updateWatchStatus(
}
app.openapi(route, async (c) => {
const {
deviceId,
watchStatus,
titleId,
isRetrying = false,
} = await c.req.json<typeof UpdateWatchStatusRequest._type>();
const { deviceId, watchStatus, titleId } =
await c.req.json<typeof UpdateWatchStatusRequest._type>();
// Check if we should use mock data
const { useMockData } = await import("~/libs/useMockData");
if (useMockData()) {
@@ -94,14 +88,12 @@ app.openapi(route, async (c) => {
return c.json(SuccessResponse, { status: 200 });
}
if (!isRetrying) {
try {
await updateWatchStatus(deviceId, titleId, watchStatus);
} catch (error) {
console.error("Error setting watch status");
console.error(error);
return c.json(ErrorResponse, { status: 500 });
}
try {
await updateWatchStatus(deviceId, titleId, watchStatus);
} catch (error) {
console.error("Error setting watch status");
console.error(error);
return c.json(ErrorResponse, { status: 500 });
}
const aniListToken = c.req.header("X-AniList-Token");

View File

@@ -3,11 +3,14 @@ import { OpenAPIHono } from "@hono/zod-openapi";
import { Duration, type DurationLike } from "luxon";
import { onNewEpisode } from "~/controllers/internal/new-episode";
import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt";
import { AnilistUpdateType } from "~/libs/anilist/updateType";
import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff";
import type { QueueName } from "~/libs/tasks/queueName.ts";
import type { QueueBody } from "~/libs/tasks/queueTask";
import {
MAX_QUEUE_DELAY_SECONDS,
type QueueBody,
} from "~/libs/tasks/queueTask";
import { maybeUpdateLastConnectedAt } from "~/middleware/maybeUpdateLastConnectedAt";
export const app = new OpenAPIHono<{ Bindings: Env }>();
@@ -79,6 +82,7 @@ export default {
case "ANILIST_UPDATES":
const anilistUpdateBody =
message.body as QueueBody["ANILIST_UPDATES"];
console.log("queue run", message.body);
switch (anilistUpdateBody.updateType) {
case AnilistUpdateType.UpdateWatchStatus:
if (!anilistUpdateBody[AnilistUpdateType.UpdateWatchStatus]) {
@@ -146,11 +150,14 @@ function onMessageQueue<QN extends QueueName>(
);
console.error(error);
message.retry({
delaySeconds: calculateExponentialBackoff({
attempt: message.attempts,
baseMin: retryDelayConfig[messageBatch.queue as QN]?.min,
absCap: retryDelayConfig[messageBatch.queue as QN]?.max,
}),
delaySeconds: Math.min(
calculateExponentialBackoff({
attempt: message.attempts,
baseMin: retryDelayConfig[messageBatch.queue as QN]?.min,
absCap: retryDelayConfig[messageBatch.queue as QN]?.max,
}),
MAX_QUEUE_DELAY_SECONDS,
),
});
}
}

View File

@@ -1,5 +1,7 @@
import type { TypedDocumentNode } from "@graphql-typed-document-node/core";
import { DurableObject } from "cloudflare:workers";
import { print } from "graphql";
import { DateTime } from "luxon";
import { z } from "zod";
import {
@@ -7,6 +9,7 @@ import {
GetNextEpisodeAiringAtQuery,
GetPopularTitlesQuery,
GetTitleQuery,
GetTitleUserDataQuery,
GetTrendingTitlesQuery,
GetUpcomingTitlesQuery,
GetUserProfileQuery,
@@ -17,6 +20,7 @@ import {
SearchQuery,
} from "~/libs/anilist/queries";
import { sleep } from "~/libs/sleep.ts";
import type { Title } from "~/types/title";
const nextAiringEpisodeSchema = z.nullable(
z.object({
@@ -37,30 +41,54 @@ export class AnilistDurableObject extends DurableObject {
return new Response("Not found", { status: 404 });
}
async getTitle(id: number, token?: string) {
return this.handleCachedRequest(
`title:${id}`,
async () => {
const anilistResponse = await this.fetchFromAnilist(
GetTitleQuery,
{ id },
token,
);
return anilistResponse?.Media ?? null;
},
(media) => {
if (!media) return undefined;
// Cast to any to access fragment fields without unmasking
const nextAiringEpisode = nextAiringEpisodeSchema.parse(
(media as any)?.nextAiringEpisode,
);
const airingAt = (nextAiringEpisode?.airingAt ?? 0) * 1000;
if (airingAt) {
return airingAt - Date.now();
}
return undefined;
},
async getTitle(
id: number,
userId?: string,
token?: string,
): Promise<Title | null> {
const promises: Promise<any>[] = [
this.handleCachedRequest(
`title:${id}`,
async () => {
const anilistResponse = await this.fetchFromAnilist(GetTitleQuery, {
id,
});
return anilistResponse?.Media ?? null;
},
(media) => {
if (!media) return undefined;
// Cast to any to access fragment fields without unmasking
const nextAiringEpisode = nextAiringEpisodeSchema.parse(
(media as any)?.nextAiringEpisode,
);
return nextAiringEpisode?.airingAt
? DateTime.fromMillis(nextAiringEpisode?.airingAt)
: undefined;
},
),
];
promises.push(
userId
? this.handleCachedRequest(
`title:${id}:${userId}`,
async () => {
const anilistResponse = await this.fetchFromAnilist(
GetTitleUserDataQuery,
{ id },
{ token },
);
return anilistResponse?.Media ?? null;
},
DateTime.now().plus({ days: 1 }),
)
: Promise.resolve({ mediaListEntry: null }),
);
return Promise.all(promises).then(([title, userTitle]) => ({
...title,
...userTitle,
}));
}
async getNextEpisodeAiringAt(id: number) {
@@ -72,7 +100,7 @@ export class AnilistDurableObject extends DurableObject {
});
return data?.Media;
},
60 * 60 * 1000,
DateTime.now().plus({ hours: 1 }),
);
}
@@ -87,7 +115,7 @@ export class AnilistDurableObject extends DurableObject {
});
return data?.Page;
},
60 * 60 * 1000,
DateTime.now().plus({ hours: 1 }),
);
}
@@ -100,23 +128,28 @@ export class AnilistDurableObject extends DurableObject {
) {
return this.handleCachedRequest(
`popular:${JSON.stringify({ season, seasonYear, nextSeason, nextYear, limit })}`,
async () => {
console.log(nextSeason, nextYear, print(BrowsePopularQuery));
() => {
return this.fetchFromAnilist(BrowsePopularQuery, {
season,
seasonYear,
nextSeason,
nextYear,
limit,
});
page,
}).then((data) => data?.Page);
},
24 * 60 * 60 * 1000,
DateTime.now().plus({ days: 1 }),
);
}
async nextSeasonPopular(nextSeason: any, nextYear: number, limit: number) {
async nextSeasonPopular(
nextSeason: any,
nextYear: number,
page: number,
limit: number,
) {
return this.handleCachedRequest(
`next_season:${JSON.stringify({ nextSeason, nextYear, limit })}`,
`next_season:${JSON.stringify({ nextSeason, nextYear, page, limit })}`,
async () => {
return this.fetchFromAnilist(NextSeasonPopularQuery, {
nextSeason,
@@ -124,7 +157,7 @@ export class AnilistDurableObject extends DurableObject {
limit,
});
},
24 * 60 * 60 * 1000,
DateTime.now().plus({ days: 1 }),
);
}
@@ -137,15 +170,14 @@ export class AnilistDurableObject extends DurableObject {
return this.handleCachedRequest(
`popular:${JSON.stringify({ page, limit, season, seasonYear })}`,
async () => {
const data = await this.fetchFromAnilist(GetPopularTitlesQuery, {
return this.fetchFromAnilist(GetPopularTitlesQuery, {
page,
limit,
season,
seasonYear,
});
return data?.Page;
}).then((data) => data?.Page);
},
24 * 60 * 60 * 1000,
DateTime.now().plus({ days: 1 }),
);
}
@@ -159,7 +191,7 @@ export class AnilistDurableObject extends DurableObject {
});
return data?.Page;
},
24 * 60 * 60 * 1000,
DateTime.now().plus({ days: 1 }),
);
}
@@ -178,7 +210,7 @@ export class AnilistDurableObject extends DurableObject {
});
return data?.Page;
},
24 * 60 * 60 * 1000,
DateTime.now().plus({ days: 1 }),
);
}
@@ -186,10 +218,10 @@ export class AnilistDurableObject extends DurableObject {
return this.handleCachedRequest(
`user:${token}`,
async () => {
const data = await this.fetchFromAnilist(GetUserQuery, {}, token);
const data = await this.fetchFromAnilist(GetUserQuery, {}, { token });
return data?.Viewer;
},
60 * 60 * 24 * 30 * 1000,
DateTime.now().plus({ days: 30 }),
);
}
@@ -200,11 +232,11 @@ export class AnilistDurableObject extends DurableObject {
const data = await this.fetchFromAnilist(
GetUserProfileQuery,
{ token },
token,
{ token },
);
return data?.Viewer;
},
60 * 60 * 24 * 30 * 1000,
DateTime.now().plus({ days: 30 }),
);
}
@@ -216,7 +248,7 @@ export class AnilistDurableObject extends DurableObject {
const data = await this.fetchFromAnilist(
MarkEpisodeAsWatchedMutation,
{ titleId, episodeNumber },
token,
{ token },
);
return data?.SaveMediaListEntry;
}
@@ -225,7 +257,7 @@ export class AnilistDurableObject extends DurableObject {
const data = await this.fetchFromAnilist(
MarkTitleAsWatchedMutation,
{ titleId },
token,
{ token },
);
return data?.SaveMediaListEntry;
}
@@ -234,7 +266,7 @@ export class AnilistDurableObject extends DurableObject {
async handleCachedRequest<T>(
key: string,
fetcher: () => Promise<T>,
ttl?: number | ((data: T) => number | undefined),
ttl?: DateTime | ((data: T) => DateTime | undefined),
) {
const cache = await this.state.storage.get(key);
console.debug(`Retrieving request ${key} from cache:`, cache != null);
@@ -244,12 +276,13 @@ export class AnilistDurableObject extends DurableObject {
const result = await fetcher();
await this.state.storage.put(key, result);
console.debug(`Retrieved alarms from cache:`, Object.entries(alarms));
const calculatedTtl = typeof ttl === "function" ? ttl(result) : ttl;
if (calculatedTtl && calculatedTtl > 0) {
const alarmTime = Date.now() + calculatedTtl;
if (calculatedTtl) {
const alarmTime = calculatedTtl.toMillis();
await this.state.storage.setAlarm(alarmTime);
console.debug(`Deleting storage key ${storageKey} & alarm ${key}`);
await this.state.storage.put(`alarm:${key}`, alarmTime);
}
@@ -271,10 +304,13 @@ export class AnilistDurableObject extends DurableObject {
}
async fetchFromAnilist<Result = any, Variables = any>(
queryString: string,
query: TypedDocumentNode<Result, Variables>,
variables: Variables,
token?: string | undefined,
): Promise<Result> {
{
token,
shouldRetryOnRateLimit = true,
}: { token?: string | undefined; shouldRetryOnRateLimit?: boolean } = {},
): Promise<Result | undefined> {
const headers: any = {
"Content-Type": "application/json",
};
@@ -285,7 +321,7 @@ export class AnilistDurableObject extends DurableObject {
// Use the query passed in, or fallback if needed (though we expect it to be passed)
// We print the query to string
// const queryString = print(query);
const queryString = print(query);
const response = await fetch(`${this.env.PROXY_URL}/proxy`, {
method: "POST",
@@ -304,14 +340,17 @@ export class AnilistDurableObject extends DurableObject {
});
// 1. Handle Rate Limiting (429)
if (response.status === 429) {
if (shouldRetryOnRateLimit && response.status === 429) {
const retryAfter = await response
.json()
.json<{ headers: Record<string, string> }>()
.then(({ headers }) => new Headers(headers).get("Retry-After"));
console.log("429, retrying in", retryAfter);
await sleep(Number(retryAfter || 1) * 1000); // specific fallback or ensure logic
return this.fetchFromAnilist(query, variables, token);
return this.fetchFromAnilist(query, variables, {
token,
shouldRetryOnRateLimit: false,
});
}
// 2. Handle HTTP Errors (like 404 or 500)

View File

@@ -5,6 +5,7 @@ import type { Title } from "~/types/title";
export async function fetchTitleFromAnilist(
id: number,
userId?: number | undefined,
token?: string | undefined,
): Promise<Title | undefined> {
if (useMockData()) {
@@ -17,8 +18,7 @@ export async function fetchTitleFromAnilist(
);
const stub = env.ANILIST_DO.get(durableObjectId);
const data = await stub.getTitle(id, token);
const data = await stub.getTitle(id, userId, token);
if (!data) {
return undefined;
}

View File

@@ -14,6 +14,18 @@ export const GetTitleQuery = graphql(
[MediaFragment],
);
export const GetTitleUserDataQuery = graphql(`
query GetTitleUserData($id: Int!) {
Media(id: $id) {
mediaListEntry {
id
progress
status
}
}
}
`);
export const SearchQuery = graphql(
`
query Search($query: String!, $page: Int!, $limit: Int!) {
@@ -248,7 +260,8 @@ export const NextSeasonPopularQuery = graphql(
$nextYear: Int
$limit: Int!
) {
Page(page: 1, perPage: $limit) {
$page: Int!
Page(page: $page, perPage: $limit) {
media(
season: $nextSeason
seasonYear: $nextYear

View File

@@ -2,7 +2,7 @@ import { DateTime } from "luxon";
import type { DelayedTaskMetadata } from "./delayedTask";
import { deserializeDelayedTask } from "./delayedTask";
import { MAX_DELAY_SECONDS, queueTask } from "./queueTask";
import { MAX_QUEUE_DELAY_SECONDS, queueTask } from "./queueTask";
const RETRY_ALERT_THRESHOLD = 3;
@@ -27,7 +27,7 @@ export async function processDelayedTasks(env: Cloudflare.Env): Promise<void> {
console.log(`Found ${keys.length} delayed tasks to check`);
const currentTime = Math.floor(Date.now() / 1000);
const maxQueueTime = currentTime + MAX_DELAY_SECONDS;
const maxQueueTime = currentTime + MAX_QUEUE_DELAY_SECONDS;
let processedCount = 0;
let queuedCount = 0;

View File

@@ -81,8 +81,8 @@ describe("queueTask - delayed task handling", () => {
});
});
describe("tasks with delay > 9 hours", () => {
it("stores task in KV when delay exceeds 9 hours", async () => {
describe("tasks with delay > 12 hours", () => {
it("stores task in KV when delay exceeds 12 hours", async () => {
await queueTask(
"NEW_EPISODE",
{ aniListId: 111, episodeNumber: 4 },
@@ -98,12 +98,12 @@ describe("queueTask - delayed task handling", () => {
expect(queueSendSpy).not.toHaveBeenCalled();
});
it("stores task in KV when delay is 9 hours + 1 second", async () => {
it("stores task in KV when delay is 12 hours + 1 second", async () => {
await queueTask(
"NEW_EPISODE",
{ aniListId: 222, episodeNumber: 5 },
{
scheduleConfig: { delay: { hours: 9, seconds: 1 } },
scheduleConfig: { delay: { hours: 12, seconds: 1 } },
env: mockEnv,
},
);
@@ -176,7 +176,7 @@ describe("queueTask - delayed task handling", () => {
});
describe("epoch time scheduling", () => {
it("queues directly when epoch time is within 9 hours", async () => {
it("queues directly when epoch time is within 12 hours", async () => {
const futureTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now
await queueTask(
@@ -192,7 +192,7 @@ describe("queueTask - delayed task handling", () => {
expect(kvPutSpy).not.toHaveBeenCalled();
});
it("stores in KV when epoch time is beyond 9 hours", async () => {
it("stores in KV when epoch time is beyond 12 hours", async () => {
const futureTime = Math.floor(Date.now() / 1000) + 24 * 3600; // 24 hours from now
await queueTask(

View File

@@ -30,7 +30,7 @@ interface QueueTaskOptionalArgs {
env?: Cloudflare.Env;
}
export const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as(
export const MAX_QUEUE_DELAY_SECONDS = Duration.fromObject({ hours: 12 }).as(
"seconds",
);
@@ -46,8 +46,8 @@ export async function queueTask(
req?.header(),
);
// If delay exceeds 9 hours, store in KV for later processing
if (scheduleTime > MAX_DELAY_SECONDS) {
// If delay exceeds 12 hours, store in KV for later processing
if (scheduleTime > MAX_QUEUE_DELAY_SECONDS) {
if (!env || !env.DELAYED_TASKS) {
throw new Error("DELAYED_TASKS KV namespace not available");
}
@@ -132,7 +132,9 @@ function buildTask(
scheduleTime = Duration.fromDurationLike(delay).as("second");
}
}
const authorizationHeader = headers?.["X-Anilist-Token"] ? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` } : {};
const authorizationHeader = headers?.["X-Anilist-Token"]
? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` }
: {};
switch (queueName) {
case "ANILIST_UPDATES":

View File

@@ -0,0 +1,25 @@
import { createMiddleware } from "hono/factory";
import type { User } from "~/types/user";
export const userProfileMiddleware = createMiddleware<
Cloudflare.Env & {
Variables: {
user: User;
};
Bindings: Env;
}
>(async (c, next) => {
const aniListToken = await c.req.header("X-AniList-Token");
if (!aniListToken) {
return next();
}
const user = await c.env.ANILIST_DO.getByName("GLOBAL").getUser(aniListToken);
if (!user) {
return c.json({ error: "User not found" }, 401);
}
c.set("user", user);
return next();
});

View File

@@ -21,11 +21,6 @@ export const MediaFragment = graphql(`
medium
}
countryOfOrigin
mediaListEntry {
id
progress
status
}
nextAiringEpisode {
timeUntilAiring
airingAt

View File

@@ -3,20 +3,24 @@ import { z } from "zod";
export type User = z.infer<typeof User>;
export const User = z
.object({
statistics: z.object({
minutesWatched: z.number().openapi({ type: "integer", format: "int64" }),
episodesWatched: z.number().openapi({ type: "integer", format: "int64" }),
count: z
.number()
.int() /* .openapi({ type: "integer", format: "int64" }) */,
meanScore: z.number().openapi({ type: "number", format: "float" }),
}),
id: z.number().openapi({ type: "integer", format: "int64" }),
name: z.string(),
avatar: z.object({
medium: z.string(),
large: z.string(),
}),
})
.optional()
.nullable();
export type UserProfile = z.infer<typeof UserProfile>;
export const UserProfile = z.object({
statistics: z.object({
minutesWatched: z.number().openapi({ type: "integer", format: "int64" }),
episodesWatched: z.number().openapi({ type: "integer", format: "int64" }),
count: z.number().int(),
meanScore: z.number().openapi({ type: "number", format: "float" }),
}),
id: z.number().openapi({ type: "integer", format: "int64" }),
name: z.string(),
avatar: z.object({
medium: z.string(),
large: z.string(),
}),
});

View File

@@ -67,7 +67,7 @@ id = "c8db249d8ee7462b91f9c374321776e4"
preview_id = "ff38240eb2aa4b1388c705f4974f5aec"
[triggers]
crons = ["0 */9 * * *"]
crons = ["0 */12 * * *"]
[[d1_databases]]
binding = "DB"