feat: migrate to cloudflare d1 and queues

This commit is contained in:
2025-11-28 16:32:35 +08:00
parent 00e1f82d85
commit bd958fb1ab
19 changed files with 294 additions and 276 deletions

View File

@@ -1 +1,5 @@
export type QueueName = "anilist-updates" | "new-episode";
type QueueKeys<T> = {
[K in keyof T]: T[K] extends Queue ? K : never;
}[keyof T];
export type QueueName = QueueKeys<Cloudflare.Env>;

View File

@@ -1,26 +1,20 @@
import { env as cloudflareEnv } from "cloudflare:workers";
import type { HonoRequest } from "hono";
import isEqual from "lodash.isequal";
import { DateTime, type DurationLike } from "luxon";
import { DateTime, Duration, type DurationLike } from "luxon";
import { AnilistUpdateType } from "~/libs/anilist/updateType.ts";
import type { WatchStatus } from "~/types/title/watchStatus";
import { FailedToQueueTaskError } from "../errors/FailedToQueueTask";
import { getAdminSdkCredentials } from "../gcloud/getAdminSdkCredentials";
import { getGoogleAuthToken } from "../gcloud/getGoogleAuthToken";
import { getCurrentDomain } from "../getCurrentDomain";
import { buildAnilistRetryTaskId, buildNewEpisodeTaskId } from "./id";
import type { QueueName } from "./queueName";
type QueueBody = {
"anilist-updates": {
export type QueueBody = {
ANILIST_UPDATES: {
deviceId: string;
watchStatus: WatchStatus | null;
titleId: number;
isRetrying: true;
nameSuffix: string;
updateType: AnilistUpdateType;
};
"new-episode": { aniListId: number; episodeNumber: number };
NEW_EPISODE: { aniListId: number; episodeNumber: number };
};
type ScheduleConfig =
@@ -39,163 +33,75 @@ export async function queueTask(
body: QueueBody[QueueName],
{ scheduleConfig, req, env = cloudflareEnv }: QueueTaskOptionalArgs = {},
) {
const domain = req
? getCurrentDomain(req)
: "https://aniplay-v2.rururu.workers.dev";
if (!domain) {
console.log("Skipping queue task due to local domain", queueName, body);
return;
}
const adminSdkCredentials = getAdminSdkCredentials(env);
const { projectId } = adminSdkCredentials;
const task = buildTask(
projectId,
const { scheduleTime, headers } = buildTask(
queueName,
scheduleConfig,
domain,
body,
req?.header(),
);
const { res } = await queueCloudTask(task);
if (!res.ok) {
if (res.status === 409) {
if (
await checkIfTaskExists(
env,
queueName,
task.name.split("/").at(-1)!,
body,
)
) {
return;
} else {
const hashedTaskName = await import("node:crypto").then(
({ createHash }) =>
createHash("sha256")
.update(task.name.split("/").at(-1)!)
.digest("hex"),
);
console.log("task name", hashedTaskName);
const { res } = await queueCloudTask({
...task,
name:
task.name.split("/").slice(0, -1).join("/") + "/" + hashedTaskName,
});
if (!res.ok) {
if (await checkIfTaskExists(env, queueName, hashedTaskName, body)) {
return;
}
throw new FailedToQueueTaskError(res.status, await res.text());
}
const contentType =
headers["Content-Type"] === "application/json" ? "json" : "text";
if (!env) {
const Cloudflare = await import("cloudflare").then(
({ Cloudflare }) => Cloudflare,
);
const client = new Cloudflare({ apiToken: env.CLOUDFLARE_TOKEN });
let queueId: string | null = null;
const queues = await client.queues.list({
account_id: env.CLOUDFLARE_ACCOUNT_ID,
});
for await (const queue of queues) {
if (queueId == queue.queue_name) {
queueId = queue.queue_id!;
}
}
if (!queueId) {
throw new Error(`Queue ${queueName} not found`);
}
throw new FailedToQueueTaskError(res.status, await res.text());
}
async function queueCloudTask(task: object) {
const res = await fetch(
`https://content-cloudtasks.googleapis.com/v2/projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks?alt=json`,
await client.queues.messages.push(queueId, {
body: { body, headers },
content_type: contentType,
delay_seconds: scheduleTime,
account_id: env.CLOUDFLARE_ACCOUNT_ID,
});
} else {
await env[queueName].send(
{ body, headers },
{
headers: {
Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
"Content-Type": "application/json",
},
body: JSON.stringify({ task }),
method: "POST",
contentType,
delaySeconds: scheduleTime,
},
);
return { res };
}
}
async function checkIfTaskExists(
env: Cloudflare.Env,
queueName: QueueName,
taskId: string,
expectedBody: QueueBody[QueueName],
) {
const adminSdkCredentials = getAdminSdkCredentials(env);
const body = await fetch(
`https://content-cloudtasks.googleapis.com/v2/projects/${adminSdkCredentials.projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}?responseView=FULL`,
{
headers: {
Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
},
},
)
.then((res) => res.json())
.then(({ httpRequest }) => httpRequest?.body);
return (
body &&
isEqual(
JSON.parse(Buffer.from(body as string, "base64").toString()),
expectedBody,
)
);
}
function buildTask(
projectId: string,
queueName: QueueName,
scheduleConfig: ScheduleConfig | undefined,
domain: string,
body: QueueBody[QueueName],
headers: Record<string, string> | undefined,
) {
let scheduleTime: string | undefined;
let scheduleTime: number = 0;
if (scheduleConfig) {
const { delay, epochTime } = scheduleConfig;
if (epochTime) {
scheduleTime = DateTime.fromSeconds(epochTime).toUTC().toISO();
scheduleTime = DateTime.fromSeconds(epochTime)
.diffNow("second")
.as("second");
} else if (delay) {
scheduleTime = DateTime.now().plus(delay).toUTC().toISO();
scheduleTime = Duration.fromDurationLike(delay).as("second");
}
}
let taskId: string;
switch (queueName) {
case "new-episode":
const { aniListId } = body as QueueBody["new-episode"];
taskId = buildNewEpisodeTaskId(aniListId);
case "ANILIST_UPDATES":
case "NEW_EPISODE":
return {
name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`,
body,
scheduleTime,
httpRequest: {
url: `${domain}/internal/new-episode`,
httpMethod: "POST",
body: Buffer.from(JSON.stringify(body)).toString("base64"),
headers: {
"Content-Type": "application/json",
"X-Anilist-Token": headers?.["X-Anilist-Token"],
},
},
};
case "anilist-updates":
const { deviceId, titleId, nameSuffix } =
body as QueueBody[typeof queueName];
taskId = buildAnilistRetryTaskId(deviceId, titleId, nameSuffix);
return {
name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`,
scheduleTime,
httpRequest: {
url: `${domain}/watch-status`,
httpMethod: "POST",
body: Buffer.from(
JSON.stringify({ ...body, nameSuffix: undefined }),
).toString("base64"),
headers: {
"Content-Type": "application/json",
"X-Anilist-Token": headers?.["X-Anilist-Token"],
},
headers: {
"Content-Type": "application/json",
"X-Anilist-Token": headers?.["X-Anilist-Token"],
},
};
default: