diff --git a/src/libs/tasks/processDelayedTasks.ts b/src/libs/tasks/processDelayedTasks.ts index c68d859..a928928 100644 --- a/src/libs/tasks/processDelayedTasks.ts +++ b/src/libs/tasks/processDelayedTasks.ts @@ -2,15 +2,11 @@ import { DateTime } from "luxon"; import type { DelayedTaskMetadata } from "./delayedTask"; import { deserializeDelayedTask } from "./delayedTask"; -import { queueTask } from "./queueTask"; +import { MAX_DELAY_SECONDS, queueTask } from "./queueTask"; -const MAX_DELAY_SECONDS = 12 * 60 * 60; // 43,200 seconds (12 hours) const RETRY_ALERT_THRESHOLD = 3; -export async function processDelayedTasks( - env: Cloudflare.Env, - ctx: ExecutionContext, -): Promise { +export async function processDelayedTasks(env: Cloudflare.Env): Promise { console.log("Starting delayed task processing cron job"); const kvNamespace = env.DELAYED_TASKS; @@ -31,7 +27,7 @@ export async function processDelayedTasks( console.log(`Found ${keys.length} delayed tasks to check`); const currentTime = Math.floor(Date.now() / 1000); - const twelveHoursFromNow = currentTime + MAX_DELAY_SECONDS; + const maxQueueTime = currentTime + MAX_DELAY_SECONDS; let processedCount = 0; let queuedCount = 0; @@ -40,16 +36,17 @@ export async function processDelayedTasks( for (const key of keys) { try { const value = await kvNamespace.get(key.name); - if (!value) { - console.warn(`Task key ${key.name} has no value, skipping`); + if (!value || value == "null") { + console.warn(`Task key ${key.name} has no value, removing`); + await kvNamespace.delete(key.name); continue; } const metadata: DelayedTaskMetadata = deserializeDelayedTask(value); processedCount++; - // Check if task is ready to be queued (within 12 hours of scheduled time) - if (metadata.scheduledEpochTime <= twelveHoursFromNow) { + // Check if task is ready to be queued (within 9 hours of scheduled time) + if (metadata.scheduledEpochTime <= maxQueueTime) { const remainingDelay = Math.max( 0, metadata.scheduledEpochTime - currentTime, @@ -100,7 +97,7 @@ export async function processDelayedTasks( } } else { const hoursUntilReady = - (metadata.scheduledEpochTime - twelveHoursFromNow) / 3600; + (metadata.scheduledEpochTime - maxQueueTime) / 3600; console.log( `Task ${metadata.taskId} not ready yet (${hoursUntilReady.toFixed(1)} hours until queueable)`, ); diff --git a/src/libs/tasks/queueTask.ts b/src/libs/tasks/queueTask.ts index 83f4ec1..80c675e 100644 --- a/src/libs/tasks/queueTask.ts +++ b/src/libs/tasks/queueTask.ts @@ -30,6 +30,10 @@ interface QueueTaskOptionalArgs { env?: Cloudflare.Env; } +export const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as( + "seconds", +); + export async function queueTask( queueName: QueueName, body: QueueBody[QueueName], @@ -42,8 +46,6 @@ export async function queueTask( req?.header(), ); - const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as("seconds"); - // If delay exceeds 9 hours, store in KV for later processing if (scheduleTime > MAX_DELAY_SECONDS) { if (!env || !env.DELAYED_TASKS) {