Compare commits

6 commits: 80a6f67ead ... eb6dc545e2

| SHA1 |
|---|
| eb6dc545e2 |
| a99961df51 |
| d5b113c884 |
| 6eb42f6a33 |
| 05df043fbe |
| fb7990b274 |
package.json

@@ -6,6 +6,7 @@
   "type": "module",
   "scripts": {
     "dev": "wrangler dev src/index.ts --port 8080",
+    "deploy": "wrangler deploy --minify src/index.ts",
     "env:generate": "tsx src/scripts/generateEnv.ts",
     "env:verify": "tsx src/scripts/verifyEnv.ts",
     "db:generate": "drizzle-kit generate",
processDelayedTasks.test.ts

@@ -1,204 +1,158 @@
+import { env } from "cloudflare:test";
+import { DateTime } from "luxon";
 import { beforeEach, describe, expect, it, vi } from "vitest";
 
+import { getTestEnv } from "../test/getTestEnv";
 import { processDelayedTasks } from "./processDelayedTasks";
 
 describe("processDelayedTasks", () => {
-  let mockEnv: Cloudflare.Env;
-  let mockCtx: ExecutionContext;
-  let kvGetSpy: ReturnType<typeof vi.fn>;
-  let kvDeleteSpy: ReturnType<typeof vi.fn>;
-  let kvPutSpy: ReturnType<typeof vi.fn>;
-  let queueSendSpy: ReturnType<typeof vi.fn>;
-
-  beforeEach(() => {
-    kvGetSpy = vi.fn(() => Promise.resolve(null));
-    kvDeleteSpy = vi.fn(() => Promise.resolve());
-    kvPutSpy = vi.fn(() => Promise.resolve());
-    queueSendSpy = vi.fn(() => Promise.resolve());
-
-    mockEnv = {
-      DELAYED_TASKS: {
-        get: kvGetSpy,
-        delete: kvDeleteSpy,
-        put: kvPutSpy,
-        list: vi.fn(() =>
-          Promise.resolve({
-            keys: [],
-            list_complete: true as const,
-            cacheStatus: null,
-          }),
-        ),
-        getWithMetadata: vi.fn(() =>
-          Promise.resolve({ value: null, metadata: null }),
-        ),
-      } as any,
-      NEW_EPISODE: {
-        send: queueSendSpy,
-      } as any,
-      ANILIST_UPDATES: {
-        send: vi.fn(() => Promise.resolve()),
-      } as any,
-    } as any;
-
-    mockCtx = {
-      waitUntil: vi.fn(() => {}),
-      passThroughOnException: vi.fn(() => {}),
-    } as any;
+  beforeEach(async () => {
+    const tasksToDelete = await env.DELAYED_TASKS.list({
+      prefix: "delayed-task:",
+    });
+    console.log(`Found ${tasksToDelete.keys.length} tasks to delete`);
+    for (const task of tasksToDelete.keys) {
+      await env.DELAYED_TASKS.delete(task.name);
+    }
   });
 
   it("handles empty KV namespace", async () => {
-    await processDelayedTasks(mockEnv, mockCtx);
-
-    expect(kvDeleteSpy).not.toHaveBeenCalled();
-    expect(queueSendSpy).not.toHaveBeenCalled();
+    await processDelayedTasks(env);
+
+    await expect(
+      env.DELAYED_TASKS.list({ prefix: "delayed-task:" }).then(
+        (result) => result.keys,
+      ),
+    ).resolves.toHaveLength(0);
   });
 
-  it("queues tasks within 12 hours of scheduled time", async () => {
-    const now = Math.floor(Date.now() / 1000);
-    const scheduledTime = now + 6 * 3600; // 6 hours from now
+  it("queues tasks within 9 hours of scheduled time", async () => {
+    const now = DateTime.now();
+    const scheduledTime = now.plus({ hours: 6 }).toSeconds();
 
     const taskMetadata = {
       queueName: "NEW_EPISODE",
       body: { aniListId: 123, episodeNumber: 1 },
       headers: { "Content-Type": "application/json" },
       scheduledEpochTime: scheduledTime,
       taskId: "task-1",
-      createdAt: now - 18 * 3600,
+      createdAt: now.minus({ hours: 18 }).toSeconds(),
       retryCount: 0,
     };
 
-    mockEnv.DELAYED_TASKS.list = vi.fn(() =>
-      Promise.resolve({
-        keys: [{ name: `delayed-task:${scheduledTime}:task-1` }],
-        list_complete: true as const,
-        cacheStatus: null,
-      }),
-    );
-
-    kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
-
-    await processDelayedTasks(mockEnv, mockCtx);
-
-    expect(queueSendSpy).toHaveBeenCalledTimes(1);
-    expect(kvDeleteSpy).toHaveBeenCalledTimes(1);
-    expect(kvDeleteSpy).toHaveBeenCalledWith(
+    await env.DELAYED_TASKS.put(
       `delayed-task:${scheduledTime}:task-1`,
+      JSON.stringify(taskMetadata),
     );
+
+    await processDelayedTasks(env);
+
+    await expect(
+      env.DELAYED_TASKS.get(`delayed-task:${scheduledTime}:task-1`),
+    ).resolves.toBeNull();
   });
 
-  it("does not queue tasks beyond 12 hours", async () => {
-    const now = Math.floor(Date.now() / 1000);
-    const scheduledTime = now + 24 * 3600; // 24 hours from now
+  it("does not queue tasks beyond 9 hours", async () => {
+    const now = DateTime.now();
+    const scheduledTime = now.plus({ hours: 24 }).toSeconds();
 
     const taskMetadata = {
       queueName: "NEW_EPISODE",
       body: { aniListId: 456, episodeNumber: 2 },
       headers: { "Content-Type": "application/json" },
       scheduledEpochTime: scheduledTime,
       taskId: "task-2",
-      createdAt: now,
+      createdAt: now.toSeconds(),
       retryCount: 0,
     };
 
-    mockEnv.DELAYED_TASKS.list = vi.fn(() =>
-      Promise.resolve({
-        keys: [{ name: `delayed-task:${scheduledTime}:task-2` }],
-        list_complete: true as const,
-        cacheStatus: null,
-      }),
+    await env.DELAYED_TASKS.put(
+      `delayed-task:${scheduledTime}:task-2`,
+      JSON.stringify(taskMetadata),
     );
 
-    kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
-
-    await processDelayedTasks(mockEnv, mockCtx);
-
-    expect(queueSendSpy).not.toHaveBeenCalled();
-    expect(kvDeleteSpy).not.toHaveBeenCalled();
+    await processDelayedTasks(env);
+
+    await expect(
+      env.DELAYED_TASKS.get(`delayed-task:${scheduledTime}:task-2`),
+    ).resolves.toBeTruthy();
   });
 
   it("increments retry count on queue failure", async () => {
-    const now = Math.floor(Date.now() / 1000);
-    const scheduledTime = now + 1 * 3600; // 1 hour from now
+    const now = DateTime.now();
+    const scheduledTime = now.plus({ hours: 1 }).toSeconds();
 
     const taskMetadata = {
       queueName: "NEW_EPISODE",
       body: { aniListId: 789, episodeNumber: 3 },
       headers: { "Content-Type": "application/json" },
       scheduledEpochTime: scheduledTime,
       taskId: "task-3",
-      createdAt: now - 23 * 3600,
+      createdAt: now.minus({ hours: 23 }).toSeconds(),
       retryCount: 0,
     };
 
-    mockEnv.DELAYED_TASKS.list = vi.fn(() =>
-      Promise.resolve({
-        keys: [{ name: `delayed-task:${scheduledTime}:task-3` }],
-        list_complete: true as const,
-        cacheStatus: null,
-      }),
+    const mockEnv = getTestEnv({
+      NEW_EPISODE: {
+        send: vi.fn().mockRejectedValue(new Error("Queue error")),
+        sendBatch: vi.fn().mockRejectedValue(new Error("Queue error")),
+      },
+    });
+    await mockEnv.DELAYED_TASKS.put(
+      `delayed-task:${scheduledTime}:task-3`,
+      JSON.stringify(taskMetadata),
     );
 
-    kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
-    queueSendSpy.mockRejectedValue(new Error("Queue error"));
-
-    await processDelayedTasks(mockEnv, mockCtx);
-
-    expect(kvPutSpy).toHaveBeenCalledTimes(1);
-    const updatedMetadata = JSON.parse(kvPutSpy.mock.calls[0][1]);
-
+    await processDelayedTasks(mockEnv);
+
+    const updatedMetadata = JSON.parse(
+      (await mockEnv.DELAYED_TASKS.get(
+        `delayed-task:${scheduledTime}:task-3`,
+      ))!,
+    );
     expect(updatedMetadata.retryCount).toBe(1);
-    expect(kvDeleteSpy).not.toHaveBeenCalled();
   });
 
   it("logs alert after 3 failed attempts", async () => {
     const consoleErrorSpy = vi.fn(() => {});
     const originalConsoleError = console.error;
     console.error = consoleErrorSpy as any;
 
-    const now = Math.floor(Date.now() / 1000);
-    const scheduledTime = now + 1 * 3600;
+    const now = DateTime.now();
+    const scheduledTime = now.plus({ hours: 1 }).toSeconds();
 
     const taskMetadata = {
       queueName: "NEW_EPISODE",
-      body: { aniListId: 999, episodeNumber: 4 },
+      body: { aniListId: 789, episodeNumber: 4 },
       headers: { "Content-Type": "application/json" },
       scheduledEpochTime: scheduledTime,
       taskId: "task-4",
-      createdAt: now - 23 * 3600,
-      retryCount: 2, // Will become 3 after this failure
+      createdAt: now.minus({ hours: 23 }).toSeconds(),
+      retryCount: 2,
     };
 
-    mockEnv.DELAYED_TASKS.list = vi.fn(() =>
-      Promise.resolve({
-        keys: [{ name: `delayed-task:${scheduledTime}:task-4` }],
-        list_complete: true as const,
-        cacheStatus: null,
-      }),
+    const mockEnv = getTestEnv({
+      NEW_EPISODE: {
+        send: vi.fn().mockRejectedValue(new Error("Queue error")),
+        sendBatch: vi.fn().mockRejectedValue(new Error("Queue error")),
+      },
+    });
+    await mockEnv.DELAYED_TASKS.put(
+      `delayed-task:${scheduledTime}:task-4`,
+      JSON.stringify(taskMetadata),
    );
 
-    kvGetSpy.mockReturnValue(Promise.resolve(JSON.stringify(taskMetadata)));
-    queueSendSpy.mockRejectedValue(new Error("Queue error"));
-
-    await processDelayedTasks(mockEnv, mockCtx);
+    await processDelayedTasks(mockEnv);
 
     // Check that alert was logged
     const alertCalls = consoleErrorSpy.mock.calls.filter((call: any) =>
       call[0]?.includes("🚨 ALERT"),
     );
     expect(alertCalls.length).toBeGreaterThan(0);
 
     console.error = originalConsoleError;
   });
 
   it("handles multiple tasks in single cron run", async () => {
-    const now = Math.floor(Date.now() / 1000);
+    const now = DateTime.now();
 
     const task1Metadata = {
       queueName: "NEW_EPISODE",
       body: { aniListId: 100, episodeNumber: 1 },
       headers: { "Content-Type": "application/json" },
-      scheduledEpochTime: now + 2 * 3600,
+      scheduledEpochTime: now.plus({ hours: 2 }).toSeconds(),
       taskId: "task-1",
-      createdAt: now - 20 * 3600,
+      createdAt: now.minus({ hours: 20 }).toSeconds(),
       retryCount: 0,
     };
 
@@ -206,47 +160,53 @@ describe("processDelayedTasks", () => {
       queueName: "NEW_EPISODE",
       body: { aniListId: 200, episodeNumber: 2 },
       headers: { "Content-Type": "application/json" },
-      scheduledEpochTime: now + 5 * 3600,
+      scheduledEpochTime: now.plus({ hours: 5 }).toSeconds(),
       taskId: "task-2",
-      createdAt: now - 19 * 3600,
+      createdAt: now.minus({ hours: 19 }).toSeconds(),
       retryCount: 0,
     };
 
-    mockEnv.DELAYED_TASKS.list = vi.fn(() =>
-      Promise.resolve({
-        keys: [
-          { name: `delayed-task:${task1Metadata.scheduledEpochTime}:task-1` },
-          { name: `delayed-task:${task2Metadata.scheduledEpochTime}:task-2` },
-        ],
-        list_complete: true as const,
-        cacheStatus: null,
-      }),
+    await env.DELAYED_TASKS.put(
+      `delayed-task:${task1Metadata.scheduledEpochTime}:task-1`,
+      JSON.stringify(task1Metadata),
+    );
+    await env.DELAYED_TASKS.put(
+      `delayed-task:${task2Metadata.scheduledEpochTime}:task-2`,
+      JSON.stringify(task2Metadata),
     );
 
-    kvGetSpy
-      .mockReturnValueOnce(Promise.resolve(JSON.stringify(task1Metadata)))
-      .mockReturnValueOnce(Promise.resolve(JSON.stringify(task2Metadata)));
-
-    await processDelayedTasks(mockEnv, mockCtx);
-
-    expect(queueSendSpy).toHaveBeenCalledTimes(2);
-    expect(kvDeleteSpy).toHaveBeenCalledTimes(2);
+    await processDelayedTasks(env);
+
+    await expect(
+      env.DELAYED_TASKS.get(
+        `delayed-task:${task1Metadata.scheduledEpochTime}:task-1`,
+      ),
+    ).resolves.toBeNull();
+    await expect(
+      env.DELAYED_TASKS.get(
+        `delayed-task:${task2Metadata.scheduledEpochTime}:task-2`,
+      ),
+    ).resolves.toBeNull();
   });
 
   it("skips tasks with null values in KV", async () => {
-    mockEnv.DELAYED_TASKS.list = vi.fn(() =>
-      Promise.resolve({
-        keys: [{ name: "delayed-task:123:invalid" }],
-        list_complete: true as const,
-        cacheStatus: null,
-      }),
-    );
+    const queueSendSpy = vi.fn().mockResolvedValue(undefined);
+    const mockEnv = getTestEnv({
+      NEW_EPISODE: {
+        send: queueSendSpy,
+        sendBatch: queueSendSpy,
+      },
+      ANILIST_UPDATES: {
+        send: queueSendSpy,
+        sendBatch: queueSendSpy,
+      },
+    });
+    await mockEnv.DELAYED_TASKS.put(`delayed-task:123:invalid`, null);
 
-    kvGetSpy.mockReturnValue(Promise.resolve(null));
-
-    await processDelayedTasks(mockEnv, mockCtx);
+    await processDelayedTasks(mockEnv);
 
     expect(queueSendSpy).not.toHaveBeenCalled();
-    expect(kvDeleteSpy).not.toHaveBeenCalled();
+    await expect(
+      mockEnv.DELAYED_TASKS.get(`delayed-task:123:invalid`),
+    ).resolves.toBeNull();
   });
 });
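Note: the rewritten tests drop the hand-rolled `mockEnv`/spy setup in favor of the live `env` binding set. A minimal sketch of that pattern, assuming the project runs its tests under `@cloudflare/vitest-pool-workers` (which is what backs the `cloudflare:test` module): state is written to, and asserted against, a real Miniflare-backed KV namespace instead of spy call counts.

```ts
import { env } from "cloudflare:test";
import { expect, it } from "vitest";

// The KV namespace is a real, isolated binding provided per test run, so
// assertions read stored state directly instead of counting spy calls.
it("round-trips a value through the DELAYED_TASKS binding", async () => {
  await env.DELAYED_TASKS.put("delayed-task:0:demo", "{}");
  await expect(env.DELAYED_TASKS.get("delayed-task:0:demo")).resolves.toBe(
    "{}",
  );
});
```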
@@ -2,15 +2,11 @@ import { DateTime } from "luxon";
|
|||||||
|
|
||||||
import type { DelayedTaskMetadata } from "./delayedTask";
|
import type { DelayedTaskMetadata } from "./delayedTask";
|
||||||
import { deserializeDelayedTask } from "./delayedTask";
|
import { deserializeDelayedTask } from "./delayedTask";
|
||||||
import { queueTask } from "./queueTask";
|
import { MAX_DELAY_SECONDS, queueTask } from "./queueTask";
|
||||||
|
|
||||||
const MAX_DELAY_SECONDS = 12 * 60 * 60; // 43,200 seconds (12 hours)
|
|
||||||
const RETRY_ALERT_THRESHOLD = 3;
|
const RETRY_ALERT_THRESHOLD = 3;
|
||||||
|
|
||||||
export async function processDelayedTasks(
|
export async function processDelayedTasks(env: Cloudflare.Env): Promise<void> {
|
||||||
env: Cloudflare.Env,
|
|
||||||
ctx: ExecutionContext,
|
|
||||||
): Promise<void> {
|
|
||||||
console.log("Starting delayed task processing cron job");
|
console.log("Starting delayed task processing cron job");
|
||||||
|
|
||||||
const kvNamespace = env.DELAYED_TASKS;
|
const kvNamespace = env.DELAYED_TASKS;
|
||||||
@@ -31,7 +27,7 @@ export async function processDelayedTasks(
|
|||||||
console.log(`Found ${keys.length} delayed tasks to check`);
|
console.log(`Found ${keys.length} delayed tasks to check`);
|
||||||
|
|
||||||
const currentTime = Math.floor(Date.now() / 1000);
|
const currentTime = Math.floor(Date.now() / 1000);
|
||||||
const twelveHoursFromNow = currentTime + MAX_DELAY_SECONDS;
|
const maxQueueTime = currentTime + MAX_DELAY_SECONDS;
|
||||||
|
|
||||||
let processedCount = 0;
|
let processedCount = 0;
|
||||||
let queuedCount = 0;
|
let queuedCount = 0;
|
||||||
@@ -40,16 +36,17 @@ export async function processDelayedTasks(
|
|||||||
for (const key of keys) {
|
for (const key of keys) {
|
||||||
try {
|
try {
|
||||||
const value = await kvNamespace.get(key.name);
|
const value = await kvNamespace.get(key.name);
|
||||||
if (!value) {
|
if (!value || value == "null") {
|
||||||
console.warn(`Task key ${key.name} has no value, skipping`);
|
console.warn(`Task key ${key.name} has no value, removing`);
|
||||||
|
await kvNamespace.delete(key.name);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
const metadata: DelayedTaskMetadata = deserializeDelayedTask(value);
|
const metadata: DelayedTaskMetadata = deserializeDelayedTask(value);
|
||||||
processedCount++;
|
processedCount++;
|
||||||
|
|
||||||
// Check if task is ready to be queued (within 12 hours of scheduled time)
|
// Check if task is ready to be queued (within 9 hours of scheduled time)
|
||||||
if (metadata.scheduledEpochTime <= twelveHoursFromNow) {
|
if (metadata.scheduledEpochTime <= maxQueueTime) {
|
||||||
const remainingDelay = Math.max(
|
const remainingDelay = Math.max(
|
||||||
0,
|
0,
|
||||||
metadata.scheduledEpochTime - currentTime,
|
metadata.scheduledEpochTime - currentTime,
|
||||||
@@ -100,7 +97,7 @@ export async function processDelayedTasks(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
const hoursUntilReady =
|
const hoursUntilReady =
|
||||||
(metadata.scheduledEpochTime - twelveHoursFromNow) / 3600;
|
(metadata.scheduledEpochTime - maxQueueTime) / 3600;
|
||||||
console.log(
|
console.log(
|
||||||
`Task ${metadata.taskId} not ready yet (${hoursUntilReady.toFixed(1)} hours until queueable)`,
|
`Task ${metadata.taskId} not ready yet (${hoursUntilReady.toFixed(1)} hours until queueable)`,
|
||||||
);
|
);
|
||||||
|
|||||||
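Note: the rename to `maxQueueTime` makes the windowing rule easier to read. A sketch of the logic distilled from the hunks above (`queueWindow` is a hypothetical helper, not code from this change): a task is promoted from KV to its queue once its scheduled time falls within `MAX_DELAY_SECONDS` of now, and the message is sent with whatever delay remains.

```ts
const MAX_DELAY_SECONDS = 9 * 60 * 60; // mirrors the constant exported by queueTask.ts

// Hypothetical helper distilling the readiness check in processDelayedTasks.
function queueWindow(scheduledEpochTime: number, currentTime: number) {
  const maxQueueTime = currentTime + MAX_DELAY_SECONDS;
  return {
    // Ready once the scheduled time is at most 9 hours away.
    ready: scheduledEpochTime <= maxQueueTime,
    // Clamped at 0 so overdue tasks are delivered immediately.
    remainingDelay: Math.max(0, scheduledEpochTime - currentTime),
  };
}
```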
queueTask.ts

@@ -30,6 +30,10 @@ interface QueueTaskOptionalArgs {
   env?: Cloudflare.Env;
 }
 
+export const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as(
+  "seconds",
+);
+
 export async function queueTask(
   queueName: QueueName,
   body: QueueBody[QueueName],
@@ -42,8 +46,6 @@ export async function queueTask(
     req?.header(),
   );
 
-  const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as("seconds");
-
   // If delay exceeds 9 hours, store in KV for later processing
   if (scheduleTime > MAX_DELAY_SECONDS) {
     if (!env || !env.DELAYED_TASKS) {
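Note: hoisting the constant to module scope and exporting it lets `processDelayedTasks` share the same threshold instead of redefining its own. For reference, Luxon's `Duration` conversion here evaluates to a plain number of seconds:

```ts
import { Duration } from "luxon";

// 9 hours expressed in seconds; .as("seconds") returns a number.
const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as("seconds");
console.log(MAX_DELAY_SECONDS); // 32400
```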
getTestEnv.ts

@@ -8,10 +8,12 @@ export function getTestEnvVariables(): Cloudflare.Env {
 export function getTestEnv({
   ADMIN_SDK_JSON = '{"client_email": "test@test.com", "project_id": "test-26g38"}',
   LOG_DB_QUERIES = "false",
+  ...mockEnv
 }: Partial<Cloudflare.Env> = {}): Cloudflare.Env {
   return {
     ...env,
     ADMIN_SDK_JSON,
     LOG_DB_QUERIES,
+    ...mockEnv,
   };
 }
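Note: the new rest/spread pair is what lets the tests above override individual bindings while inheriting everything else from the real test env. A hypothetical usage sketch (the `Queue` cast is illustrative, not part of this change):

```ts
import { vi } from "vitest";

import { getTestEnv } from "../test/getTestEnv";

// Override only the NEW_EPISODE queue; every other binding still comes
// from the spread `...env` inside getTestEnv.
const mockEnv = getTestEnv({
  NEW_EPISODE: {
    send: vi.fn().mockRejectedValue(new Error("Queue error")),
    sendBatch: vi.fn().mockRejectedValue(new Error("Queue error")),
  } as unknown as Queue,
});
```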
worker-configuration.d.ts (vendored)
@@ -2,32 +2,32 @@
 // Generated by Wrangler by running `wrangler types` (hash: df24977940a31745cb42d562b6645de2)
 // Runtime types generated with workerd@1.20251210.0 2025-11-28 nodejs_compat
 declare namespace Cloudflare {
   interface GlobalProps {
     mainModule: typeof import("./src/index");
     durableNamespaces: "AnilistDo";
   }
   interface Env {
     DELAYED_TASKS: KVNamespace;
     ADMIN_SDK_JSON: string;
     CLOUDFLARE_TOKEN: string;
     CLOUDFLARE_D1_TOKEN: string;
     CLOUDFLARE_ACCOUNT_ID: string;
     CLOUDFLARE_DATABASE_ID: string;
     PROXY_URL: string;
     USE_MOCK_DATA: string;
     LOG_DB_QUERIES: string;
     ANILIST_DO: DurableObjectNamespace<import("./src/index").AnilistDo>;
     DB: D1Database;
     ANILIST_UPDATES: Queue;
     NEW_EPISODE: Queue;
   }
 }
-interface Env extends Cloudflare.Env {}
+interface Env extends Cloudflare.Env { }
 type StringifyValues<EnvType extends Record<string, unknown>> = {
   [Binding in keyof EnvType]: EnvType[Binding] extends string ? EnvType[Binding] : string;
 };
 declare namespace NodeJS {
-  interface ProcessEnv extends StringifyValues<Pick<Cloudflare.Env, "ADMIN_SDK_JSON" | "CLOUDFLARE_TOKEN" | "CLOUDFLARE_D1_TOKEN" | "CLOUDFLARE_ACCOUNT_ID" | "CLOUDFLARE_DATABASE_ID" | "PROXY_URL" | "USE_MOCK_DATA" | "LOG_DB_QUERIES">> {}
+  interface ProcessEnv extends StringifyValues<Pick<Cloudflare.Env, "ADMIN_SDK_JSON" | "CLOUDFLARE_TOKEN" | "CLOUDFLARE_D1_TOKEN" | "CLOUDFLARE_ACCOUNT_ID" | "CLOUDFLARE_DATABASE_ID" | "PROXY_URL" | "USE_MOCK_DATA" | "LOG_DB_QUERIES">> { }
 }
 
 // Begin runtime types
@@ -1644,7 +1644,7 @@ declare abstract class Body {
  */
 declare var Response: {
   prototype: Response;
-  new (body?: BodyInit | null, init?: ResponseInit): Response;
+  new(body?: BodyInit | null, init?: ResponseInit): Response;
   error(): Response;
   redirect(url: string, status?: number): Response;
   json(any: any, maybeInit?: (ResponseInit | Response)): Response;
@@ -2192,7 +2192,7 @@ interface ReadableStream<R = any> {
  */
 declare const ReadableStream: {
   prototype: ReadableStream;
-  new (underlyingSource: UnderlyingByteSource, strategy?: QueuingStrategy<Uint8Array>): ReadableStream<Uint8Array>;
+  new(underlyingSource: UnderlyingByteSource, strategy?: QueuingStrategy<Uint8Array>): ReadableStream<Uint8Array>;
   new <R = any>(underlyingSource?: UnderlyingSource<R>, strategy?: QueuingStrategy<R>): ReadableStream<R>;
 };
 /**
@@ -3034,7 +3034,7 @@ type WebSocketEventMap = {
  */
 declare var WebSocket: {
   prototype: WebSocket;
-  new (url: string, protocols?: (string[] | string)): WebSocket;
+  new(url: string, protocols?: (string[] | string)): WebSocket;
   readonly READY_STATE_CONNECTING: number;
   readonly CONNECTING: number;
   readonly READY_STATE_OPEN: number;
@@ -3091,7 +3091,7 @@ interface WebSocket extends EventTarget<WebSocketEventMap> {
   extensions: string | null;
 }
 declare const WebSocketPair: {
-  new (): {
+  new(): {
     0: WebSocket;
     1: WebSocket;
   };
@@ -9413,21 +9413,21 @@ interface IncomingRequestCfPropertiesTLSClientAuthPlaceholder {
   certNotAfter: "";
 }
 /** Possible outcomes of TLS verification */
 declare type CertVerificationStatus =
   /** Authentication succeeded */
   "SUCCESS"
   /** No certificate was presented */
   | "NONE"
   /** Failed because the certificate was self-signed */
   | "FAILED:self signed certificate"
   /** Failed because the certificate failed a trust chain check */
   | "FAILED:unable to verify the first certificate"
   /** Failed because the certificate not yet valid */
   | "FAILED:certificate is not yet valid"
   /** Failed because the certificate is expired */
   | "FAILED:certificate has expired"
   /** Failed for another unspecified reason */
   | "FAILED";
 /**
  * An upstream endpoint's response to a TCP `keepalive` message from Cloudflare.
  */
@@ -9477,15 +9477,15 @@ interface D1ExecResult {
   count: number;
   duration: number;
 }
 type D1SessionConstraint =
   // Indicates that the first query should go to the primary, and the rest queries
   // using the same D1DatabaseSession will go to any replica that is consistent with
   // the bookmark maintained by the session (returned by the first query).
   'first-primary'
   // Indicates that the first query can go anywhere (primary or replica), and the rest queries
   // using the same D1DatabaseSession will go to any replica that is consistent with
   // the bookmark maintained by the session (returned by the first query).
   | 'first-unconstrained';
 type D1SessionBookmark = string;
 declare abstract class D1Database {
   prepare(query: string): D1PreparedStatement;
@@ -9599,7 +9599,7 @@ declare type EmailExportedHandler<Env = unknown> = (message: ForwardableEmailMes
 declare module "cloudflare:email" {
   let _EmailMessage: {
     prototype: EmailMessage;
-    new (from: string, to: string, raw: ReadableStream | string): EmailMessage;
+    new(from: string, to: string, raw: ReadableStream | string): EmailMessage;
   };
   export { _EmailMessage as EmailMessage };
 }
@@ -10058,17 +10058,17 @@ declare namespace Rpc {
   // The reason for using a generic type here is to build a serializable subset of structured
   // cloneable composite types. This allows types defined with the "interface" keyword to pass the
   // serializable check as well. Otherwise, only types defined with the "type" keyword would pass.
   type Serializable<T> =
     // Structured cloneables
     BaseType
     // Structured cloneable composites
     | Map<T extends Map<infer U, unknown> ? Serializable<U> : never, T extends Map<unknown, infer U> ? Serializable<U> : never> | Set<T extends Set<infer U> ? Serializable<U> : never> | ReadonlyArray<T extends ReadonlyArray<infer U> ? Serializable<U> : never> | {
       [K in keyof T]: K extends number | string ? Serializable<T[K]> : never;
     }
     // Special types
     | Stub<Stubable>
     // Serialized as stubs, see `Stubify`
     | Stubable;
   // Base type for all RPC stubs, including common memory management methods.
   // `T` is used as a marker type for unwrapping `Stub`s later.
   interface StubBase<T extends Stubable> extends Disposable {
@@ -10083,8 +10083,8 @@ declare namespace Rpc {
   type Stubify<T> = T extends Stubable ? Stub<T> : T extends Map<infer K, infer V> ? Map<Stubify<K>, Stubify<V>> : T extends Set<infer V> ? Set<Stubify<V>> : T extends Array<infer V> ? Array<Stubify<V>> : T extends ReadonlyArray<infer V> ? ReadonlyArray<Stubify<V>> : T extends BaseType ? T : T extends {
     [key: string | number]: any;
   } ? {
     [K in keyof T]: Stubify<T[K]>;
   } : T;
   // Recursively rewrite all `Stub<T>`s with the corresponding `T`s.
   // Note we use `StubBase` instead of `Stub` here to avoid circular dependencies:
   // `Stub` depends on `Provider`, which depends on `Unstubify`, which would depend on `Stub`.
@@ -10092,8 +10092,8 @@ declare namespace Rpc {
   type Unstubify<T> = T extends StubBase<infer V> ? V : T extends Map<infer K, infer V> ? Map<Unstubify<K>, Unstubify<V>> : T extends Set<infer V> ? Set<Unstubify<V>> : T extends Array<infer V> ? Array<Unstubify<V>> : T extends ReadonlyArray<infer V> ? ReadonlyArray<Unstubify<V>> : T extends BaseType ? T : T extends {
     [key: string | number]: unknown;
   } ? {
     [K in keyof T]: Unstubify<T[K]>;
   } : T;
   type UnstubifyAll<A extends any[]> = {
     [I in keyof A]: Unstubify<A[I]>;
   };
@@ -10166,7 +10166,7 @@ declare namespace Cloudflare {
     [K in keyof MainModule]: LoopbackForExport<MainModule[K]>
     // If the export is listed in `durableNamespaces`, then it is also a
     // DurableObjectNamespace.
     & (K extends GlobalProp<"durableNamespaces", never> ? MainModule[K] extends new (...args: any[]) => infer DoInstance ? DoInstance extends Rpc.DurableObjectBranded ? DurableObjectNamespace<DoInstance> : DurableObjectNamespace<undefined> : DurableObjectNamespace<undefined> : {});
   };
 }
 declare namespace CloudflareWorkersModule {
@@ -10251,6 +10251,9 @@ declare namespace CloudflareWorkersModule {
   export const env: Cloudflare.Env;
   export const exports: Cloudflare.Exports;
 }
+declare module 'cloudflare:test' {
+  export = CloudflareWorkersModule;
+}
 declare module 'cloudflare:workers' {
   export = CloudflareWorkersModule;
 }
@@ -10822,10 +10825,10 @@ interface WorkflowInstanceCreateOptions<PARAMS = unknown> {
 }
 type InstanceStatus = {
   status: 'queued' // means that instance is waiting to be started (see concurrency limits)
   | 'running' | 'paused' | 'errored' | 'terminated' // user terminated the instance while it was running
   | 'complete' | 'waiting' // instance is hibernating and waiting for sleep or event to finish
   | 'waitingForPause' // instance is finishing the current work to pause
   | 'unknown';
   error?: {
     name: string;
     message: string;
@@ -39,6 +39,14 @@ deleted_classes = ["AnilistDo"]
|
|||||||
tag = "v4"
|
tag = "v4"
|
||||||
new_sqlite_classes = ["AnilistDo"]
|
new_sqlite_classes = ["AnilistDo"]
|
||||||
|
|
||||||
|
[[migrations]]
|
||||||
|
tag = "v5"
|
||||||
|
deleted_classes = ["AnilistDo"]
|
||||||
|
|
||||||
|
[[migrations]]
|
||||||
|
tag = "v6"
|
||||||
|
new_sqlite_classes = ["AnilistDo"]
|
||||||
|
|
||||||
[[queues.producers]]
|
[[queues.producers]]
|
||||||
queue = "anilist-updates"
|
queue = "anilist-updates"
|
||||||
binding = "ANILIST_UPDATES"
|
binding = "ANILIST_UPDATES"
|
||||||
@@ -59,7 +67,7 @@ id = "c8db249d8ee7462b91f9c374321776e4"
|
|||||||
preview_id = "ff38240eb2aa4b1388c705f4974f5aec"
|
preview_id = "ff38240eb2aa4b1388c705f4974f5aec"
|
||||||
|
|
||||||
[triggers]
|
[triggers]
|
||||||
crons = ["0 */12 * * *"]
|
crons = ["0 */9 * * *"]
|
||||||
|
|
||||||
[[d1_databases]]
|
[[d1_databases]]
|
||||||
binding = "DB"
|
binding = "DB"
|
||||||
|
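Note: the cron change keeps the trigger cadence in lockstep with the 9-hour queueing window; if the cron fired less often than `MAX_DELAY_SECONDS`, a task could become due between runs without ever being promoted out of KV. A hypothetical invariant check, using the same Luxon conversion as `queueTask.ts`:

```ts
import { Duration } from "luxon";

// "0 */9 * * *" fires at 00:00, 09:00, and 18:00, so runs are at most 9h apart.
const CRON_PERIOD_SECONDS = Duration.fromObject({ hours: 9 }).as("seconds");
const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as("seconds");

if (CRON_PERIOD_SECONDS > MAX_DELAY_SECONDS) {
  throw new Error("Cron period exceeds the queueing window; tasks could be missed");
}
```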