feat: Increase maximum direct queue delay from 9 to 12 hours and cap retry delays at this new limit.
This commit is contained in:
18
src/index.ts
18
src/index.ts
@@ -7,7 +7,10 @@ import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnect
|
|||||||
import { AnilistUpdateType } from "~/libs/anilist/updateType";
|
import { AnilistUpdateType } from "~/libs/anilist/updateType";
|
||||||
import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff";
|
import { calculateExponentialBackoff } from "~/libs/calculateExponentialBackoff";
|
||||||
import type { QueueName } from "~/libs/tasks/queueName.ts";
|
import type { QueueName } from "~/libs/tasks/queueName.ts";
|
||||||
import type { QueueBody } from "~/libs/tasks/queueTask";
|
import {
|
||||||
|
MAX_QUEUE_DELAY_SECONDS,
|
||||||
|
type QueueBody,
|
||||||
|
} from "~/libs/tasks/queueTask";
|
||||||
|
|
||||||
export const app = new OpenAPIHono<{ Bindings: Env }>();
|
export const app = new OpenAPIHono<{ Bindings: Env }>();
|
||||||
|
|
||||||
@@ -146,11 +149,14 @@ function onMessageQueue<QN extends QueueName>(
|
|||||||
);
|
);
|
||||||
console.error(error);
|
console.error(error);
|
||||||
message.retry({
|
message.retry({
|
||||||
delaySeconds: calculateExponentialBackoff({
|
delaySeconds: Math.min(
|
||||||
attempt: message.attempts,
|
calculateExponentialBackoff({
|
||||||
baseMin: retryDelayConfig[messageBatch.queue as QN]?.min,
|
attempt: message.attempts,
|
||||||
absCap: retryDelayConfig[messageBatch.queue as QN]?.max,
|
baseMin: retryDelayConfig[messageBatch.queue as QN]?.min,
|
||||||
}),
|
absCap: retryDelayConfig[messageBatch.queue as QN]?.max,
|
||||||
|
}),
|
||||||
|
MAX_QUEUE_DELAY_SECONDS,
|
||||||
|
),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import { DateTime } from "luxon";
|
|||||||
|
|
||||||
import type { DelayedTaskMetadata } from "./delayedTask";
|
import type { DelayedTaskMetadata } from "./delayedTask";
|
||||||
import { deserializeDelayedTask } from "./delayedTask";
|
import { deserializeDelayedTask } from "./delayedTask";
|
||||||
import { MAX_DELAY_SECONDS, queueTask } from "./queueTask";
|
import { MAX_QUEUE_DELAY_SECONDS, queueTask } from "./queueTask";
|
||||||
|
|
||||||
const RETRY_ALERT_THRESHOLD = 3;
|
const RETRY_ALERT_THRESHOLD = 3;
|
||||||
|
|
||||||
@@ -27,7 +27,7 @@ export async function processDelayedTasks(env: Cloudflare.Env): Promise<void> {
|
|||||||
console.log(`Found ${keys.length} delayed tasks to check`);
|
console.log(`Found ${keys.length} delayed tasks to check`);
|
||||||
|
|
||||||
const currentTime = Math.floor(Date.now() / 1000);
|
const currentTime = Math.floor(Date.now() / 1000);
|
||||||
const maxQueueTime = currentTime + MAX_DELAY_SECONDS;
|
const maxQueueTime = currentTime + MAX_QUEUE_DELAY_SECONDS;
|
||||||
|
|
||||||
let processedCount = 0;
|
let processedCount = 0;
|
||||||
let queuedCount = 0;
|
let queuedCount = 0;
|
||||||
|
|||||||
@@ -81,8 +81,8 @@ describe("queueTask - delayed task handling", () => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("tasks with delay > 9 hours", () => {
|
describe("tasks with delay > 12 hours", () => {
|
||||||
it("stores task in KV when delay exceeds 9 hours", async () => {
|
it("stores task in KV when delay exceeds 12 hours", async () => {
|
||||||
await queueTask(
|
await queueTask(
|
||||||
"NEW_EPISODE",
|
"NEW_EPISODE",
|
||||||
{ aniListId: 111, episodeNumber: 4 },
|
{ aniListId: 111, episodeNumber: 4 },
|
||||||
@@ -98,12 +98,12 @@ describe("queueTask - delayed task handling", () => {
|
|||||||
expect(queueSendSpy).not.toHaveBeenCalled();
|
expect(queueSendSpy).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("stores task in KV when delay is 9 hours + 1 second", async () => {
|
it("stores task in KV when delay is 12 hours + 1 second", async () => {
|
||||||
await queueTask(
|
await queueTask(
|
||||||
"NEW_EPISODE",
|
"NEW_EPISODE",
|
||||||
{ aniListId: 222, episodeNumber: 5 },
|
{ aniListId: 222, episodeNumber: 5 },
|
||||||
{
|
{
|
||||||
scheduleConfig: { delay: { hours: 9, seconds: 1 } },
|
scheduleConfig: { delay: { hours: 12, seconds: 1 } },
|
||||||
env: mockEnv,
|
env: mockEnv,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
@@ -176,7 +176,7 @@ describe("queueTask - delayed task handling", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("epoch time scheduling", () => {
|
describe("epoch time scheduling", () => {
|
||||||
it("queues directly when epoch time is within 9 hours", async () => {
|
it("queues directly when epoch time is within 12 hours", async () => {
|
||||||
const futureTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now
|
const futureTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now
|
||||||
|
|
||||||
await queueTask(
|
await queueTask(
|
||||||
@@ -192,7 +192,7 @@ describe("queueTask - delayed task handling", () => {
|
|||||||
expect(kvPutSpy).not.toHaveBeenCalled();
|
expect(kvPutSpy).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("stores in KV when epoch time is beyond 9 hours", async () => {
|
it("stores in KV when epoch time is beyond 12 hours", async () => {
|
||||||
const futureTime = Math.floor(Date.now() / 1000) + 24 * 3600; // 24 hours from now
|
const futureTime = Math.floor(Date.now() / 1000) + 24 * 3600; // 24 hours from now
|
||||||
|
|
||||||
await queueTask(
|
await queueTask(
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ interface QueueTaskOptionalArgs {
|
|||||||
env?: Cloudflare.Env;
|
env?: Cloudflare.Env;
|
||||||
}
|
}
|
||||||
|
|
||||||
export const MAX_DELAY_SECONDS = Duration.fromObject({ hours: 9 }).as(
|
export const MAX_QUEUE_DELAY_SECONDS = Duration.fromObject({ hours: 12 }).as(
|
||||||
"seconds",
|
"seconds",
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -46,8 +46,8 @@ export async function queueTask(
|
|||||||
req?.header(),
|
req?.header(),
|
||||||
);
|
);
|
||||||
|
|
||||||
// If delay exceeds 9 hours, store in KV for later processing
|
// If delay exceeds 12 hours, store in KV for later processing
|
||||||
if (scheduleTime > MAX_DELAY_SECONDS) {
|
if (scheduleTime > MAX_QUEUE_DELAY_SECONDS) {
|
||||||
if (!env || !env.DELAYED_TASKS) {
|
if (!env || !env.DELAYED_TASKS) {
|
||||||
throw new Error("DELAYED_TASKS KV namespace not available");
|
throw new Error("DELAYED_TASKS KV namespace not available");
|
||||||
}
|
}
|
||||||
@@ -132,7 +132,9 @@ function buildTask(
|
|||||||
scheduleTime = Duration.fromDurationLike(delay).as("second");
|
scheduleTime = Duration.fromDurationLike(delay).as("second");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const authorizationHeader = headers?.["X-Anilist-Token"] ? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` } : {};
|
const authorizationHeader = headers?.["X-Anilist-Token"]
|
||||||
|
? { Authorization: `Bearer ${headers["X-Anilist-Token"]}` }
|
||||||
|
: {};
|
||||||
|
|
||||||
switch (queueName) {
|
switch (queueName) {
|
||||||
case "ANILIST_UPDATES":
|
case "ANILIST_UPDATES":
|
||||||
|
|||||||
@@ -67,7 +67,7 @@ id = "c8db249d8ee7462b91f9c374321776e4"
|
|||||||
preview_id = "ff38240eb2aa4b1388c705f4974f5aec"
|
preview_id = "ff38240eb2aa4b1388c705f4974f5aec"
|
||||||
|
|
||||||
[triggers]
|
[triggers]
|
||||||
crons = ["0 */9 * * *"]
|
crons = ["0 */12 * * *"]
|
||||||
|
|
||||||
[[d1_databases]]
|
[[d1_databases]]
|
||||||
binding = "DB"
|
binding = "DB"
|
||||||
|
|||||||
Reference in New Issue
Block a user