feat(tasks): implement long-term delayed tasks with KV and Cron
This commit is contained in:
@@ -88,6 +88,12 @@ export default {
|
||||
break;
|
||||
}
|
||||
},
|
||||
async scheduled(event, env, ctx) {
|
||||
const { processDelayedTasks } = await import(
|
||||
"~/libs/tasks/processDelayedTasks"
|
||||
);
|
||||
await processDelayedTasks(env, ctx);
|
||||
},
|
||||
} satisfies ExportedHandler<Env>;
|
||||
|
||||
export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts";
|
||||
|
||||
123
src/libs/tasks/delayedTask.spec.ts
Normal file
123
src/libs/tasks/delayedTask.spec.ts
Normal file
@@ -0,0 +1,123 @@
|
||||
import { DateTime } from "luxon";
|
||||
|
||||
import { beforeEach, describe, expect, it, mock } from "bun:test";
|
||||
|
||||
import type { DelayedTaskMetadata } from "./delayedTask";
|
||||
import {
|
||||
deserializeDelayedTask,
|
||||
generateTaskKey,
|
||||
serializeDelayedTask,
|
||||
} from "./delayedTask";
|
||||
|
||||
describe("delayedTask", () => {
|
||||
describe("generateTaskKey", () => {
|
||||
it("generates key with correct format", () => {
|
||||
const scheduledTime = 1732896000;
|
||||
const taskId = "abc-123-def";
|
||||
|
||||
const key = generateTaskKey(scheduledTime, taskId);
|
||||
|
||||
expect(key).toBe("delayed-task:1732896000:abc-123-def");
|
||||
});
|
||||
|
||||
it("generates unique keys for different timestamps", () => {
|
||||
const taskId = "same-id";
|
||||
const key1 = generateTaskKey(1000, taskId);
|
||||
const key2 = generateTaskKey(2000, taskId);
|
||||
|
||||
expect(key1).not.toBe(key2);
|
||||
});
|
||||
|
||||
it("generates unique keys for different task IDs", () => {
|
||||
const timestamp = 1000;
|
||||
const key1 = generateTaskKey(timestamp, "id-1");
|
||||
const key2 = generateTaskKey(timestamp, "id-2");
|
||||
|
||||
expect(key1).not.toBe(key2);
|
||||
});
|
||||
});
|
||||
|
||||
describe("serializeDelayedTask & deserializeDelayedTask", () => {
|
||||
let testMetadata: DelayedTaskMetadata;
|
||||
|
||||
beforeEach(() => {
|
||||
testMetadata = {
|
||||
queueName: "NEW_EPISODE",
|
||||
body: { aniListId: 12345, episodeNumber: 1 },
|
||||
headers: {
|
||||
"Content-Type": "application/json",
|
||||
"X-Anilist-Token": "test-token",
|
||||
},
|
||||
scheduledEpochTime: 1732896000,
|
||||
taskId: "test-task-id",
|
||||
createdAt: 1732800000,
|
||||
retryCount: 0,
|
||||
};
|
||||
});
|
||||
|
||||
it("serializes metadata to JSON string", () => {
|
||||
const serialized = serializeDelayedTask(testMetadata);
|
||||
|
||||
expect(serialized).toBeTypeOf("string");
|
||||
expect(() => JSON.parse(serialized)).not.toThrow();
|
||||
});
|
||||
|
||||
it("deserializes JSON string back to metadata", () => {
|
||||
const serialized = serializeDelayedTask(testMetadata);
|
||||
const deserialized = deserializeDelayedTask(serialized);
|
||||
|
||||
expect(deserialized).toEqual(testMetadata);
|
||||
});
|
||||
|
||||
it("preserves all metadata fields", () => {
|
||||
const serialized = serializeDelayedTask(testMetadata);
|
||||
const deserialized = deserializeDelayedTask(serialized);
|
||||
|
||||
expect(deserialized.queueName).toBe("NEW_EPISODE");
|
||||
expect(deserialized.body).toEqual({ aniListId: 12345, episodeNumber: 1 });
|
||||
expect(deserialized.headers).toEqual({
|
||||
"Content-Type": "application/json",
|
||||
"X-Anilist-Token": "test-token",
|
||||
});
|
||||
expect(deserialized.scheduledEpochTime).toBe(1732896000);
|
||||
expect(deserialized.taskId).toBe("test-task-id");
|
||||
expect(deserialized.createdAt).toBe(1732800000);
|
||||
expect(deserialized.retryCount).toBe(0);
|
||||
});
|
||||
|
||||
it("handles metadata without retryCount", () => {
|
||||
delete testMetadata.retryCount;
|
||||
const serialized = serializeDelayedTask(testMetadata);
|
||||
const deserialized = deserializeDelayedTask(serialized);
|
||||
|
||||
expect(deserialized.retryCount).toBeUndefined();
|
||||
});
|
||||
|
||||
it("handles ANILIST_UPDATES queue body", () => {
|
||||
const anilistMetadata: DelayedTaskMetadata = {
|
||||
queueName: "ANILIST_UPDATES",
|
||||
body: {
|
||||
deviceId: "device-123",
|
||||
watchStatus: null,
|
||||
titleId: 456,
|
||||
updateType: 0,
|
||||
},
|
||||
headers: { "Content-Type": "application/json" },
|
||||
scheduledEpochTime: 1732896000,
|
||||
taskId: "test-id",
|
||||
createdAt: 1732800000,
|
||||
};
|
||||
|
||||
const serialized = serializeDelayedTask(anilistMetadata);
|
||||
const deserialized = deserializeDelayedTask(serialized);
|
||||
|
||||
expect(deserialized.queueName).toBe("ANILIST_UPDATES");
|
||||
expect(deserialized.body).toEqual({
|
||||
deviceId: "device-123",
|
||||
watchStatus: null,
|
||||
titleId: 456,
|
||||
updateType: 0,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
27
src/libs/tasks/delayedTask.ts
Normal file
27
src/libs/tasks/delayedTask.ts
Normal file
@@ -0,0 +1,27 @@
|
||||
import type { QueueName } from "./queueName";
|
||||
import type { QueueBody } from "./queueTask";
|
||||
|
||||
export interface DelayedTaskMetadata {
|
||||
queueName: QueueName;
|
||||
body: QueueBody[QueueName];
|
||||
headers: Record<string, string | undefined>;
|
||||
scheduledEpochTime: number;
|
||||
taskId: string;
|
||||
createdAt: number;
|
||||
retryCount?: number;
|
||||
}
|
||||
|
||||
export function serializeDelayedTask(metadata: DelayedTaskMetadata): string {
|
||||
return JSON.stringify(metadata);
|
||||
}
|
||||
|
||||
export function deserializeDelayedTask(json: string): DelayedTaskMetadata {
|
||||
return JSON.parse(json);
|
||||
}
|
||||
|
||||
export function generateTaskKey(
|
||||
scheduledEpochTime: number,
|
||||
taskId: string,
|
||||
): string {
|
||||
return `delayed-task:${scheduledEpochTime}:${taskId}`;
|
||||
}
|
||||
240
src/libs/tasks/processDelayedTasks.spec.ts
Normal file
240
src/libs/tasks/processDelayedTasks.spec.ts
Normal file
@@ -0,0 +1,240 @@
|
||||
import { beforeEach, describe, expect, it, mock } from "bun:test";
|
||||
|
||||
import { processDelayedTasks } from "./processDelayedTasks";
|
||||
|
||||
describe("processDelayedTasks", () => {
|
||||
let mockEnv: Cloudflare.Env;
|
||||
let mockCtx: ExecutionContext;
|
||||
let kvGetSpy: ReturnType<typeof mock>;
|
||||
let kvDeleteSpy: ReturnType<typeof mock>;
|
||||
let kvPutSpy: ReturnType<typeof mock>;
|
||||
let queueSendSpy: ReturnType<typeof mock>;
|
||||
|
||||
beforeEach(() => {
|
||||
kvGetSpy = mock(() => Promise.resolve(null));
|
||||
kvDeleteSpy = mock(() => Promise.resolve());
|
||||
kvPutSpy = mock(() => Promise.resolve());
|
||||
queueSendSpy = mock(() => Promise.resolve());
|
||||
|
||||
mockEnv = {
|
||||
DELAYED_TASKS: {
|
||||
get: kvGetSpy,
|
||||
delete: kvDeleteSpy,
|
||||
put: kvPutSpy,
|
||||
list: mock(() => Promise.resolve({ keys: [], list_complete: true })),
|
||||
getWithMetadata: mock(() =>
|
||||
Promise.resolve({ value: null, metadata: null }),
|
||||
),
|
||||
} as any,
|
||||
NEW_EPISODE: {
|
||||
send: queueSendSpy,
|
||||
} as any,
|
||||
ANILIST_UPDATES: {
|
||||
send: mock(() => Promise.resolve()),
|
||||
} as any,
|
||||
} as any;
|
||||
|
||||
mockCtx = {
|
||||
waitUntil: mock(() => {}),
|
||||
passThroughOnException: mock(() => {}),
|
||||
} as any;
|
||||
});
|
||||
|
||||
it("handles empty KV namespace", async () => {
|
||||
await processDelayedTasks(mockEnv, mockCtx);
|
||||
|
||||
expect(kvDeleteSpy).not.toHaveBeenCalled();
|
||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("queues tasks within 12 hours of scheduled time", async () => {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const scheduledTime = now + 6 * 3600; // 6 hours from now
|
||||
|
||||
const taskMetadata = {
|
||||
queueName: "NEW_EPISODE",
|
||||
body: { aniListId: 123, episodeNumber: 1 },
|
||||
headers: { "Content-Type": "application/json" },
|
||||
scheduledEpochTime: scheduledTime,
|
||||
taskId: "task-1",
|
||||
createdAt: now - 18 * 3600,
|
||||
retryCount: 0,
|
||||
};
|
||||
|
||||
mockEnv.DELAYED_TASKS.list = mock(() =>
|
||||
Promise.resolve({
|
||||
keys: [{ name: `delayed-task:${scheduledTime}:task-1` }],
|
||||
list_complete: true,
|
||||
}),
|
||||
);
|
||||
|
||||
kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
|
||||
|
||||
await processDelayedTasks(mockEnv, mockCtx);
|
||||
|
||||
expect(queueSendSpy).toHaveBeenCalledTimes(1);
|
||||
expect(kvDeleteSpy).toHaveBeenCalledTimes(1);
|
||||
expect(kvDeleteSpy).toHaveBeenCalledWith(
|
||||
`delayed-task:${scheduledTime}:task-1`,
|
||||
);
|
||||
});
|
||||
|
||||
it("does not queue tasks beyond 12 hours", async () => {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const scheduledTime = now + 24 * 3600; // 24 hours from now
|
||||
|
||||
const taskMetadata = {
|
||||
queueName: "NEW_EPISODE",
|
||||
body: { aniListId: 456, episodeNumber: 2 },
|
||||
headers: { "Content-Type": "application/json" },
|
||||
scheduledEpochTime: scheduledTime,
|
||||
taskId: "task-2",
|
||||
createdAt: now,
|
||||
retryCount: 0,
|
||||
};
|
||||
|
||||
mockEnv.DELAYED_TASKS.list = mock(() =>
|
||||
Promise.resolve({
|
||||
keys: [{ name: `delayed-task:${scheduledTime}:task-2` }],
|
||||
list_complete: true,
|
||||
}),
|
||||
);
|
||||
|
||||
kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
|
||||
|
||||
await processDelayedTasks(mockEnv, mockCtx);
|
||||
|
||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
||||
expect(kvDeleteSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("increments retry count on queue failure", async () => {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const scheduledTime = now + 1 * 3600; // 1 hour from now
|
||||
|
||||
const taskMetadata = {
|
||||
queueName: "NEW_EPISODE",
|
||||
body: { aniListId: 789, episodeNumber: 3 },
|
||||
headers: { "Content-Type": "application/json" },
|
||||
scheduledEpochTime: scheduledTime,
|
||||
taskId: "task-3",
|
||||
createdAt: now - 23 * 3600,
|
||||
retryCount: 0,
|
||||
};
|
||||
|
||||
mockEnv.DELAYED_TASKS.list = mock(() =>
|
||||
Promise.resolve({
|
||||
keys: [{ name: `delayed-task:${scheduledTime}:task-3` }],
|
||||
list_complete: true,
|
||||
}),
|
||||
);
|
||||
|
||||
kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
|
||||
queueSendSpy.mockRejectedValue(new Error("Queue error"));
|
||||
|
||||
await processDelayedTasks(mockEnv, mockCtx);
|
||||
|
||||
expect(kvPutSpy).toHaveBeenCalledTimes(1);
|
||||
const updatedMetadata = JSON.parse(kvPutSpy.mock.calls[0][1]);
|
||||
expect(updatedMetadata.retryCount).toBe(1);
|
||||
expect(kvDeleteSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("logs alert after 3 failed attempts", async () => {
|
||||
const consoleErrorSpy = mock(() => {});
|
||||
const originalConsoleError = console.error;
|
||||
console.error = consoleErrorSpy as any;
|
||||
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const scheduledTime = now + 1 * 3600;
|
||||
|
||||
const taskMetadata = {
|
||||
queueName: "NEW_EPISODE",
|
||||
body: { aniListId: 999, episodeNumber: 4 },
|
||||
headers: { "Content-Type": "application/json" },
|
||||
scheduledEpochTime: scheduledTime,
|
||||
taskId: "task-4",
|
||||
createdAt: now - 23 * 3600,
|
||||
retryCount: 2, // Will become 3 after this failure
|
||||
};
|
||||
|
||||
mockEnv.DELAYED_TASKS.list = mock(() =>
|
||||
Promise.resolve({
|
||||
keys: [{ name: `delayed-task:${scheduledTime}:task-4` }],
|
||||
list_complete: true,
|
||||
}),
|
||||
);
|
||||
|
||||
kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
|
||||
queueSendSpy.mockRejectedValue(new Error("Queue error"));
|
||||
|
||||
await processDelayedTasks(mockEnv, mockCtx);
|
||||
|
||||
// Check that alert was logged
|
||||
const alertCalls = consoleErrorSpy.mock.calls.filter((call: any) =>
|
||||
call[0]?.includes("🚨 ALERT"),
|
||||
);
|
||||
expect(alertCalls.length).toBeGreaterThan(0);
|
||||
|
||||
console.error = originalConsoleError;
|
||||
});
|
||||
|
||||
it("handles multiple tasks in single cron run", async () => {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
|
||||
const task1Metadata = {
|
||||
queueName: "NEW_EPISODE",
|
||||
body: { aniListId: 100, episodeNumber: 1 },
|
||||
headers: { "Content-Type": "application/json" },
|
||||
scheduledEpochTime: now + 2 * 3600,
|
||||
taskId: "task-1",
|
||||
createdAt: now - 20 * 3600,
|
||||
retryCount: 0,
|
||||
};
|
||||
|
||||
const task2Metadata = {
|
||||
queueName: "NEW_EPISODE",
|
||||
body: { aniListId: 200, episodeNumber: 2 },
|
||||
headers: { "Content-Type": "application/json" },
|
||||
scheduledEpochTime: now + 5 * 3600,
|
||||
taskId: "task-2",
|
||||
createdAt: now - 19 * 3600,
|
||||
retryCount: 0,
|
||||
};
|
||||
|
||||
mockEnv.DELAYED_TASKS.list = mock(() =>
|
||||
Promise.resolve({
|
||||
keys: [
|
||||
{ name: `delayed-task:${task1Metadata.scheduledEpochTime}:task-1` },
|
||||
{ name: `delayed-task:${task2Metadata.scheduledEpochTime}:task-2` },
|
||||
],
|
||||
list_complete: true,
|
||||
}),
|
||||
);
|
||||
|
||||
kvGetSpy
|
||||
.mockReturnValueOnce(Promise.resolve(JSON.stringify(task1Metadata)))
|
||||
.mockReturnValueOnce(Promise.resolve(JSON.stringify(task2Metadata)));
|
||||
|
||||
await processDelayedTasks(mockEnv, mockCtx);
|
||||
|
||||
expect(queueSendSpy).toHaveBeenCalledTimes(2);
|
||||
expect(kvDeleteSpy).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("skips tasks with null values in KV", async () => {
|
||||
mockEnv.DELAYED_TASKS.list = mock(() =>
|
||||
Promise.resolve({
|
||||
keys: [{ name: "delayed-task:123:invalid" }],
|
||||
list_complete: true,
|
||||
}),
|
||||
);
|
||||
|
||||
kvGetSpy.mockReturnValue(Promise.resolve(null));
|
||||
|
||||
await processDelayedTasks(mockEnv, mockCtx);
|
||||
|
||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
||||
expect(kvDeleteSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
117
src/libs/tasks/processDelayedTasks.ts
Normal file
117
src/libs/tasks/processDelayedTasks.ts
Normal file
@@ -0,0 +1,117 @@
|
||||
import { DateTime } from "luxon";
|
||||
|
||||
import type { DelayedTaskMetadata } from "./delayedTask";
|
||||
import { deserializeDelayedTask } from "./delayedTask";
|
||||
import { queueTask } from "./queueTask";
|
||||
|
||||
const MAX_DELAY_SECONDS = 12 * 60 * 60; // 43,200 seconds (12 hours)
|
||||
const RETRY_ALERT_THRESHOLD = 3;
|
||||
|
||||
export async function processDelayedTasks(
|
||||
env: Cloudflare.Env,
|
||||
ctx: ExecutionContext,
|
||||
): Promise<void> {
|
||||
console.log("Starting delayed task processing cron job");
|
||||
|
||||
const kvNamespace = env.DELAYED_TASKS;
|
||||
if (!kvNamespace) {
|
||||
console.error("DELAYED_TASKS KV namespace not available");
|
||||
return;
|
||||
}
|
||||
|
||||
// List all keys in the KV namespace
|
||||
const listResult = await kvNamespace.list({ prefix: "delayed-task:" });
|
||||
const keys = listResult.keys;
|
||||
|
||||
if (keys.length === 0) {
|
||||
console.log("No delayed tasks found in KV");
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`Found ${keys.length} delayed tasks to check`);
|
||||
|
||||
const currentTime = Math.floor(Date.now() / 1000);
|
||||
const twelveHoursFromNow = currentTime + MAX_DELAY_SECONDS;
|
||||
|
||||
let processedCount = 0;
|
||||
let queuedCount = 0;
|
||||
let errorCount = 0;
|
||||
|
||||
for (const key of keys) {
|
||||
try {
|
||||
const value = await kvNamespace.get(key.name);
|
||||
if (!value) {
|
||||
console.warn(`Task key ${key.name} has no value, skipping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const metadata: DelayedTaskMetadata = deserializeDelayedTask(value);
|
||||
processedCount++;
|
||||
|
||||
// Check if task is ready to be queued (within 12 hours of scheduled time)
|
||||
if (metadata.scheduledEpochTime <= twelveHoursFromNow) {
|
||||
const remainingDelay = Math.max(
|
||||
0,
|
||||
metadata.scheduledEpochTime - currentTime,
|
||||
);
|
||||
|
||||
console.log(
|
||||
`Queueing task ${metadata.taskId} with ${remainingDelay}s remaining delay`,
|
||||
);
|
||||
|
||||
try {
|
||||
// Queue the task with the remaining delay
|
||||
await queueTask(
|
||||
metadata.queueName,
|
||||
metadata.body as any, // Type assertion needed due to union type complexity
|
||||
{
|
||||
scheduleConfig: { epochTime: metadata.scheduledEpochTime },
|
||||
env,
|
||||
},
|
||||
);
|
||||
|
||||
// Successfully queued, delete from KV
|
||||
await kvNamespace.delete(key.name);
|
||||
queuedCount++;
|
||||
|
||||
console.log(
|
||||
`Successfully queued and removed task ${metadata.taskId}`,
|
||||
);
|
||||
} catch (error) {
|
||||
// Increment retry count and update in KV
|
||||
const retryCount = (metadata.retryCount || 0) + 1;
|
||||
metadata.retryCount = retryCount;
|
||||
|
||||
await kvNamespace.put(key.name, JSON.stringify(metadata));
|
||||
|
||||
errorCount++;
|
||||
console.error(
|
||||
`Failed to queue task ${metadata.taskId} (retry ${retryCount})`,
|
||||
error,
|
||||
);
|
||||
|
||||
// Alert if retry threshold reached
|
||||
if (retryCount >= RETRY_ALERT_THRESHOLD) {
|
||||
console.error(
|
||||
`🚨 ALERT: Task ${metadata.taskId} has failed to queue ${retryCount} times. ` +
|
||||
`Queue: ${metadata.queueName}, Scheduled: ${DateTime.fromSeconds(metadata.scheduledEpochTime).toISO()}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
const hoursUntilReady =
|
||||
(metadata.scheduledEpochTime - twelveHoursFromNow) / 3600;
|
||||
console.log(
|
||||
`Task ${metadata.taskId} not ready yet (${hoursUntilReady.toFixed(1)} hours until queueable)`,
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Error processing task key ${key.name}:`, error);
|
||||
errorCount++;
|
||||
}
|
||||
}
|
||||
|
||||
console.log(
|
||||
`Delayed task processing complete. Processed: ${processedCount}, Queued: ${queuedCount}, Errors: ${errorCount}`,
|
||||
);
|
||||
}
|
||||
211
src/libs/tasks/queueTask.spec.ts
Normal file
211
src/libs/tasks/queueTask.spec.ts
Normal file
@@ -0,0 +1,211 @@
|
||||
import { beforeEach, describe, expect, it, mock, spyOn } from "bun:test";
|
||||
|
||||
import { queueTask } from "./queueTask";
|
||||
|
||||
describe("queueTask - delayed task handling", () => {
|
||||
const MAX_DELAY_SECONDS = 12 * 60 * 60; // 43,200 seconds
|
||||
|
||||
let mockEnv: Cloudflare.Env;
|
||||
let kvPutSpy: ReturnType<typeof mock>;
|
||||
let queueSendSpy: ReturnType<typeof mock>;
|
||||
|
||||
beforeEach(() => {
|
||||
kvPutSpy = mock(() => Promise.resolve());
|
||||
queueSendSpy = mock(() => Promise.resolve());
|
||||
|
||||
mockEnv = {
|
||||
DELAYED_TASKS: {
|
||||
put: kvPutSpy,
|
||||
get: mock(() => Promise.resolve(null)),
|
||||
delete: mock(() => Promise.resolve()),
|
||||
list: mock(() => Promise.resolve({ keys: [], list_complete: true })),
|
||||
getWithMetadata: mock(() =>
|
||||
Promise.resolve({ value: null, metadata: null }),
|
||||
),
|
||||
} as any,
|
||||
NEW_EPISODE: {
|
||||
send: queueSendSpy,
|
||||
} as any,
|
||||
ANILIST_UPDATES: {
|
||||
send: mock(() => Promise.resolve()),
|
||||
} as any,
|
||||
} as any;
|
||||
|
||||
// Mock crypto.randomUUID
|
||||
globalThis.crypto.randomUUID = mock(() => "test-uuid-123");
|
||||
});
|
||||
|
||||
describe("tasks with delay <= 12 hours", () => {
|
||||
it("queues task directly when delay is less than 12 hours", async () => {
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 123, episodeNumber: 1 },
|
||||
{
|
||||
scheduleConfig: { delay: { hours: 6 } },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
// Should queue directly
|
||||
expect(queueSendSpy).toHaveBeenCalledTimes(1);
|
||||
// Should NOT store in KV
|
||||
expect(kvPutSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("queues task directly when delay is exactly 12 hours", async () => {
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 456, episodeNumber: 2 },
|
||||
{
|
||||
scheduleConfig: { delay: { hours: 12 } },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
expect(queueSendSpy).toHaveBeenCalledTimes(1);
|
||||
expect(kvPutSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("passes correct delay to queue", async () => {
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 789, episodeNumber: 3 },
|
||||
{
|
||||
scheduleConfig: { delay: { hours: 3, minutes: 30 } },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
const callArgs = queueSendSpy.mock.calls[0];
|
||||
expect(callArgs[1].delaySeconds).toBe(3 * 3600 + 30 * 60);
|
||||
});
|
||||
});
|
||||
|
||||
describe("tasks with delay > 12 hours", () => {
|
||||
it("stores task in KV when delay exceeds 12 hours", async () => {
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 111, episodeNumber: 4 },
|
||||
{
|
||||
scheduleConfig: { delay: { hours: 24 } },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
// Should store in KV
|
||||
expect(kvPutSpy).toHaveBeenCalledTimes(1);
|
||||
// Should NOT queue directly
|
||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("stores task in KV when delay is 12 hours + 1 second", async () => {
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 222, episodeNumber: 5 },
|
||||
{
|
||||
scheduleConfig: { delay: { hours: 12, seconds: 1 } },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
expect(kvPutSpy).toHaveBeenCalledTimes(1);
|
||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("generates correct KV key format", async () => {
|
||||
const nowSeconds = Math.floor(Date.now() / 1000);
|
||||
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 333, episodeNumber: 6 },
|
||||
{
|
||||
scheduleConfig: { delay: { hours: 48 } },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
const kvKey = kvPutSpy.mock.calls[0][0];
|
||||
expect(kvKey).toMatch(/^delayed-task:\d+:test-uuid-123$/);
|
||||
|
||||
// Verify timestamp is approximately correct (within 1 second)
|
||||
const timestampMatch = kvKey.match(/delayed-task:(\d+):/);
|
||||
if (timestampMatch) {
|
||||
const storedTimestamp = parseInt(timestampMatch[1]);
|
||||
const expectedTimestamp = nowSeconds + 48 * 3600;
|
||||
expect(Math.abs(storedTimestamp - expectedTimestamp)).toBeLessThan(2);
|
||||
}
|
||||
});
|
||||
|
||||
it("stores correct metadata in KV", async () => {
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 444, episodeNumber: 7 },
|
||||
{
|
||||
scheduleConfig: { delay: { hours: 36 } },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
const kvValue = kvPutSpy.mock.calls[0][1];
|
||||
const metadata = JSON.parse(kvValue);
|
||||
|
||||
expect(metadata.queueName).toBe("NEW_EPISODE");
|
||||
expect(metadata.body).toEqual({ aniListId: 444, episodeNumber: 7 });
|
||||
expect(metadata.taskId).toBe("test-uuid-123");
|
||||
expect(metadata.retryCount).toBe(0);
|
||||
expect(metadata.headers).toBeDefined();
|
||||
expect(metadata.scheduledEpochTime).toBeTypeOf("number");
|
||||
expect(metadata.createdAt).toBeTypeOf("number");
|
||||
});
|
||||
|
||||
it("throws error when DELAYED_TASKS KV is not available", async () => {
|
||||
const envWithoutKV = { ...mockEnv };
|
||||
delete (envWithoutKV as any).DELAYED_TASKS;
|
||||
|
||||
await expect(
|
||||
queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 555, episodeNumber: 8 },
|
||||
{
|
||||
scheduleConfig: { delay: { hours: 24 } },
|
||||
env: envWithoutKV,
|
||||
},
|
||||
),
|
||||
).rejects.toThrow("DELAYED_TASKS KV namespace not available");
|
||||
});
|
||||
});
|
||||
|
||||
describe("epoch time scheduling", () => {
|
||||
it("queues directly when epoch time is within 12 hours", async () => {
|
||||
const futureTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now
|
||||
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 666, episodeNumber: 9 },
|
||||
{
|
||||
scheduleConfig: { epochTime: futureTime },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
expect(queueSendSpy).toHaveBeenCalledTimes(1);
|
||||
expect(kvPutSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("stores in KV when epoch time is beyond 12 hours", async () => {
|
||||
const futureTime = Math.floor(Date.now() / 1000) + 24 * 3600; // 24 hours from now
|
||||
|
||||
await queueTask(
|
||||
"NEW_EPISODE",
|
||||
{ aniListId: 777, episodeNumber: 10 },
|
||||
{
|
||||
scheduleConfig: { epochTime: futureTime },
|
||||
env: mockEnv,
|
||||
},
|
||||
);
|
||||
|
||||
expect(kvPutSpy).toHaveBeenCalledTimes(1);
|
||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -39,6 +39,41 @@ export async function queueTask(
|
||||
body,
|
||||
req?.header(),
|
||||
);
|
||||
|
||||
const MAX_DELAY_SECONDS = 12 * 60 * 60; // 43,200 seconds (12 hours)
|
||||
|
||||
// If delay exceeds 12 hours, store in KV for later processing
|
||||
if (scheduleTime > MAX_DELAY_SECONDS) {
|
||||
if (!env || !env.DELAYED_TASKS) {
|
||||
throw new Error("DELAYED_TASKS KV namespace not available");
|
||||
}
|
||||
|
||||
const { generateTaskKey, serializeDelayedTask } = await import(
|
||||
"./delayedTask"
|
||||
);
|
||||
const taskId = crypto.randomUUID();
|
||||
const scheduledEpochTime = Math.floor(Date.now() / 1000) + scheduleTime;
|
||||
|
||||
const metadata = {
|
||||
queueName,
|
||||
body,
|
||||
headers,
|
||||
scheduledEpochTime,
|
||||
taskId,
|
||||
createdAt: Math.floor(Date.now() / 1000),
|
||||
retryCount: 0,
|
||||
};
|
||||
|
||||
const key = generateTaskKey(scheduledEpochTime, taskId);
|
||||
await env.DELAYED_TASKS.put(key, serializeDelayedTask(metadata));
|
||||
|
||||
console.log(
|
||||
`Task stored in KV for delayed execution. Scheduled for: ${new Date(scheduledEpochTime * 1000).toISOString()}, Key: ${key}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
// Otherwise, queue directly
|
||||
const contentType =
|
||||
headers["Content-Type"] === "application/json" ? "json" : "text";
|
||||
if (!env) {
|
||||
@@ -85,10 +120,12 @@ function buildTask(
|
||||
if (scheduleConfig) {
|
||||
const { delay, epochTime } = scheduleConfig;
|
||||
if (epochTime) {
|
||||
console.log("epochTime", epochTime);
|
||||
scheduleTime = DateTime.fromSeconds(epochTime)
|
||||
.diffNow("second")
|
||||
.as("second");
|
||||
} else if (delay) {
|
||||
console.log("delay", delay);
|
||||
scheduleTime = Duration.fromDurationLike(delay).as("second");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +45,14 @@ queue = "anilist-updates"
|
||||
[[queues.consumers]]
|
||||
queue = "new-episode"
|
||||
|
||||
[[kv_namespaces]]
|
||||
binding = "DELAYED_TASKS"
|
||||
id = "c8db249d8ee7462b91f9c374321776e4"
|
||||
preview_id = "ff38240eb2aa4b1388c705f4974f5aec"
|
||||
|
||||
[triggers]
|
||||
crons = ["0 */12 * * *"]
|
||||
|
||||
[[d1_databases]]
|
||||
binding = "DB"
|
||||
database_name = "aniplay"
|
||||
|
||||
Reference in New Issue
Block a user