import { env as cloudflareEnv } from "cloudflare:workers";
import type { HonoRequest } from "hono";
import { DateTime, Duration, type DurationLike } from "luxon";

import { AnilistUpdateType } from "~/libs/anilist/updateType.ts";
import type { WatchStatus } from "~/types/title/watchStatus";

import type { QueueName } from "./queueName";

/** Message payload accepted by each queue, keyed by queue name. */
export type QueueBody = {
  ANILIST_UPDATES: {
    [AnilistUpdateType.UpdateWatchStatus]: {
      titleId: number;
      watchStatus: WatchStatus | null;
      aniListToken: string;
    };
    updateType: AnilistUpdateType;
  };
  NEW_EPISODE: { aniListId: number; episodeNumber: number };
};

/**
 * When a task should run: either a relative `delay` or an absolute
 * `epochTime` (passed to `DateTime.fromSeconds`, i.e. seconds since the Unix
 * epoch). The two fields are mutually exclusive.
 */
type ScheduleConfig =
  | { delay: DurationLike; epochTime?: never }
  | { epochTime: number; delay?: never };

interface QueueTaskOptionalArgs {
  scheduleConfig?: ScheduleConfig;
  /** when req is not provided, that means the task is being created locally */
  req?: HonoRequest;
  env?: Cloudflare.Env;
}

/**
 * Longest delay (in seconds) that is sent straight to the queue; tasks
 * scheduled further out are parked in the `DELAYED_TASKS` KV namespace.
 */
export const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as(
  "seconds",
);

/**
 * Enqueues a task on `queueName`, optionally delayed.
 *
 * - Delays above {@link MAX_DELAY_SECONDS} are serialized into the
 *   `DELAYED_TASKS` KV namespace instead of being sent to the queue.
 * - Otherwise the message is sent through the queue binding on `env`; if that
 *   binding is missing, the Cloudflare REST API is used with
 *   `env.CLOUDFLARE_TOKEN` / `env.CLOUDFLARE_ACCOUNT_ID`.
 *
 * @param queueName - Target queue.
 * @param body - Payload for that queue.
 * @param options - Optional schedule, originating request (its
 *   `X-Anilist-Token` header is forwarded as `Authorization`) and environment
 *   (defaults to the `cloudflare:workers` env).
 * @throws Error when the environment or `DELAYED_TASKS` namespace is
 *   unavailable, or when the queue cannot be found through the REST API.
 */
export async function queueTask(
  queueName: QueueName,
  body: QueueBody[QueueName],
  { scheduleConfig, req, env = cloudflareEnv }: QueueTaskOptionalArgs = {},
): Promise<void> {
  const { scheduleTime, headers } = buildTask(
    queueName,
    scheduleConfig,
    body,
    req?.header(),
  );

  // If delay exceeds 9 hours, store in KV for later processing
  if (scheduleTime > MAX_DELAY_SECONDS) {
    if (!env?.DELAYED_TASKS) {
      throw new Error("DELAYED_TASKS KV namespace not available");
    }

    const { generateTaskKey, serializeDelayedTask } = await import(
      "./delayedTask"
    );

    const taskId = crypto.randomUUID();
    // Read the clock once so createdAt and scheduledEpochTime share a base.
    const nowSeconds = Math.floor(Date.now() / 1000);
    const scheduledEpochTime = nowSeconds + scheduleTime;
    const metadata = {
      queueName,
      body,
      headers,
      scheduledEpochTime,
      taskId,
      createdAt: nowSeconds,
      retryCount: 0,
    };

    const key = generateTaskKey(scheduledEpochTime, taskId);
    await env.DELAYED_TASKS.put(key, serializeDelayedTask(metadata));
    console.log(
      `Task stored in KV for delayed execution. \nScheduled for: ${new Date(scheduledEpochTime * 1000).toISOString()}, Key: ${key}`,
    );
    return;
  }

  // Every remaining path reads from env (binding or API credentials).
  if (!env) {
    throw new Error("Cloudflare environment not available");
  }

  // Otherwise, queue directly
  const contentType =
    headers["Content-Type"] === "application/json" ? "json" : "text";

  const queueBinding = env[queueName];
  // The binding can be absent at runtime even though the type says otherwise
  // (e.g. env supplied by a caller outside the Worker runtime).
  if (queueBinding) {
    await queueBinding.send(
      { body, headers },
      {
        contentType,
        delaySeconds: scheduleTime,
      },
    );
    return;
  }

  // No binding: push through the Cloudflare REST API instead.
  const { Cloudflare } = await import("cloudflare");
  const client = new Cloudflare({ apiToken: env.CLOUDFLARE_TOKEN });

  let queueId: string | null = null;
  const queues = await client.queues.list({
    account_id: env.CLOUDFLARE_ACCOUNT_ID,
  });
  for await (const queue of queues) {
    // NOTE(review): assumes the remote queue is named exactly like its
    // binding (`queueName`) — confirm against the wrangler configuration.
    if (queue.queue_name === queueName) {
      queueId = queue.queue_id ?? null;
      break;
    }
  }

  if (!queueId) {
    throw new Error(`Queue ${queueName} not found`);
  }

  await client.queues.messages.push(queueId, {
    body: { body, headers },
    content_type: contentType,
    delay_seconds: scheduleTime,
    account_id: env.CLOUDFLARE_ACCOUNT_ID,
  });
}

/**
 * Case-insensitive header lookup. Needed because `HonoRequest.header()`
 * returns lower-cased keys, while other callers may pass canonical casing.
 */
function findHeader(
  headers: Record<string, string> | undefined,
  name: string,
): string | undefined {
  if (!headers) {
    return undefined;
  }
  const wanted = name.toLowerCase();
  for (const [key, value] of Object.entries(headers)) {
    if (key.toLowerCase() === wanted) {
      return value;
    }
  }
  return undefined;
}

/**
 * Resolves the delay and outgoing headers for a task.
 *
 * @param queueName - Target queue; unknown names throw.
 * @param scheduleConfig - Optional relative delay or absolute epoch time.
 * @param body - Payload, returned unchanged.
 * @param headers - Headers of the originating request, if any.
 * @returns The body, the delay as a non-negative whole number of seconds
 *   (`0` when unscheduled or already due), and the headers to attach.
 * @throws Error when `queueName` is not a known queue.
 */
function buildTask(
  queueName: QueueName,
  scheduleConfig: ScheduleConfig | undefined,
  body: QueueBody[QueueName],
  headers: Record<string, string> | undefined,
) {
  let scheduleTime = 0;
  if (scheduleConfig) {
    const { delay, epochTime } = scheduleConfig;
    if (epochTime !== undefined) {
      console.log("epochTime", epochTime);
      scheduleTime = DateTime.fromSeconds(epochTime)
        .diffNow("second")
        .as("second");
    } else if (delay !== undefined) {
      console.log("delay", delay);
      scheduleTime = Duration.fromDurationLike(delay).as("second");
    }
  }
  // A past epochTime yields a negative diff and diffNow is fractional;
  // normalise to a non-negative whole number of seconds.
  scheduleTime = Math.max(0, Math.round(scheduleTime));

  const anilistToken = findHeader(headers, "X-Anilist-Token");
  const authorizationHeader = anilistToken
    ? { Authorization: `Bearer ${anilistToken}` }
    : {};

  switch (queueName) {
    case "ANILIST_UPDATES":
    case "NEW_EPISODE":
      return {
        body,
        scheduleTime,
        headers: {
          ...authorizationHeader,
          "Content-Type": "application/json",
        },
      };
    default:
      throw new Error(`Unknown queue name: ${queueName}`);
  }
}