import { OpenAPIHono, createRoute, z } from "@hono/zod-openapi"; import { env } from "hono/adapter"; import { streamSSE } from "hono/streaming"; import { fetchEpisodes } from "~/controllers/episodes/getByAniListId"; import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode"; import { readEnvVariable } from "~/libs/readEnvVariable"; import { associateDeviceIdWithUsername } from "~/models/token"; import { setWatchStatus } from "~/models/watchStatus"; import type { Env } from "~/types/env"; import { EpisodesResponseSchema } from "~/types/episode"; import { ErrorResponse, ErrorResponseSchema } from "~/types/schema"; import { Title } from "~/types/title"; import { getUser } from "./getUser"; import { getWatchingTitles } from "./getWatchingTitles"; const UserSchema = z.object({ name: z.string(), avatar: z.object({ medium: z.string().nullable(), large: z.string(), }), statistics: z.object({ minutesWatched: z.number().openapi({ type: "integer", format: "int64" }), episodesWatched: z.number().int(), count: z.number().int(), meanScore: z.number().openapi({ type: "number", format: "float" }), }), }); const route = createRoute({ tags: ["aniplay", "auth"], summary: "Authenticate with AniList and return all upcoming and 'currently watching' titles", operationId: "authenticateAniList", method: "get", path: "/", request: { headers: z.object({ "x-anilist-token": z.string(), "x-aniplay-device-id": z.string(), }), // Uncomment when testing locally // headers: z.object({ // "x-anilist-token": // process.env.NODE_ENV === "production" // ? z.string() // : z.string().optional(), // "x-aniplay-device-id": // process.env.NODE_ENV === "production" // ? z.string() // : z.string().optional(), // }), // query: z.object({ // aniListToken: z.string().optional(), // deviceId: z.string().optional(), // }), }, responses: { 200: { content: { "text/event-stream": { schema: z.union([ z.object({ title: Title, episodes: EpisodesResponseSchema }), UserSchema, ]), }, }, description: "Streams a list of titles", }, 401: { content: { "application/json": { schema: ErrorResponseSchema, }, }, description: "Failed to authenticate with AniList", }, 500: { content: { "application/json": { schema: ErrorResponseSchema, }, }, description: "Error fetching episodes", }, }, }); const app = new OpenAPIHono(); app.openapi(route, async (c) => { const deviceId = c.req.header("X-Aniplay-Device-Id") ?? c.req.query("deviceId"); const aniListToken = c.req.header("X-AniList-Token") ?? c.req.query("aniListToken"); if (!aniListToken) { return c.json(ErrorResponse, { status: 401 }); } let user: Awaited>; try { user = await getUser(aniListToken); if (!user) { return c.json(ErrorResponse, { status: 401 }); } } catch (error) { console.error(new Error("Failed to authenticate with AniList")); console.error(error); return c.json(ErrorResponse, { status: 500 }); } try { await associateDeviceIdWithUsername( env(c, "workerd"), deviceId!, user.name!, ); } catch (error) { console.error(new Error("Failed to associate device")); console.error(error); return c.json(ErrorResponse, { status: 500 }); } return streamSSE( c, async (stream) => { stream.writeSSE({ event: "user", data: JSON.stringify(user) }); let currentPage = 1; let hasNextPage = true; do { const { mediaList, pageInfo } = await getWatchingTitles( user.name!, currentPage++, aniListToken, c.executionCtx, ); if (!mediaList) { break; } if (!(pageInfo?.hasNextPage ?? false) && (pageInfo?.total ?? 0) > 0) { stream.writeSSE({ event: "count", data: pageInfo!.total.toString(), }); } for (const mediaObj of mediaList) { const media = mediaObj?.media!; if (!media) { continue; } const mediaListEntry = media.mediaListEntry; if (mediaListEntry) { const { wasAdded } = await setWatchStatus( env(c, "workerd"), deviceId!, media.id, mediaListEntry.status, ); if (wasAdded) { await maybeScheduleNextAiringEpisode( env(c, "workerd"), c.req, media.id, ); } } const nextEpisode = media.nextAiringEpisode?.episode; if ( nextEpisode === 0 || nextEpisode === 1 || media.status === "NOT_YET_RELEASED" ) { await stream.writeSSE({ event: "title", data: JSON.stringify({ title: media, episodes: [] }), id: media.id.toString(), }); continue; } await fetchEpisodes( media.id, readEnvVariable(c.env, "ENABLE_ANIFY"), ).then(({ result: { episodes } }) => { stream.writeSSE({ event: "title", data: JSON.stringify({ title: media, episodes }), id: media.id.toString(), }); }); } hasNextPage = pageInfo?.hasNextPage ?? false; hasNextPage = pageInfo?.hasNextPage ?? false; console.log(hasNextPage); hasNextPage = pageInfo?.hasNextPage ?? false; console.log(hasNextPage); } while (hasNextPage); // send end event instead of closing the connection to let the client know that the stream didn't end abruptly await stream.writeSSE({ event: "end", data: "end" }); }, async (err, stream) => { console.error("Error occurred in SSE"); console.error(err); await stream.writeln("An error occurred"); await stream.close(); }, ); }); export default app;