149 lines
4.1 KiB
TypeScript
149 lines
4.1 KiB
TypeScript
import { env as cloudflareEnv } from "cloudflare:workers";
|
|
import type { HonoRequest } from "hono";
|
|
import { DateTime, Duration, type DurationLike } from "luxon";
|
|
|
|
import { AnilistUpdateType } from "~/libs/anilist/updateType.ts";
|
|
import type { WatchStatus } from "~/types/title/watchStatus";
|
|
|
|
import type { QueueName } from "./queueName";
|
|
|
|
/** Payload carried by an `UpdateWatchStatus` AniList update. */
type UpdateWatchStatusPayload = {
  titleId: number;
  watchStatus: WatchStatus | null;
  aniListToken: string;
};

/** Message body accepted by the `ANILIST_UPDATES` queue. */
type AnilistUpdatesBody = {
  [AnilistUpdateType.UpdateWatchStatus]: UpdateWatchStatusPayload;
  updateType: AnilistUpdateType;
};

/** Message body accepted by the `NEW_EPISODE` queue. */
type NewEpisodeBody = {
  aniListId: number;
  episodeNumber: number;
};

/** Maps every queue name to the shape of the message body it accepts. */
export type QueueBody = {
  ANILIST_UPDATES: AnilistUpdatesBody;
  NEW_EPISODE: NewEpisodeBody;
};
|
|
|
|
/** Schedule expressed as a relative delay; `epochTime` must not be set. */
type DelaySchedule = { delay: DurationLike; epochTime?: never };

/** Schedule expressed as an epoch timestamp; `delay` must not be set. */
type EpochSchedule = { epochTime: number; delay?: never };

/**
 * When a task should run: either a relative `delay` or an `epochTime`
 * (passed to `DateTime.fromSeconds` by `buildTask`), never both.
 */
type ScheduleConfig = DelaySchedule | EpochSchedule;
|
|
|
|
/** Optional arguments accepted by `queueTask`. */
interface QueueTaskOptionalArgs {
  /** Environment bindings to use; `queueTask` defaults this to the `cloudflare:workers` env. */
  env?: Cloudflare.Env;
  /** Originating request. When req is not provided, that means the task is being created locally. */
  req?: HonoRequest;
  /** When the task should run; omitted means no delay. */
  scheduleConfig?: ScheduleConfig;
}
|
|
|
|
/**
 * Enqueues `body` on the queue identified by `queueName`, optionally delayed.
 *
 * Dispatch order:
 * 1. Delay longer than 9 hours: the task is serialized into the
 *    `DELAYED_TASKS` KV namespace for later processing and nothing is queued.
 * 2. `env[queueName]` binding is present: the message is sent through it.
 * 3. Otherwise: the message is pushed through the Cloudflare REST API using
 *    `env.CLOUDFLARE_TOKEN` / `env.CLOUDFLARE_ACCOUNT_ID`.
 *
 * @param queueName - Target queue.
 * @param body - Message body for that queue.
 * @param options - Optional schedule, originating request (its headers are
 *   forwarded to `buildTask`) and environment bindings (defaults to the
 *   `cloudflare:workers` env).
 * @throws Error when the delay exceeds 9 hours and `DELAYED_TASKS` is missing,
 *   when neither a queue binding nor REST credentials are available, or when
 *   the queue cannot be found through the REST API.
 */
export async function queueTask(
  queueName: QueueName,
  body: QueueBody[QueueName],
  { scheduleConfig, req, env = cloudflareEnv }: QueueTaskOptionalArgs = {},
): Promise<void> {
  const { scheduleTime, headers } = buildTask(
    queueName,
    scheduleConfig,
    body,
    req?.header(),
  );

  // Queue delays are whole, non-negative seconds. A target time in the past
  // means "send now", and fractional seconds are rounded up so the task never
  // fires early.
  const delaySeconds = Math.max(0, Math.ceil(scheduleTime));
  const maxDelaySeconds = Duration.fromObject({ hours: 9 }).as("seconds");

  // If delay exceeds 9 hours, store in KV for later processing
  if (delaySeconds > maxDelaySeconds) {
    if (!env || !env.DELAYED_TASKS) {
      throw new Error("DELAYED_TASKS KV namespace not available");
    }

    const { generateTaskKey, serializeDelayedTask } =
      await import("./delayedTask");
    const taskId = crypto.randomUUID();
    // Sample the clock once so createdAt and scheduledEpochTime are consistent.
    const nowEpochSeconds = Math.floor(Date.now() / 1000);
    const scheduledEpochTime = nowEpochSeconds + delaySeconds;

    const metadata = {
      queueName,
      body,
      headers,
      scheduledEpochTime,
      taskId,
      createdAt: nowEpochSeconds,
      retryCount: 0,
    };

    const key = generateTaskKey(scheduledEpochTime, taskId);
    await env.DELAYED_TASKS.put(key, serializeDelayedTask(metadata));

    console.log(
      `Task stored in KV for delayed execution. Scheduled for: ${new Date(scheduledEpochTime * 1000).toISOString()}, Key: ${key}`,
    );
    return;
  }

  // Otherwise, queue directly
  const contentType =
    headers["Content-Type"] === "application/json" ? "json" : "text";

  // Prefer the queue binding whenever the runtime provides one.
  const queueBinding = env?.[queueName];
  if (queueBinding) {
    await queueBinding.send(
      { body, headers },
      {
        contentType,
        delaySeconds,
      },
    );
    return;
  }

  // No binding available: fall back to the Cloudflare REST API. The previous
  // guard (`!env`) read credentials off the very value it had just found to be
  // missing, so this path could only ever throw a TypeError.
  const apiToken = env?.CLOUDFLARE_TOKEN;
  const accountId = env?.CLOUDFLARE_ACCOUNT_ID;
  if (!apiToken || !accountId) {
    throw new Error(
      `Queue binding ${queueName} is not available and Cloudflare API credentials are missing`,
    );
  }

  const { Cloudflare: CloudflareClient } = await import("cloudflare");
  const client = new CloudflareClient({ apiToken });

  let queueId: string | null = null;
  const queues = await client.queues.list({
    account_id: accountId,
  });
  for await (const queue of queues) {
    // Match on the requested queue name (the original compared against the
    // still-null `queueId`, so no queue could ever be selected).
    if (queue.queue_name === queueName && queue.queue_id) {
      queueId = queue.queue_id;
      break;
    }
  }
  if (!queueId) {
    throw new Error(`Queue ${queueName} not found`);
  }

  await client.queues.messages.push(queueId, {
    body: { body, headers },
    content_type: contentType,
    delay_seconds: delaySeconds,
    account_id: accountId,
  });
}
|
|
/**
 * Looks up a header by name, first by exact key and then case-insensitively.
 * Returns `undefined` when `headers` is absent or has no matching key.
 */
function getHeaderCaseInsensitive(
  headers: Record<string, string> | undefined,
  name: string,
): string | undefined {
  if (!headers) {
    return undefined;
  }
  const exact = headers[name];
  if (exact !== undefined) {
    return exact;
  }
  const wanted = name.toLowerCase();
  for (const [key, value] of Object.entries(headers)) {
    if (key.toLowerCase() === wanted) {
      return value;
    }
  }
  return undefined;
}

/**
 * Resolves the delivery delay and outgoing headers for a queue message.
 *
 * @param queueName - Target queue; unknown names are rejected.
 * @param scheduleConfig - Optional schedule. `epochTime` is converted to the
 *   number of seconds from now; `delay` is converted to seconds.
 * @param body - Message body, returned unchanged.
 * @param headers - Incoming request headers, if any. Only `X-Anilist-Token`
 *   is forwarded.
 * @returns `body`, `scheduleTime` (whole, non-negative seconds; `0` when no
 *   schedule is given) and the headers to attach to the message.
 * @throws Error when `queueName` is not a known queue.
 */
function buildTask(
  queueName: QueueName,
  scheduleConfig: ScheduleConfig | undefined,
  body: QueueBody[QueueName],
  headers: Record<string, string> | undefined,
) {
  let scheduleTime = 0;
  if (scheduleConfig) {
    const { delay, epochTime } = scheduleConfig;
    let rawSeconds = 0;
    if (epochTime != null) {
      console.log("epochTime", epochTime);
      rawSeconds = DateTime.fromSeconds(epochTime)
        .diffNow("second")
        .as("second");
    } else if (delay != null) {
      console.log("delay", delay);
      rawSeconds = Duration.fromDurationLike(delay).as("second");
    }
    // A target in the past would give a negative delay and `diffNow` yields
    // fractional seconds; clamp to zero and round up so the task never fires
    // early and the delay is always a whole number of seconds.
    scheduleTime = Math.max(0, Math.ceil(rawSeconds));
  }

  switch (queueName) {
    case "ANILIST_UPDATES":
    case "NEW_EPISODE":
      return {
        body,
        scheduleTime,
        headers: {
          "Content-Type": "application/json",
          // Hono's `req.header()` (no arguments) returns lower-cased keys, so
          // an exact-case lookup would always miss the token.
          "X-Anilist-Token": getHeaderCaseInsensitive(
            headers,
            "X-Anilist-Token",
          ),
        },
      };
    default:
      throw new Error(`Unknown queue name: ${queueName}`);
  }
}
|