Files
aniplay-api/src/libs/tasks/queueTask.ts

151 lines
4.1 KiB
TypeScript

import { env as cloudflareEnv } from "cloudflare:workers";
import type { HonoRequest } from "hono";
import { DateTime, Duration, type DurationLike } from "luxon";
import { AnilistUpdateType } from "~/libs/anilist/updateType.ts";
import type { WatchStatus } from "~/types/title/watchStatus";
import type { QueueName } from "./queueName";
export type QueueBody = {
ANILIST_UPDATES: {
[AnilistUpdateType.UpdateWatchStatus]: {
titleId: number;
watchStatus: WatchStatus | null;
aniListToken: string;
};
updateType: AnilistUpdateType;
};
NEW_EPISODE: { aniListId: number; episodeNumber: number };
};
type ScheduleConfig =
| { delay: DurationLike; epochTime?: never }
| { epochTime: number; delay?: never };
interface QueueTaskOptionalArgs {
scheduleConfig?: ScheduleConfig;
/** when req is not provided, that means the task is being created locally */
req?: HonoRequest;
env?: Cloudflare.Env;
}
export const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as(
"seconds",
);
export async function queueTask(
queueName: QueueName,
body: QueueBody[QueueName],
{ scheduleConfig, req, env = cloudflareEnv }: QueueTaskOptionalArgs = {},
) {
const { scheduleTime, headers } = buildTask(
queueName,
scheduleConfig,
body,
req?.header(),
);
// If delay exceeds 9 hours, store in KV for later processing
if (scheduleTime > MAX_DELAY_SECONDS) {
if (!env || !env.DELAYED_TASKS) {
throw new Error("DELAYED_TASKS KV namespace not available");
}
const { generateTaskKey, serializeDelayedTask } =
await import("./delayedTask");
const taskId = crypto.randomUUID();
const scheduledEpochTime = Math.floor(Date.now() / 1000) + scheduleTime;
const metadata = {
queueName,
body,
headers,
scheduledEpochTime,
taskId,
createdAt: Math.floor(Date.now() / 1000),
retryCount: 0,
};
const key = generateTaskKey(scheduledEpochTime, taskId);
await env.DELAYED_TASKS.put(key, serializeDelayedTask(metadata));
console.log(
`Task stored in KV for delayed execution. Scheduled for: ${new Date(scheduledEpochTime * 1000).toISOString()}, Key: ${key}`,
);
return;
}
// Otherwise, queue directly
const contentType =
headers["Content-Type"] === "application/json" ? "json" : "text";
if (!env) {
const Cloudflare = await import("cloudflare").then(
({ Cloudflare }) => Cloudflare,
);
const client = new Cloudflare({ apiToken: env.CLOUDFLARE_TOKEN });
let queueId: string | null = null;
const queues = await client.queues.list({
account_id: env.CLOUDFLARE_ACCOUNT_ID,
});
for await (const queue of queues) {
if (queueId == queue.queue_name) {
queueId = queue.queue_id!;
}
}
if (!queueId) {
throw new Error(`Queue ${queueName} not found`);
}
await client.queues.messages.push(queueId, {
body: { body, headers },
content_type: contentType,
delay_seconds: scheduleTime,
account_id: env.CLOUDFLARE_ACCOUNT_ID,
});
} else {
await env[queueName].send(
{ body, headers },
{
contentType,
delaySeconds: scheduleTime,
},
);
}
}
function buildTask(
queueName: QueueName,
scheduleConfig: ScheduleConfig | undefined,
body: QueueBody[QueueName],
headers: Record<string, string> | undefined,
) {
let scheduleTime: number = 0;
if (scheduleConfig) {
const { delay, epochTime } = scheduleConfig;
if (epochTime) {
console.log("epochTime", epochTime);
scheduleTime = DateTime.fromSeconds(epochTime)
.diffNow("second")
.as("second");
} else if (delay) {
console.log("delay", delay);
scheduleTime = Duration.fromDurationLike(delay).as("second");
}
}
switch (queueName) {
case "ANILIST_UPDATES":
case "NEW_EPISODE":
return {
body,
scheduleTime,
headers: {
"Content-Type": "application/json",
"X-Anilist-Token": headers?.["X-Anilist-Token"],
},
};
default:
throw new Error(`Unknown queue name: ${queueName}`);
}
}