Cloudflare doesn't log causes, only the messages so splitting the logs in to 2
201 lines
5.7 KiB
TypeScript
201 lines
5.7 KiB
TypeScript
import type { HonoRequest } from "hono";
|
|
import isEqual from "lodash.isequal";
|
|
import { DateTime, type DurationLike } from "luxon";
|
|
|
|
import type { Env } from "~/types/env";
|
|
import type { WatchStatus } from "~/types/title/watchStatus";
|
|
|
|
import { FailedToQueueTaskError } from "../errors/FailedToQueueTask";
|
|
import { getAdminSdkCredentials } from "../gcloud/getAdminSdkCredentials";
|
|
import { getGoogleAuthToken } from "../gcloud/getGoogleAuthToken";
|
|
import { getCurrentDomain } from "../getCurrentDomain";
|
|
import { buildAnilistRetryTaskId, buildNewEpisodeTaskId } from "./id";
|
|
import type { QueueName } from "./queueName";
|
|
|
|
type QueueBody = {
|
|
"anilist-updates": {
|
|
deviceId: string;
|
|
watchStatus: WatchStatus | null;
|
|
titleId: number;
|
|
isRetrying: true;
|
|
};
|
|
"new-episode": { aniListId: number; episodeNumber: number };
|
|
};
|
|
|
|
type ScheduleConfig =
|
|
| { delay: DurationLike; epochTime: never }
|
|
| { epochTime: number; delay: never };
|
|
|
|
interface QueueTaskOptionalArgs {
|
|
scheduleConfig?: ScheduleConfig;
|
|
/** when req is not provided, that means the task is being created locally */
|
|
req?: HonoRequest;
|
|
}
|
|
|
|
export async function queueTask(
|
|
env: Env,
|
|
queueName: QueueName,
|
|
body: QueueBody[QueueName],
|
|
{ scheduleConfig, req }: QueueTaskOptionalArgs = {},
|
|
) {
|
|
const domain = req
|
|
? getCurrentDomain(req)
|
|
: "https://aniplay-v2.rururu.workers.dev";
|
|
if (!domain) {
|
|
console.log("Skipping queue task due to local domain", queueName, body);
|
|
return;
|
|
}
|
|
|
|
const adminSdkCredentials = getAdminSdkCredentials(env);
|
|
const { projectId } = adminSdkCredentials;
|
|
|
|
const task = buildTask(
|
|
projectId,
|
|
queueName,
|
|
scheduleConfig,
|
|
domain,
|
|
body,
|
|
req?.header(),
|
|
);
|
|
const { res } = await queueCloudTask(task);
|
|
|
|
if (!res.ok) {
|
|
if (res.status === 409) {
|
|
if (
|
|
await checkIfTaskExists(
|
|
env,
|
|
queueName,
|
|
task.name.split("/").at(-1)!,
|
|
body,
|
|
)
|
|
) {
|
|
return;
|
|
} else {
|
|
const hashedTaskName = await import("node:crypto").then(
|
|
({ createHash }) =>
|
|
createHash("sha256")
|
|
.update(task.name.split("/").at(-1)!)
|
|
.digest("hex"),
|
|
);
|
|
console.log("task name", hashedTaskName);
|
|
const { res } = await queueCloudTask({
|
|
...task,
|
|
name:
|
|
task.name.split("/").slice(0, -1).join("/") + "/" + hashedTaskName,
|
|
});
|
|
if (!res.ok) {
|
|
if (await checkIfTaskExists(env, queueName, hashedTaskName, body)) {
|
|
return;
|
|
}
|
|
|
|
throw new FailedToQueueTaskError(res.status, await res.text());
|
|
}
|
|
}
|
|
}
|
|
|
|
throw new FailedToQueueTaskError(res.status, await res.text());
|
|
}
|
|
|
|
async function queueCloudTask(task: object) {
|
|
const res = await fetch(
|
|
`https://content-cloudtasks.googleapis.com/v2/projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks?alt=json`,
|
|
{
|
|
headers: {
|
|
Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
|
|
"Content-Type": "application/json",
|
|
},
|
|
body: JSON.stringify({ task }),
|
|
method: "POST",
|
|
},
|
|
);
|
|
return { res };
|
|
}
|
|
}
|
|
|
|
async function checkIfTaskExists(
|
|
env: Env,
|
|
queueName: QueueName,
|
|
taskId: string,
|
|
expectedBody: QueueBody[QueueName],
|
|
) {
|
|
const adminSdkCredentials = getAdminSdkCredentials(env);
|
|
|
|
const body = await fetch(
|
|
`https://content-cloudtasks.googleapis.com/v2/projects/${adminSdkCredentials.projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}?responseView=FULL`,
|
|
{
|
|
headers: {
|
|
Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
|
|
},
|
|
},
|
|
)
|
|
.then((res) => res.json())
|
|
.then(({ httpRequest }) => httpRequest?.body);
|
|
|
|
return (
|
|
body &&
|
|
isEqual(
|
|
JSON.parse(Buffer.from(body as string, "base64").toString()),
|
|
expectedBody,
|
|
)
|
|
);
|
|
}
|
|
|
|
function buildTask(
|
|
projectId: string,
|
|
queueName: QueueName,
|
|
scheduleConfig: ScheduleConfig | undefined,
|
|
domain: string,
|
|
body: QueueBody[QueueName],
|
|
headers: Record<string, string> | undefined,
|
|
) {
|
|
let scheduleTime: string | undefined;
|
|
if (scheduleConfig) {
|
|
const { delay, epochTime } = scheduleConfig;
|
|
if (epochTime) {
|
|
scheduleTime = DateTime.fromSeconds(epochTime).toUTC().toISO();
|
|
} else if (delay) {
|
|
scheduleTime = DateTime.now().plus(delay).toUTC().toISO();
|
|
}
|
|
}
|
|
let taskId: string;
|
|
|
|
switch (queueName) {
|
|
case "new-episode":
|
|
const { aniListId } = body as QueueBody["new-episode"];
|
|
taskId = buildNewEpisodeTaskId(aniListId);
|
|
|
|
return {
|
|
name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`,
|
|
scheduleTime,
|
|
httpRequest: {
|
|
url: `${domain}/internal/new-episode`,
|
|
httpMethod: "POST",
|
|
body: Buffer.from(JSON.stringify(body)).toString("base64"),
|
|
headers: {
|
|
"Content-Type": "application/json",
|
|
"X-Anilist-Token": headers?.["X-Anilist-Token"],
|
|
},
|
|
},
|
|
};
|
|
case "anilist-updates":
|
|
const { deviceId, titleId } = body as QueueBody[typeof queueName];
|
|
taskId = buildAnilistRetryTaskId(deviceId, titleId);
|
|
|
|
return {
|
|
name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`,
|
|
scheduleTime,
|
|
httpRequest: {
|
|
url: `${domain}/watch-status`,
|
|
httpMethod: "POST",
|
|
body: JSON.stringify(body),
|
|
headers: {
|
|
"Content-Type": "application/json",
|
|
"X-Anilist-Token": headers?.["X-Anilist-Token"],
|
|
},
|
|
},
|
|
};
|
|
default:
|
|
throw new Error(`Unknown queue name: ${queueName}`);
|
|
}
|
|
}
|