feat: update delayed task processing to a shared 9-hour maximum delay, remove invalid KV entries

This commit is contained in:
2025-12-17 06:35:57 -05:00
parent 05df043fbe
commit 6eb42f6a33
2 changed files with 13 additions and 14 deletions

View File

@@ -2,15 +2,11 @@ import { DateTime } from "luxon";
import type { DelayedTaskMetadata } from "./delayedTask"; import type { DelayedTaskMetadata } from "./delayedTask";
import { deserializeDelayedTask } from "./delayedTask"; import { deserializeDelayedTask } from "./delayedTask";
import { queueTask } from "./queueTask"; import { MAX_DELAY_SECONDS, queueTask } from "./queueTask";
const MAX_DELAY_SECONDS = 12 * 60 * 60; // 43,200 seconds (12 hours)
const RETRY_ALERT_THRESHOLD = 3; const RETRY_ALERT_THRESHOLD = 3;
export async function processDelayedTasks( export async function processDelayedTasks(env: Cloudflare.Env): Promise<void> {
env: Cloudflare.Env,
ctx: ExecutionContext,
): Promise<void> {
console.log("Starting delayed task processing cron job"); console.log("Starting delayed task processing cron job");
const kvNamespace = env.DELAYED_TASKS; const kvNamespace = env.DELAYED_TASKS;
@@ -31,7 +27,7 @@ export async function processDelayedTasks(
console.log(`Found ${keys.length} delayed tasks to check`); console.log(`Found ${keys.length} delayed tasks to check`);
const currentTime = Math.floor(Date.now() / 1000); const currentTime = Math.floor(Date.now() / 1000);
const twelveHoursFromNow = currentTime + MAX_DELAY_SECONDS; const maxQueueTime = currentTime + MAX_DELAY_SECONDS;
let processedCount = 0; let processedCount = 0;
let queuedCount = 0; let queuedCount = 0;
@@ -40,16 +36,17 @@ export async function processDelayedTasks(
for (const key of keys) { for (const key of keys) {
try { try {
const value = await kvNamespace.get(key.name); const value = await kvNamespace.get(key.name);
if (!value) { if (!value || value == "null") {
console.warn(`Task key ${key.name} has no value, skipping`); console.warn(`Task key ${key.name} has no value, removing`);
await kvNamespace.delete(key.name);
continue; continue;
} }
const metadata: DelayedTaskMetadata = deserializeDelayedTask(value); const metadata: DelayedTaskMetadata = deserializeDelayedTask(value);
processedCount++; processedCount++;
// Check if task is ready to be queued (within 12 hours of scheduled time) // Check if task is ready to be queued (within 9 hours of scheduled time)
if (metadata.scheduledEpochTime <= twelveHoursFromNow) { if (metadata.scheduledEpochTime <= maxQueueTime) {
const remainingDelay = Math.max( const remainingDelay = Math.max(
0, 0,
metadata.scheduledEpochTime - currentTime, metadata.scheduledEpochTime - currentTime,
@@ -100,7 +97,7 @@ export async function processDelayedTasks(
} }
} else { } else {
const hoursUntilReady = const hoursUntilReady =
(metadata.scheduledEpochTime - twelveHoursFromNow) / 3600; (metadata.scheduledEpochTime - maxQueueTime) / 3600;
console.log( console.log(
`Task ${metadata.taskId} not ready yet (${hoursUntilReady.toFixed(1)} hours until queueable)`, `Task ${metadata.taskId} not ready yet (${hoursUntilReady.toFixed(1)} hours until queueable)`,
); );

View File

@@ -30,6 +30,10 @@ interface QueueTaskOptionalArgs {
env?: Cloudflare.Env; env?: Cloudflare.Env;
} }
export const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as(
"seconds",
);
export async function queueTask( export async function queueTask(
queueName: QueueName, queueName: QueueName,
body: QueueBody[QueueName], body: QueueBody[QueueName],
@@ -42,8 +46,6 @@ export async function queueTask(
req?.header(), req?.header(),
); );
const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as("seconds");
// If delay exceeds 9 hours, store in KV for later processing // If delay exceeds 9 hours, store in KV for later processing
if (scheduleTime > MAX_DELAY_SECONDS) { if (scheduleTime > MAX_DELAY_SECONDS) {
if (!env || !env.DELAYED_TASKS) { if (!env || !env.DELAYED_TASKS) {