refactor: use fake Vitest Cloudflare environment for processDelayedTasks test
This commit is contained in:
@@ -1,204 +1,158 @@
|
|||||||
|
import { env } from "cloudflare:test";
|
||||||
|
import { DateTime } from "luxon";
|
||||||
import { beforeEach, describe, expect, it, vi } from "vitest";
|
import { beforeEach, describe, expect, it, vi } from "vitest";
|
||||||
|
|
||||||
|
import { getTestEnv } from "../test/getTestEnv";
|
||||||
import { processDelayedTasks } from "./processDelayedTasks";
|
import { processDelayedTasks } from "./processDelayedTasks";
|
||||||
|
|
||||||
describe("processDelayedTasks", () => {
|
describe("processDelayedTasks", () => {
|
||||||
let mockEnv: Cloudflare.Env;
|
beforeEach(async () => {
|
||||||
let mockCtx: ExecutionContext;
|
const tasksToDelete = await env.DELAYED_TASKS.list({
|
||||||
let kvGetSpy: ReturnType<typeof vi.fn>;
|
prefix: "delayed-task:",
|
||||||
let kvDeleteSpy: ReturnType<typeof vi.fn>;
|
});
|
||||||
let kvPutSpy: ReturnType<typeof vi.fn>;
|
console.log(`Found ${tasksToDelete.keys.length} tasks to delete`);
|
||||||
let queueSendSpy: ReturnType<typeof vi.fn>;
|
for (const task of tasksToDelete.keys) {
|
||||||
|
await env.DELAYED_TASKS.delete(task.name);
|
||||||
beforeEach(() => {
|
}
|
||||||
kvGetSpy = vi.fn(() => Promise.resolve(null));
|
|
||||||
kvDeleteSpy = vi.fn(() => Promise.resolve());
|
|
||||||
kvPutSpy = vi.fn(() => Promise.resolve());
|
|
||||||
queueSendSpy = vi.fn(() => Promise.resolve());
|
|
||||||
|
|
||||||
mockEnv = {
|
|
||||||
DELAYED_TASKS: {
|
|
||||||
get: kvGetSpy,
|
|
||||||
delete: kvDeleteSpy,
|
|
||||||
put: kvPutSpy,
|
|
||||||
list: vi.fn(() =>
|
|
||||||
Promise.resolve({
|
|
||||||
keys: [],
|
|
||||||
list_complete: true as const,
|
|
||||||
cacheStatus: null,
|
|
||||||
}),
|
|
||||||
),
|
|
||||||
getWithMetadata: vi.fn(() =>
|
|
||||||
Promise.resolve({ value: null, metadata: null }),
|
|
||||||
),
|
|
||||||
} as any,
|
|
||||||
NEW_EPISODE: {
|
|
||||||
send: queueSendSpy,
|
|
||||||
} as any,
|
|
||||||
ANILIST_UPDATES: {
|
|
||||||
send: vi.fn(() => Promise.resolve()),
|
|
||||||
} as any,
|
|
||||||
} as any;
|
|
||||||
|
|
||||||
mockCtx = {
|
|
||||||
waitUntil: vi.fn(() => {}),
|
|
||||||
passThroughOnException: vi.fn(() => {}),
|
|
||||||
} as any;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("handles empty KV namespace", async () => {
|
it("handles empty KV namespace", async () => {
|
||||||
await processDelayedTasks(mockEnv, mockCtx);
|
await processDelayedTasks(env);
|
||||||
|
|
||||||
expect(kvDeleteSpy).not.toHaveBeenCalled();
|
await expect(
|
||||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
env.DELAYED_TASKS.list({ prefix: "delayed-task:" }).then(
|
||||||
|
(result) => result.keys,
|
||||||
|
),
|
||||||
|
).resolves.toHaveLength(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("queues tasks within 12 hours of scheduled time", async () => {
|
it("queues tasks within 9 hours of scheduled time", async () => {
|
||||||
const now = Math.floor(Date.now() / 1000);
|
const now = DateTime.now();
|
||||||
const scheduledTime = now + 6 * 3600; // 6 hours from now
|
const scheduledTime = now.plus({ hours: 6 }).toSeconds();
|
||||||
|
|
||||||
const taskMetadata = {
|
const taskMetadata = {
|
||||||
queueName: "NEW_EPISODE",
|
queueName: "NEW_EPISODE",
|
||||||
body: { aniListId: 123, episodeNumber: 1 },
|
body: { aniListId: 123, episodeNumber: 1 },
|
||||||
headers: { "Content-Type": "application/json" },
|
headers: { "Content-Type": "application/json" },
|
||||||
scheduledEpochTime: scheduledTime,
|
scheduledEpochTime: scheduledTime,
|
||||||
taskId: "task-1",
|
taskId: "task-1",
|
||||||
createdAt: now - 18 * 3600,
|
createdAt: now.minus({ hours: 18 }).toSeconds(),
|
||||||
retryCount: 0,
|
retryCount: 0,
|
||||||
};
|
};
|
||||||
|
await env.DELAYED_TASKS.put(
|
||||||
mockEnv.DELAYED_TASKS.list = vi.fn(() =>
|
|
||||||
Promise.resolve({
|
|
||||||
keys: [{ name: `delayed-task:${scheduledTime}:task-1` }],
|
|
||||||
list_complete: true as const,
|
|
||||||
cacheStatus: null,
|
|
||||||
}),
|
|
||||||
);
|
|
||||||
|
|
||||||
kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
|
|
||||||
|
|
||||||
await processDelayedTasks(mockEnv, mockCtx);
|
|
||||||
|
|
||||||
expect(queueSendSpy).toHaveBeenCalledTimes(1);
|
|
||||||
expect(kvDeleteSpy).toHaveBeenCalledTimes(1);
|
|
||||||
expect(kvDeleteSpy).toHaveBeenCalledWith(
|
|
||||||
`delayed-task:${scheduledTime}:task-1`,
|
`delayed-task:${scheduledTime}:task-1`,
|
||||||
|
JSON.stringify(taskMetadata),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
await processDelayedTasks(env);
|
||||||
|
|
||||||
|
await expect(
|
||||||
|
env.DELAYED_TASKS.get(`delayed-task:${scheduledTime}:task-1`),
|
||||||
|
).resolves.toBeNull();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("does not queue tasks beyond 12 hours", async () => {
|
it("does not queue tasks beyond 9 hours", async () => {
|
||||||
const now = Math.floor(Date.now() / 1000);
|
const now = DateTime.now();
|
||||||
const scheduledTime = now + 24 * 3600; // 24 hours from now
|
const scheduledTime = now.plus({ hours: 24 }).toSeconds();
|
||||||
|
|
||||||
const taskMetadata = {
|
const taskMetadata = {
|
||||||
queueName: "NEW_EPISODE",
|
queueName: "NEW_EPISODE",
|
||||||
body: { aniListId: 456, episodeNumber: 2 },
|
body: { aniListId: 456, episodeNumber: 2 },
|
||||||
headers: { "Content-Type": "application/json" },
|
headers: { "Content-Type": "application/json" },
|
||||||
scheduledEpochTime: scheduledTime,
|
scheduledEpochTime: scheduledTime,
|
||||||
taskId: "task-2",
|
taskId: "task-2",
|
||||||
createdAt: now,
|
createdAt: now.toSeconds(),
|
||||||
retryCount: 0,
|
retryCount: 0,
|
||||||
};
|
};
|
||||||
|
await env.DELAYED_TASKS.put(
|
||||||
mockEnv.DELAYED_TASKS.list = vi.fn(() =>
|
`delayed-task:${scheduledTime}:task-2`,
|
||||||
Promise.resolve({
|
JSON.stringify(taskMetadata),
|
||||||
keys: [{ name: `delayed-task:${scheduledTime}:task-2` }],
|
|
||||||
list_complete: true as const,
|
|
||||||
cacheStatus: null,
|
|
||||||
}),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
|
await processDelayedTasks(env);
|
||||||
|
|
||||||
await processDelayedTasks(mockEnv, mockCtx);
|
await expect(
|
||||||
|
env.DELAYED_TASKS.get(`delayed-task:${scheduledTime}:task-2`),
|
||||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
).resolves.toBeTruthy();
|
||||||
expect(kvDeleteSpy).not.toHaveBeenCalled();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("increments retry count on queue failure", async () => {
|
it("increments retry count on queue failure", async () => {
|
||||||
const now = Math.floor(Date.now() / 1000);
|
const now = DateTime.now();
|
||||||
const scheduledTime = now + 1 * 3600; // 1 hour from now
|
const scheduledTime = now.plus({ hours: 1 }).toSeconds();
|
||||||
|
|
||||||
const taskMetadata = {
|
const taskMetadata = {
|
||||||
queueName: "NEW_EPISODE",
|
queueName: "NEW_EPISODE",
|
||||||
body: { aniListId: 789, episodeNumber: 3 },
|
body: { aniListId: 789, episodeNumber: 3 },
|
||||||
headers: { "Content-Type": "application/json" },
|
headers: { "Content-Type": "application/json" },
|
||||||
scheduledEpochTime: scheduledTime,
|
scheduledEpochTime: scheduledTime,
|
||||||
taskId: "task-3",
|
taskId: "task-3",
|
||||||
createdAt: now - 23 * 3600,
|
createdAt: now.minus({ hours: 23 }).toSeconds(),
|
||||||
retryCount: 0,
|
retryCount: 0,
|
||||||
};
|
};
|
||||||
|
const mockEnv = getTestEnv({
|
||||||
mockEnv.DELAYED_TASKS.list = vi.fn(() =>
|
NEW_EPISODE: {
|
||||||
Promise.resolve({
|
send: vi.fn().mockRejectedValue(new Error("Queue error")),
|
||||||
keys: [{ name: `delayed-task:${scheduledTime}:task-3` }],
|
sendBatch: vi.fn().mockRejectedValue(new Error("Queue error")),
|
||||||
list_complete: true as const,
|
},
|
||||||
cacheStatus: null,
|
});
|
||||||
}),
|
await mockEnv.DELAYED_TASKS.put(
|
||||||
|
`delayed-task:${scheduledTime}:task-3`,
|
||||||
|
JSON.stringify(taskMetadata),
|
||||||
);
|
);
|
||||||
|
|
||||||
kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
|
await processDelayedTasks(mockEnv);
|
||||||
queueSendSpy.mockRejectedValue(new Error("Queue error"));
|
|
||||||
|
|
||||||
await processDelayedTasks(mockEnv, mockCtx);
|
const updatedMetadata = JSON.parse(
|
||||||
|
(await mockEnv.DELAYED_TASKS.get(
|
||||||
expect(kvPutSpy).toHaveBeenCalledTimes(1);
|
`delayed-task:${scheduledTime}:task-3`,
|
||||||
const updatedMetadata = JSON.parse(kvPutSpy.mock.calls[0][1]);
|
))!,
|
||||||
|
);
|
||||||
expect(updatedMetadata.retryCount).toBe(1);
|
expect(updatedMetadata.retryCount).toBe(1);
|
||||||
expect(kvDeleteSpy).not.toHaveBeenCalled();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it("logs alert after 3 failed attempts", async () => {
|
it("logs alert after 3 failed attempts", async () => {
|
||||||
const consoleErrorSpy = vi.fn(() => {});
|
const consoleErrorSpy = vi.fn(() => {});
|
||||||
const originalConsoleError = console.error;
|
const originalConsoleError = console.error;
|
||||||
console.error = consoleErrorSpy as any;
|
console.error = consoleErrorSpy as any;
|
||||||
|
const now = DateTime.now();
|
||||||
const now = Math.floor(Date.now() / 1000);
|
const scheduledTime = now.plus({ hours: 1 }).toSeconds();
|
||||||
const scheduledTime = now + 1 * 3600;
|
|
||||||
|
|
||||||
const taskMetadata = {
|
const taskMetadata = {
|
||||||
queueName: "NEW_EPISODE",
|
queueName: "NEW_EPISODE",
|
||||||
body: { aniListId: 999, episodeNumber: 4 },
|
body: { aniListId: 789, episodeNumber: 4 },
|
||||||
headers: { "Content-Type": "application/json" },
|
headers: { "Content-Type": "application/json" },
|
||||||
scheduledEpochTime: scheduledTime,
|
scheduledEpochTime: scheduledTime,
|
||||||
taskId: "task-4",
|
taskId: "task-4",
|
||||||
createdAt: now - 23 * 3600,
|
createdAt: now.minus({ hours: 23 }).toSeconds(),
|
||||||
retryCount: 2, // Will become 3 after this failure
|
retryCount: 2,
|
||||||
};
|
};
|
||||||
|
const mockEnv = getTestEnv({
|
||||||
mockEnv.DELAYED_TASKS.list = vi.fn(() =>
|
NEW_EPISODE: {
|
||||||
Promise.resolve({
|
send: vi.fn().mockRejectedValue(new Error("Queue error")),
|
||||||
keys: [{ name: `delayed-task:${scheduledTime}:task-4` }],
|
sendBatch: vi.fn().mockRejectedValue(new Error("Queue error")),
|
||||||
list_complete: true as const,
|
},
|
||||||
cacheStatus: null,
|
});
|
||||||
}),
|
await mockEnv.DELAYED_TASKS.put(
|
||||||
|
`delayed-task:${scheduledTime}:task-4`,
|
||||||
|
JSON.stringify(taskMetadata),
|
||||||
);
|
);
|
||||||
|
|
||||||
kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
|
await processDelayedTasks(mockEnv);
|
||||||
queueSendSpy.mockRejectedValue(new Error("Queue error"));
|
|
||||||
|
|
||||||
await processDelayedTasks(mockEnv, mockCtx);
|
|
||||||
|
|
||||||
// Check that alert was logged
|
// Check that alert was logged
|
||||||
const alertCalls = consoleErrorSpy.mock.calls.filter((call: any) =>
|
const alertCalls = consoleErrorSpy.mock.calls.filter((call: any) =>
|
||||||
call[0]?.includes("🚨 ALERT"),
|
call[0]?.includes("🚨 ALERT"),
|
||||||
);
|
);
|
||||||
expect(alertCalls.length).toBeGreaterThan(0);
|
expect(alertCalls.length).toBeGreaterThan(0);
|
||||||
|
|
||||||
console.error = originalConsoleError;
|
console.error = originalConsoleError;
|
||||||
});
|
});
|
||||||
|
|
||||||
it("handles multiple tasks in single cron run", async () => {
|
it("handles multiple tasks in single cron run", async () => {
|
||||||
const now = Math.floor(Date.now() / 1000);
|
const now = DateTime.now();
|
||||||
|
|
||||||
const task1Metadata = {
|
const task1Metadata = {
|
||||||
queueName: "NEW_EPISODE",
|
queueName: "NEW_EPISODE",
|
||||||
body: { aniListId: 100, episodeNumber: 1 },
|
body: { aniListId: 100, episodeNumber: 1 },
|
||||||
headers: { "Content-Type": "application/json" },
|
headers: { "Content-Type": "application/json" },
|
||||||
scheduledEpochTime: now + 2 * 3600,
|
scheduledEpochTime: now.plus({ hours: 2 }).toSeconds(),
|
||||||
taskId: "task-1",
|
taskId: "task-1",
|
||||||
createdAt: now - 20 * 3600,
|
createdAt: now.minus({ hours: 20 }).toSeconds(),
|
||||||
retryCount: 0,
|
retryCount: 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -206,47 +160,53 @@ describe("processDelayedTasks", () => {
|
|||||||
queueName: "NEW_EPISODE",
|
queueName: "NEW_EPISODE",
|
||||||
body: { aniListId: 200, episodeNumber: 2 },
|
body: { aniListId: 200, episodeNumber: 2 },
|
||||||
headers: { "Content-Type": "application/json" },
|
headers: { "Content-Type": "application/json" },
|
||||||
scheduledEpochTime: now + 5 * 3600,
|
scheduledEpochTime: now.plus({ hours: 5 }).toSeconds(),
|
||||||
taskId: "task-2",
|
taskId: "task-2",
|
||||||
createdAt: now - 19 * 3600,
|
createdAt: now.minus({ hours: 19 }).toSeconds(),
|
||||||
retryCount: 0,
|
retryCount: 0,
|
||||||
};
|
};
|
||||||
|
await env.DELAYED_TASKS.put(
|
||||||
mockEnv.DELAYED_TASKS.list = vi.fn(() =>
|
`delayed-task:${task1Metadata.scheduledEpochTime}:task-1`,
|
||||||
Promise.resolve({
|
JSON.stringify(task1Metadata),
|
||||||
keys: [
|
);
|
||||||
{ name: `delayed-task:${task1Metadata.scheduledEpochTime}:task-1` },
|
await env.DELAYED_TASKS.put(
|
||||||
{ name: `delayed-task:${task2Metadata.scheduledEpochTime}:task-2` },
|
`delayed-task:${task2Metadata.scheduledEpochTime}:task-2`,
|
||||||
],
|
JSON.stringify(task2Metadata),
|
||||||
list_complete: true as const,
|
|
||||||
cacheStatus: null,
|
|
||||||
}),
|
|
||||||
);
|
);
|
||||||
|
|
||||||
kvGetSpy
|
await processDelayedTasks(env);
|
||||||
.mockReturnValueOnce(Promise.resolve(JSON.stringify(task1Metadata)))
|
|
||||||
.mockReturnValueOnce(Promise.resolve(JSON.stringify(task2Metadata)));
|
|
||||||
|
|
||||||
await processDelayedTasks(mockEnv, mockCtx);
|
await expect(
|
||||||
|
env.DELAYED_TASKS.get(
|
||||||
expect(queueSendSpy).toHaveBeenCalledTimes(2);
|
`delayed-task:${task1Metadata.scheduledEpochTime}:task-1`,
|
||||||
expect(kvDeleteSpy).toHaveBeenCalledTimes(2);
|
),
|
||||||
|
).resolves.toBeNull();
|
||||||
|
await expect(
|
||||||
|
env.DELAYED_TASKS.get(
|
||||||
|
`delayed-task:${task2Metadata.scheduledEpochTime}:task-2`,
|
||||||
|
),
|
||||||
|
).resolves.toBeNull();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("skips tasks with null values in KV", async () => {
|
it("skips tasks with null values in KV", async () => {
|
||||||
mockEnv.DELAYED_TASKS.list = vi.fn(() =>
|
const queueSendSpy = vi.fn().mockResolvedValue(undefined);
|
||||||
Promise.resolve({
|
const mockEnv = getTestEnv({
|
||||||
keys: [{ name: "delayed-task:123:invalid" }],
|
NEW_EPISODE: {
|
||||||
list_complete: true as const,
|
send: queueSendSpy,
|
||||||
cacheStatus: null,
|
sendBatch: queueSendSpy,
|
||||||
}),
|
},
|
||||||
);
|
ANILIST_UPDATES: {
|
||||||
|
send: queueSendSpy,
|
||||||
|
sendBatch: queueSendSpy,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
await mockEnv.DELAYED_TASKS.put(`delayed-task:123:invalid`, null);
|
||||||
|
|
||||||
kvGetSpy.mockReturnValue(Promise.resolve(null));
|
await processDelayedTasks(mockEnv);
|
||||||
|
|
||||||
await processDelayedTasks(mockEnv, mockCtx);
|
|
||||||
|
|
||||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
expect(queueSendSpy).not.toHaveBeenCalled();
|
||||||
expect(kvDeleteSpy).not.toHaveBeenCalled();
|
await expect(
|
||||||
|
mockEnv.DELAYED_TASKS.get(`delayed-task:123:invalid`),
|
||||||
|
).resolves.toBeNull();
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user