diff --git a/src/libs/tasks/processDelayedTasks.spec.ts b/src/libs/tasks/processDelayedTasks.spec.ts index 889ba35..5b59409 100644 --- a/src/libs/tasks/processDelayedTasks.spec.ts +++ b/src/libs/tasks/processDelayedTasks.spec.ts @@ -1,204 +1,158 @@ +import { env } from "cloudflare:test"; +import { DateTime } from "luxon"; import { beforeEach, describe, expect, it, vi } from "vitest"; +import { getTestEnv } from "../test/getTestEnv"; import { processDelayedTasks } from "./processDelayedTasks"; describe("processDelayedTasks", () => { - let mockEnv: Cloudflare.Env; - let mockCtx: ExecutionContext; - let kvGetSpy: ReturnType; - let kvDeleteSpy: ReturnType; - let kvPutSpy: ReturnType; - let queueSendSpy: ReturnType; - - beforeEach(() => { - kvGetSpy = vi.fn(() => Promise.resolve(null)); - kvDeleteSpy = vi.fn(() => Promise.resolve()); - kvPutSpy = vi.fn(() => Promise.resolve()); - queueSendSpy = vi.fn(() => Promise.resolve()); - - mockEnv = { - DELAYED_TASKS: { - get: kvGetSpy, - delete: kvDeleteSpy, - put: kvPutSpy, - list: vi.fn(() => - Promise.resolve({ - keys: [], - list_complete: true as const, - cacheStatus: null, - }), - ), - getWithMetadata: vi.fn(() => - Promise.resolve({ value: null, metadata: null }), - ), - } as any, - NEW_EPISODE: { - send: queueSendSpy, - } as any, - ANILIST_UPDATES: { - send: vi.fn(() => Promise.resolve()), - } as any, - } as any; - - mockCtx = { - waitUntil: vi.fn(() => {}), - passThroughOnException: vi.fn(() => {}), - } as any; + beforeEach(async () => { + const tasksToDelete = await env.DELAYED_TASKS.list({ + prefix: "delayed-task:", + }); + console.log(`Found ${tasksToDelete.keys.length} tasks to delete`); + for (const task of tasksToDelete.keys) { + await env.DELAYED_TASKS.delete(task.name); + } }); it("handles empty KV namespace", async () => { - await processDelayedTasks(mockEnv, mockCtx); + await processDelayedTasks(env); - expect(kvDeleteSpy).not.toHaveBeenCalled(); - expect(queueSendSpy).not.toHaveBeenCalled(); + await expect( + env.DELAYED_TASKS.list({ prefix: "delayed-task:" }).then( + (result) => result.keys, + ), + ).resolves.toHaveLength(0); }); - it("queues tasks within 12 hours of scheduled time", async () => { - const now = Math.floor(Date.now() / 1000); - const scheduledTime = now + 6 * 3600; // 6 hours from now - + it("queues tasks within 9 hours of scheduled time", async () => { + const now = DateTime.now(); + const scheduledTime = now.plus({ hours: 6 }).toSeconds(); const taskMetadata = { queueName: "NEW_EPISODE", body: { aniListId: 123, episodeNumber: 1 }, headers: { "Content-Type": "application/json" }, scheduledEpochTime: scheduledTime, taskId: "task-1", - createdAt: now - 18 * 3600, + createdAt: now.minus({ hours: 18 }).toSeconds(), retryCount: 0, }; - - mockEnv.DELAYED_TASKS.list = vi.fn(() => - Promise.resolve({ - keys: [{ name: `delayed-task:${scheduledTime}:task-1` }], - list_complete: true as const, - cacheStatus: null, - }), - ); - - kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata))); - - await processDelayedTasks(mockEnv, mockCtx); - - expect(queueSendSpy).toHaveBeenCalledTimes(1); - expect(kvDeleteSpy).toHaveBeenCalledTimes(1); - expect(kvDeleteSpy).toHaveBeenCalledWith( + await env.DELAYED_TASKS.put( `delayed-task:${scheduledTime}:task-1`, + JSON.stringify(taskMetadata), ); + + await processDelayedTasks(env); + + await expect( + env.DELAYED_TASKS.get(`delayed-task:${scheduledTime}:task-1`), + ).resolves.toBeNull(); }); - it("does not queue tasks beyond 12 hours", async () => { - const now = Math.floor(Date.now() / 1000); - const scheduledTime = now + 24 * 3600; // 24 hours from now - + it("does not queue tasks beyond 9 hours", async () => { + const now = DateTime.now(); + const scheduledTime = now.plus({ hours: 24 }).toSeconds(); const taskMetadata = { queueName: "NEW_EPISODE", body: { aniListId: 456, episodeNumber: 2 }, headers: { "Content-Type": "application/json" }, scheduledEpochTime: scheduledTime, taskId: "task-2", - createdAt: now, + createdAt: now.toSeconds(), retryCount: 0, }; - - mockEnv.DELAYED_TASKS.list = vi.fn(() => - Promise.resolve({ - keys: [{ name: `delayed-task:${scheduledTime}:task-2` }], - list_complete: true as const, - cacheStatus: null, - }), + await env.DELAYED_TASKS.put( + `delayed-task:${scheduledTime}:task-2`, + JSON.stringify(taskMetadata), ); - kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata))); + await processDelayedTasks(env); - await processDelayedTasks(mockEnv, mockCtx); - - expect(queueSendSpy).not.toHaveBeenCalled(); - expect(kvDeleteSpy).not.toHaveBeenCalled(); + await expect( + env.DELAYED_TASKS.get(`delayed-task:${scheduledTime}:task-2`), + ).resolves.toBeTruthy(); }); it("increments retry count on queue failure", async () => { - const now = Math.floor(Date.now() / 1000); - const scheduledTime = now + 1 * 3600; // 1 hour from now - + const now = DateTime.now(); + const scheduledTime = now.plus({ hours: 1 }).toSeconds(); const taskMetadata = { queueName: "NEW_EPISODE", body: { aniListId: 789, episodeNumber: 3 }, headers: { "Content-Type": "application/json" }, scheduledEpochTime: scheduledTime, taskId: "task-3", - createdAt: now - 23 * 3600, + createdAt: now.minus({ hours: 23 }).toSeconds(), retryCount: 0, }; - - mockEnv.DELAYED_TASKS.list = vi.fn(() => - Promise.resolve({ - keys: [{ name: `delayed-task:${scheduledTime}:task-3` }], - list_complete: true as const, - cacheStatus: null, - }), + const mockEnv = getTestEnv({ + NEW_EPISODE: { + send: vi.fn().mockRejectedValue(new Error("Queue error")), + sendBatch: vi.fn().mockRejectedValue(new Error("Queue error")), + }, + }); + await mockEnv.DELAYED_TASKS.put( + `delayed-task:${scheduledTime}:task-3`, + JSON.stringify(taskMetadata), ); - kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata))); - queueSendSpy.mockRejectedValue(new Error("Queue error")); + await processDelayedTasks(mockEnv); - await processDelayedTasks(mockEnv, mockCtx); - - expect(kvPutSpy).toHaveBeenCalledTimes(1); - const updatedMetadata = JSON.parse(kvPutSpy.mock.calls[0][1]); + const updatedMetadata = JSON.parse( + (await mockEnv.DELAYED_TASKS.get( + `delayed-task:${scheduledTime}:task-3`, + ))!, + ); expect(updatedMetadata.retryCount).toBe(1); - expect(kvDeleteSpy).not.toHaveBeenCalled(); }); it("logs alert after 3 failed attempts", async () => { const consoleErrorSpy = vi.fn(() => {}); const originalConsoleError = console.error; console.error = consoleErrorSpy as any; - - const now = Math.floor(Date.now() / 1000); - const scheduledTime = now + 1 * 3600; - + const now = DateTime.now(); + const scheduledTime = now.plus({ hours: 1 }).toSeconds(); const taskMetadata = { queueName: "NEW_EPISODE", - body: { aniListId: 999, episodeNumber: 4 }, + body: { aniListId: 789, episodeNumber: 4 }, headers: { "Content-Type": "application/json" }, scheduledEpochTime: scheduledTime, taskId: "task-4", - createdAt: now - 23 * 3600, - retryCount: 2, // Will become 3 after this failure + createdAt: now.minus({ hours: 23 }).toSeconds(), + retryCount: 2, }; - - mockEnv.DELAYED_TASKS.list = vi.fn(() => - Promise.resolve({ - keys: [{ name: `delayed-task:${scheduledTime}:task-4` }], - list_complete: true as const, - cacheStatus: null, - }), + const mockEnv = getTestEnv({ + NEW_EPISODE: { + send: vi.fn().mockRejectedValue(new Error("Queue error")), + sendBatch: vi.fn().mockRejectedValue(new Error("Queue error")), + }, + }); + await mockEnv.DELAYED_TASKS.put( + `delayed-task:${scheduledTime}:task-4`, + JSON.stringify(taskMetadata), ); - kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata))); - queueSendSpy.mockRejectedValue(new Error("Queue error")); - - await processDelayedTasks(mockEnv, mockCtx); + await processDelayedTasks(mockEnv); // Check that alert was logged const alertCalls = consoleErrorSpy.mock.calls.filter((call: any) => call[0]?.includes("🚨 ALERT"), ); expect(alertCalls.length).toBeGreaterThan(0); - console.error = originalConsoleError; }); it("handles multiple tasks in single cron run", async () => { - const now = Math.floor(Date.now() / 1000); + const now = DateTime.now(); const task1Metadata = { queueName: "NEW_EPISODE", body: { aniListId: 100, episodeNumber: 1 }, headers: { "Content-Type": "application/json" }, - scheduledEpochTime: now + 2 * 3600, + scheduledEpochTime: now.plus({ hours: 2 }).toSeconds(), taskId: "task-1", - createdAt: now - 20 * 3600, + createdAt: now.minus({ hours: 20 }).toSeconds(), retryCount: 0, }; @@ -206,47 +160,53 @@ describe("processDelayedTasks", () => { queueName: "NEW_EPISODE", body: { aniListId: 200, episodeNumber: 2 }, headers: { "Content-Type": "application/json" }, - scheduledEpochTime: now + 5 * 3600, + scheduledEpochTime: now.plus({ hours: 5 }).toSeconds(), taskId: "task-2", - createdAt: now - 19 * 3600, + createdAt: now.minus({ hours: 19 }).toSeconds(), retryCount: 0, }; - - mockEnv.DELAYED_TASKS.list = vi.fn(() => - Promise.resolve({ - keys: [ - { name: `delayed-task:${task1Metadata.scheduledEpochTime}:task-1` }, - { name: `delayed-task:${task2Metadata.scheduledEpochTime}:task-2` }, - ], - list_complete: true as const, - cacheStatus: null, - }), + await env.DELAYED_TASKS.put( + `delayed-task:${task1Metadata.scheduledEpochTime}:task-1`, + JSON.stringify(task1Metadata), + ); + await env.DELAYED_TASKS.put( + `delayed-task:${task2Metadata.scheduledEpochTime}:task-2`, + JSON.stringify(task2Metadata), ); - kvGetSpy - .mockReturnValueOnce(Promise.resolve(JSON.stringify(task1Metadata))) - .mockReturnValueOnce(Promise.resolve(JSON.stringify(task2Metadata))); + await processDelayedTasks(env); - await processDelayedTasks(mockEnv, mockCtx); - - expect(queueSendSpy).toHaveBeenCalledTimes(2); - expect(kvDeleteSpy).toHaveBeenCalledTimes(2); + await expect( + env.DELAYED_TASKS.get( + `delayed-task:${task1Metadata.scheduledEpochTime}:task-1`, + ), + ).resolves.toBeNull(); + await expect( + env.DELAYED_TASKS.get( + `delayed-task:${task2Metadata.scheduledEpochTime}:task-2`, + ), + ).resolves.toBeNull(); }); it("skips tasks with null values in KV", async () => { - mockEnv.DELAYED_TASKS.list = vi.fn(() => - Promise.resolve({ - keys: [{ name: "delayed-task:123:invalid" }], - list_complete: true as const, - cacheStatus: null, - }), - ); + const queueSendSpy = vi.fn().mockResolvedValue(undefined); + const mockEnv = getTestEnv({ + NEW_EPISODE: { + send: queueSendSpy, + sendBatch: queueSendSpy, + }, + ANILIST_UPDATES: { + send: queueSendSpy, + sendBatch: queueSendSpy, + }, + }); + await mockEnv.DELAYED_TASKS.put(`delayed-task:123:invalid`, null); - kvGetSpy.mockReturnValue(Promise.resolve(null)); - - await processDelayedTasks(mockEnv, mockCtx); + await processDelayedTasks(mockEnv); expect(queueSendSpy).not.toHaveBeenCalled(); - expect(kvDeleteSpy).not.toHaveBeenCalled(); + await expect( + mockEnv.DELAYED_TASKS.get(`delayed-task:123:invalid`), + ).resolves.toBeNull(); }); });