import type { HonoRequest } from "hono";
import isEqual from "lodash.isequal";
import { DateTime, type DurationLike } from "luxon";

import type { Env } from "~/types/env";
import type { WatchStatus } from "~/types/title/watchStatus";

import { FailedToQueueTaskError } from "../errors/FailedToQueueTask";
import { getAdminSdkCredentials } from "../gcloud/getAdminSdkCredentials";
import { getGoogleAuthToken } from "../gcloud/getGoogleAuthToken";
import { getCurrentDomain } from "../getCurrentDomain";
import { buildAnilistRetryTaskId, buildNewEpisodeTaskId } from "./id";
import type { QueueName } from "./queueName";

/** JSON payload delivered to the task's target endpoint, keyed by queue name. */
type QueueBody = {
  "anilist-updates": {
    deviceId: string;
    watchStatus: WatchStatus | null;
    titleId: number;
    isRetrying: true;
  };
  "new-episode": { aniListId: number; episodeNumber: number };
};

/**
 * When the task should run: either a relative `delay` or an absolute
 * `epochTime` (seconds since the Unix epoch), never both.
 *
 * The excluded property is `?: never` so that a plain `{ delay }` or
 * `{ epochTime }` literal is assignable; a *required* `never` property can
 * never be satisfied without a cast.
 */
type ScheduleConfig =
  | { delay: DurationLike; epochTime?: never }
  | { epochTime: number; delay?: never };

interface QueueTaskOptionalArgs {
  scheduleConfig?: ScheduleConfig;
  /** when req is not provided, that means the task is being created locally */
  req?: HonoRequest;
}

/** Credentials object produced by `getAdminSdkCredentials`. */
type AdminSdkCredentials = ReturnType<typeof getAdminSdkCredentials>;

/** Shape of the `task` object sent to the Cloud Tasks "create task" endpoint. */
interface CloudTask {
  name: string;
  scheduleTime: string | undefined;
  httpRequest: {
    url: string;
    httpMethod: "POST";
    /** Base64-encoded request body. */
    body: string;
    headers: {
      "Content-Type": string;
      "X-Anilist-Token": string | undefined;
    };
  };
}

const CLOUD_TASKS_API = "https://content-cloudtasks.googleapis.com/v2";
const CLOUD_TASKS_LOCATION = "northamerica-northeast1";

/**
 * Creates a Cloud Tasks task on `queueName` that POSTs `body` back to this
 * service.
 *
 * Flow:
 * 1. Resolve the callback domain (from `req`, or the deployed workers.dev URL
 *    when no request is given). If `getCurrentDomain` yields a falsy value the
 *    task is skipped and only a log line is written.
 * 2. Try to create the task under its deterministic id.
 * 3. On HTTP 409 (id already taken): if a task with the same id and an equal
 *    payload is still queued, treat the call as a no-op. Otherwise retry once
 *    under a SHA-256 hash of the id.
 *
 * @param env - Worker environment used to load the Google credentials.
 * @param queueName - Target queue.
 * @param body - Payload for the queue's endpoint.
 * @param options - Optional schedule and originating request.
 * @throws {FailedToQueueTaskError} When Cloud Tasks rejects the task (any
 *   non-OK status other than a recoverable 409).
 */
export async function queueTask(
  env: Env,
  queueName: QueueName,
  body: QueueBody[QueueName],
  { scheduleConfig, req }: QueueTaskOptionalArgs = {},
): Promise<void> {
  const domain = req
    ? getCurrentDomain(req)
    : "https://aniplay-v2.rururu.workers.dev";
  if (!domain) {
    console.log("Skipping queue task due to local domain", queueName, body);
    return;
  }

  const adminSdkCredentials = getAdminSdkCredentials(env);
  const task = buildTask(
    adminSdkCredentials.projectId,
    queueName,
    scheduleConfig,
    domain,
    body,
    req?.header(),
  );

  const res = await postCloudTask(adminSdkCredentials, queueName, task);
  if (res.ok) {
    return;
  }
  if (res.status !== 409) {
    throw new FailedToQueueTaskError(res.status, await res.text());
  }

  // 409: the task id is taken. Split the full resource name into the parent
  // path and the trailing id once, instead of re-splitting at every use.
  const lastSlash = task.name.lastIndexOf("/");
  const parentPath = task.name.slice(0, lastSlash);
  const taskId = task.name.slice(lastSlash + 1);

  if (await checkIfTaskExists(env, queueName, taskId, body)) {
    // An identical task is already queued; nothing to do.
    return;
  }

  // The id is unavailable but no equivalent task is queued, so retry under a
  // derived id. The full task is deliberately NOT logged: its headers carry
  // the caller's AniList token.
  const hashedTaskId = await hashTaskId(taskId);
  console.log(hashedTaskId);

  const retryRes = await postCloudTask(adminSdkCredentials, queueName, {
    ...task,
    name: `${parentPath}/${hashedTaskId}`,
  });
  if (!retryRes.ok) {
    throw new FailedToQueueTaskError(retryRes.status, await retryRes.text());
  }
  // Retry succeeded: return normally rather than falling through to an error.
}

/** POSTs `task` to the Cloud Tasks "create task" endpoint of `queueName`. */
async function postCloudTask(
  adminSdkCredentials: AdminSdkCredentials,
  queueName: QueueName,
  task: CloudTask,
): Promise<Response> {
  return fetch(
    `${CLOUD_TASKS_API}/projects/${adminSdkCredentials.projectId}/locations/${CLOUD_TASKS_LOCATION}/queues/${queueName}/tasks?alt=json`,
    {
      headers: {
        Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
        "Content-Type": "application/json",
      },
      body: JSON.stringify({ task }),
      method: "POST",
    },
  );
}

/** Returns the hex SHA-256 digest of `taskId`, used as a fallback task id. */
async function hashTaskId(taskId: string): Promise<string> {
  const { createHash } = await import("node:crypto");
  return createHash("sha256").update(taskId).digest("hex");
}

/**
 * Reports whether a task with id `taskId` is currently queued on `queueName`
 * AND carries a payload deep-equal to `expectedBody`.
 *
 * @returns `false` when the task is not found (404) or its stored body is
 *   missing or not decodable as base64 JSON; otherwise the deep-equality result.
 * @throws {FailedToQueueTaskError} When the lookup fails with any status other
 *   than 404, so a transient API error is not mistaken for "task is gone".
 */
async function checkIfTaskExists(
  env: Env,
  queueName: QueueName,
  taskId: string,
  expectedBody: QueueBody[QueueName],
): Promise<boolean> {
  const adminSdkCredentials = getAdminSdkCredentials(env);
  const res = await fetch(
    `${CLOUD_TASKS_API}/projects/${adminSdkCredentials.projectId}/locations/${CLOUD_TASKS_LOCATION}/queues/${queueName}/tasks/${taskId}?responseView=FULL`,
    {
      headers: {
        Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
      },
    },
  );

  if (res.status === 404) {
    return false;
  }
  if (!res.ok) {
    throw new FailedToQueueTaskError(res.status, await res.text());
  }

  const payload = (await res.json()) as {
    httpRequest?: { body?: unknown };
  } | null;
  const encodedBody = payload?.httpRequest?.body;
  if (typeof encodedBody !== "string") {
    return false;
  }

  try {
    const storedBody: unknown = JSON.parse(
      Buffer.from(encodedBody, "base64").toString(),
    );
    return isEqual(storedBody, expectedBody);
  } catch {
    // Stored body is not JSON, so it cannot be the payload we are queueing.
    return false;
  }
}

/**
 * Converts a schedule config into an ISO-8601 UTC timestamp, or `undefined`
 * when the task should not carry an explicit schedule time.
 */
function resolveScheduleTime(
  scheduleConfig: ScheduleConfig | undefined,
): string | undefined {
  if (!scheduleConfig) {
    return undefined;
  }

  const { delay, epochTime } = scheduleConfig;
  if (epochTime) {
    return DateTime.fromSeconds(epochTime).toUTC().toISO() ?? undefined;
  }
  if (delay) {
    return DateTime.now().plus(delay).toUTC().toISO() ?? undefined;
  }
  return undefined;
}

/**
 * Case-insensitive header lookup. Hono's `req.header()` (called without
 * arguments) returns a record with lower-cased keys, so an exact-case lookup
 * of `X-Anilist-Token` would always miss.
 */
function findHeader(
  headers: Record<string, string> | undefined,
  name: string,
): string | undefined {
  if (!headers) {
    return undefined;
  }
  const wanted = name.toLowerCase();
  for (const [key, value] of Object.entries(headers)) {
    if (key.toLowerCase() === wanted) {
      return value;
    }
  }
  return undefined;
}

/**
 * Builds the Cloud Tasks task resource for `queueName`.
 *
 * The task id is deterministic per payload (see `./id`), the target URL
 * depends on the queue, and the request body is always base64-encoded JSON.
 * The caller's `X-Anilist-Token` header, when present, is forwarded to the
 * target.
 *
 * @throws {Error} When `queueName` is not a known queue.
 */
function buildTask(
  projectId: string,
  queueName: QueueName,
  scheduleConfig: ScheduleConfig | undefined,
  domain: string,
  body: QueueBody[QueueName],
  headers: Record<string, string> | undefined,
): CloudTask {
  const scheduleTime = resolveScheduleTime(scheduleConfig);
  const taskNamePrefix = `projects/${projectId}/locations/${CLOUD_TASKS_LOCATION}/queues/${queueName}/tasks`;
  // Cloud Tasks expects `httpRequest.body` as base64-encoded bytes for every queue.
  const encodedBody = Buffer.from(JSON.stringify(body)).toString("base64");
  const requestHeaders = {
    "Content-Type": "application/json",
    "X-Anilist-Token": findHeader(headers, "X-Anilist-Token"),
  };

  switch (queueName) {
    case "new-episode": {
      const { aniListId } = body as QueueBody["new-episode"];
      return {
        name: `${taskNamePrefix}/${buildNewEpisodeTaskId(aniListId)}`,
        scheduleTime,
        httpRequest: {
          url: `${domain}/internal/new-episode`,
          httpMethod: "POST",
          body: encodedBody,
          headers: requestHeaders,
        },
      };
    }
    case "anilist-updates": {
      const { deviceId, titleId } = body as QueueBody["anilist-updates"];
      return {
        name: `${taskNamePrefix}/${buildAnilistRetryTaskId(deviceId, titleId)}`,
        scheduleTime,
        httpRequest: {
          url: `${domain}/watch-status`,
          httpMethod: "POST",
          body: encodedBody,
          headers: requestHeaders,
        },
      };
    }
    default:
      throw new Error(`Unknown queue name: ${queueName}`);
  }
}