diff --git a/bun.lockb b/bun.lockb index 7e6fd7e..ded1c28 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/bunfig.toml b/bunfig.toml index b9357dc..a53b5e8 100644 --- a/bunfig.toml +++ b/bunfig.toml @@ -3,5 +3,4 @@ preload = [ "./testSetup.ts", "./src/mocks/consumet.ts", "./src/mocks/getGoogleAuthToken.ts", - "./src/mocks/qstash.ts", ] diff --git a/drizzle/0009_outstanding_trauma.sql b/drizzle/0009_outstanding_trauma.sql new file mode 100644 index 0000000..c66ad34 --- /dev/null +++ b/drizzle/0009_outstanding_trauma.sql @@ -0,0 +1 @@ +DROP TABLE `title_messages`; \ No newline at end of file diff --git a/drizzle/meta/0009_snapshot.json b/drizzle/meta/0009_snapshot.json new file mode 100644 index 0000000..531d829 --- /dev/null +++ b/drizzle/meta/0009_snapshot.json @@ -0,0 +1,138 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "2934b006-eebb-447b-a435-40da68bb896c", + "prevId": "efff8189-76e3-4944-b536-13121dcbe7b3", + "tables": { + "device_tokens": { + "name": "device_tokens", + "columns": { + "device_id": { + "name": "device_id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "token": { + "name": "token", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "username": { + "name": "username", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "last_connected_at": { + "name": "last_connected_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(CURRENT_TIMESTAMP)" + } + }, + "indexes": { + "device_tokens_token_unique": { + "name": "device_tokens_token_unique", + "columns": ["token"], + "isUnique": true + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "key_value": { + "name": "key_value", + "columns": { + "key": { + "name": "key", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "unreleased_titles": { + "name": "unreleased_titles", + "columns": { + "title_id": { + "name": "title_id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "watch_status": { + "name": "watch_status", + "columns": { + "device_id": { + "name": "device_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "title_id": { + "name": "title_id", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "watch_status_device_id_device_tokens_device_id_fk": { + "name": "watch_status_device_id_device_tokens_device_id_fk", + "tableFrom": "watch_status", + "tableTo": "device_tokens", + "columnsFrom": ["device_id"], + "columnsTo": ["device_id"], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "watch_status_device_id_title_id_pk": { + "columns": ["device_id", "title_id"], + "name": "watch_status_device_id_title_id_pk" + } + }, + "uniqueConstraints": {} + } + }, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json index 748fb39..e8a5cec 100644 --- a/drizzle/meta/_journal.json +++ b/drizzle/meta/_journal.json @@ -64,6 +64,13 @@ "when": 1726945478448, "tag": "0008_faulty_bushwacker", "breakpoints": true + }, + { + "idx": 9, + "version": "6", + "when": 1728151566508, + "tag": "0009_outstanding_trauma", + "breakpoints": true } ] } diff --git a/package.json b/package.json index 58be08a..20f8833 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,6 @@ "@hono/zod-openapi": "^0.12.2", "@hono/zod-validator": "^0.2.2", "@libsql/client": "^0.6.2", - "@upstash/qstash": "^2.7.5", "drizzle-orm": "^0.31.4", "gql.tada": "^1.8.7", "graphql-request": "^7.1.0", diff --git a/src/controllers/internal/new-episode/index.ts b/src/controllers/internal/new-episode/index.ts index 128e0ce..10c2826 100644 --- a/src/controllers/internal/new-episode/index.ts +++ b/src/controllers/internal/new-episode/index.ts @@ -7,14 +7,12 @@ import { fetchEpisodeUrlFromAllProviders } from "~/controllers/episodes/getEpiso import { getAdminSdkCredentials } from "~/libs/gcloud/getAdminSdkCredentials"; import { sendFcmMessage } from "~/libs/gcloud/sendFcmMessage"; import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode"; -import { verifyQstashHeader } from "~/libs/qstash/verifyQstashHeader"; import { getTokensSubscribedToTitle } from "~/models/token"; import { isWatchingTitle } from "~/models/watchStatus"; import type { Env } from "~/types/env"; import { AniListIdSchema, EpisodeNumberSchema, - ErrorResponse, SuccessResponse, } from "~/types/schema"; @@ -38,10 +36,6 @@ app.post( `Internal new episode route, aniListId: ${aniListId}, episodeNumber: ${episodeNumber}`, ); - if (!(await verifyQstashHeader(env(c, "workerd"), c.req))) { - return c.json(ErrorResponse, { status: 401 }); - } - if (!(await isWatchingTitle(env(c, "workerd"), aniListId))) { console.log(`Title ${aniListId} is no longer being watched`); return c.json( diff --git a/src/controllers/internal/upcoming-titles/index.ts b/src/controllers/internal/upcoming-titles/index.ts index ee44111..70bc807 100644 --- a/src/controllers/internal/upcoming-titles/index.ts +++ b/src/controllers/internal/upcoming-titles/index.ts @@ -4,19 +4,14 @@ import { DateTime } from "luxon"; import { getAdminSdkCredentials } from "~/libs/gcloud/getAdminSdkCredentials"; import { sendFcmMessage } from "~/libs/gcloud/sendFcmMessage"; -import { verifyQstashHeader } from "~/libs/qstash/verifyQstashHeader"; import type { Env } from "~/types/env"; -import { ErrorResponse, SuccessResponse } from "~/types/schema"; +import { SuccessResponse } from "~/types/schema"; import { getUpcomingTitlesFromAnilist } from "./anilist"; const app = new Hono(); app.post("/", async (c) => { - if (!(await verifyQstashHeader(env(c, "workerd"), c.req))) { - return c.json(ErrorResponse, { status: 401 }); - } - const titles = await getUpcomingTitlesFromAnilist( env(c, "workerd"), c.req, diff --git a/src/controllers/watch-status/index.spec.ts b/src/controllers/watch-status/index.spec.ts index 4aefa64..c278b26 100644 --- a/src/controllers/watch-status/index.spec.ts +++ b/src/controllers/watch-status/index.spec.ts @@ -7,11 +7,7 @@ import { getTestDb } from "~/libs/test/getTestDb"; import { getTestEnv } from "~/libs/test/getTestEnv"; import { resetTestDb } from "~/libs/test/resetTestDb"; import { server } from "~/mocks"; -import { - deviceTokensTable, - titleMessagesTable, - watchStatusTable, -} from "~/models/schema"; +import { deviceTokensTable, watchStatusTable } from "~/models/schema"; server.listen(); @@ -100,9 +96,6 @@ describe("requests the /watch-status route", () => { await db .insert(deviceTokensTable) .values({ deviceId: "123", token: "asd" }); - await db - .insert(titleMessagesTable) - .values({ titleId: 10, messageId: "123" }); const res = await app.request( "/watch-status", @@ -129,9 +122,6 @@ describe("requests the /watch-status route", () => { await db .insert(deviceTokensTable) .values({ deviceId: "123", token: "asd" }); - await db - .insert(titleMessagesTable) - .values({ titleId: -1, messageId: "123" }); const res = await app.request( "/watch-status", @@ -158,9 +148,6 @@ describe("requests the /watch-status route", () => { await db .insert(deviceTokensTable) .values({ deviceId: "123", token: "asd" }); - await db - .insert(titleMessagesTable) - .values({ titleId: 139518, messageId: "123" }); const res = await app.request("/watch-status", { method: "POST", diff --git a/src/controllers/watch-status/index.ts b/src/controllers/watch-status/index.ts index a45668a..9b237c7 100644 --- a/src/controllers/watch-status/index.ts +++ b/src/controllers/watch-status/index.ts @@ -1,12 +1,10 @@ import { OpenAPIHono, createRoute, z } from "@hono/zod-openapi"; -import { Client } from "@upstash/qstash"; import { env } from "hono/adapter"; -import { deleteMessageIdForTitle } from "~/libs/deleteMessageIdForTitle"; import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode"; -import { verifyQstashHeader } from "~/libs/qstash/verifyQstashHeader"; -import { readEnvVariable } from "~/libs/readEnvVariable"; -import { deleteTitleMessage, getTitleMessage } from "~/models/titleMessages"; +import { buildNewEpisodeTaskId } from "~/libs/tasks/id"; +import { queueTask } from "~/libs/tasks/queueTask"; +import { removeTask } from "~/libs/tasks/removeTask"; import { setWatchStatus } from "~/models/watchStatus"; import type { Env } from "~/types/env"; import { @@ -75,13 +73,8 @@ app.openapi(route, async (c) => { isRetrying = false, } = await c.req.json(); const aniListToken = c.req.header("X-AniList-Token"); - const client = new Client({ token: readEnvVariable(c.env, "QSTASH_TOKEN") }); - if (isRetrying) { - if (!(await verifyQstashHeader(env(c, "workerd"), c.req))) { - return c.json(ErrorResponse, { status: 401 }); - } - } else { + if (!isRetrying) { try { const { wasAdded, wasDeleted } = await setWatchStatus( env(c, "workerd"), @@ -96,9 +89,10 @@ app.openapi(route, async (c) => { titleId, ); } else if (wasDeleted) { - await deleteMessageIdForTitle( + await removeTask( env(c, "workerd"), - titleId, + "new-episode", + buildNewEpisodeTaskId(titleId), ); } } catch (error) { @@ -118,12 +112,21 @@ app.openapi(route, async (c) => { console.error( new Error("Failed to update watch status on Anilist", { cause: error }), ); - client.publishJSON({ - url: c.req.url, - body: { deviceId, watchStatus, titleId, isRetrying: true }, - retries: 3, - delay: "1m", - }); + if (isRetrying) { + return c.json(ErrorResponse, { status: 500 }); + } + + await queueTask( + env(c, "workerd"), + "anilist", + { + deviceId, + watchStatus, + titleId, + isRetrying: true, + }, + { req: c.req, scheduleConfig: { delay: { minute: 1 } } }, + ); } return c.json(SuccessResponse, { status: 200 }); diff --git a/src/libs/deleteMessageIdForTitle.ts b/src/libs/deleteMessageIdForTitle.ts deleted file mode 100644 index f11221a..0000000 --- a/src/libs/deleteMessageIdForTitle.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { Client } from "@upstash/qstash"; - -import { deleteTitleMessage, getTitleMessage } from "~/models/titleMessages"; -import type { Env } from "~/types/env"; - -import { readEnvVariable } from "./readEnvVariable"; - -export async function deleteMessageIdForTitle(env: Env, titleId: number) { - const messageId = await getTitleMessage(env, titleId); - if (!messageId) { - return; - } - - try { - const client = new Client({ token: readEnvVariable(env, "QSTASH_TOKEN") }); - await client.messages.delete(messageId); - } catch (error) { - if (!error.message.includes("not found")) { - throw error; - } - } - - await deleteTitleMessage(env, titleId); -} diff --git a/src/libs/maybeScheduleNextAiringEpisode.ts b/src/libs/maybeScheduleNextAiringEpisode.ts index 65d0ee6..62cf56d 100644 --- a/src/libs/maybeScheduleNextAiringEpisode.ts +++ b/src/libs/maybeScheduleNextAiringEpisode.ts @@ -1,7 +1,5 @@ -import { Client } from "@upstash/qstash"; import type { HonoRequest } from "hono"; -import { setTitleMessage } from "~/models/titleMessages"; import { addUnreleasedTitle, removeUnreleasedTitle, @@ -9,8 +7,8 @@ import { import type { Env } from "~/types/env"; import { getNextEpisodeTimeUntilAiring } from "./anilist/getNextEpisodeAiringAt"; -import { deleteMessageIdForTitle } from "./deleteMessageIdForTitle"; import { getCurrentDomain } from "./getCurrentDomain"; +import { queueTask } from "./tasks/queueTask"; export async function maybeScheduleNextAiringEpisode( env: Env, @@ -26,24 +24,17 @@ export async function maybeScheduleNextAiringEpisode( if (!nextAiring) { if (status === "NOT_YET_RELEASED") { await addUnreleasedTitle(env, aniListId); - } else { - await deleteMessageIdForTitle(env, aniListId); } return; } const { airingAt, episode: nextEpisode } = nextAiring; - const client = new Client({ token: env.QSTASH_TOKEN }); - - const { messageId } = await client.publishJSON({ - url: `${domain}/internal/new-episode`, - body: { aniListId, episodeNumber: nextEpisode }, - retries: 3, - notBefore: airingAt, - }); - await Promise.allSettled([ - setTitleMessage(env, aniListId, messageId), - removeUnreleasedTitle(env, aniListId), - ]); + await queueTask( + env, + "new-episode", + { aniListId, episodeNumber: nextEpisode }, + { req, scheduleConfig: { epochTime: airingAt } }, + ); + await removeUnreleasedTitle(env, aniListId); } diff --git a/src/libs/qstash/verifyQstashHeader.ts b/src/libs/qstash/verifyQstashHeader.ts deleted file mode 100644 index 8dd5c75..0000000 --- a/src/libs/qstash/verifyQstashHeader.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { Receiver, SignatureError } from "@upstash/qstash"; -import type { HonoRequest } from "hono"; - -import type { Env } from "~/types/env"; - -export async function verifyQstashHeader( - env: Env, - req: HonoRequest, -): Promise { - const signature = req.header("Upstash-Signature"); - if (!signature) { - return Promise.resolve(false); - } - - try { - const receiver = new Receiver({ - currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY, - nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY, - }); - - return await receiver.verify({ - body: await req.text(), - signature, - url: req.url.startsWith("http://localhost") - ? req.url - : req.url.replace("http://", "https://"), - }); - } catch (error) { - if (error instanceof SignatureError) { - return Promise.resolve(false); - } - - throw error; - } -} diff --git a/src/libs/tasks/id.ts b/src/libs/tasks/id.ts new file mode 100644 index 0000000..8810fb5 --- /dev/null +++ b/src/libs/tasks/id.ts @@ -0,0 +1,7 @@ +export function buildNewEpisodeTaskId(aniListId: number) { + return `${aniListId}`; +} + +export function buildAnilistRetryTaskId(deviceId: string, titleId: number) { + return `${deviceId}-${titleId}`; +} diff --git a/src/libs/tasks/queueName.ts b/src/libs/tasks/queueName.ts new file mode 100644 index 0000000..0e61e25 --- /dev/null +++ b/src/libs/tasks/queueName.ts @@ -0,0 +1 @@ +export type QueueName = "anilist" | "new-episode"; diff --git a/src/libs/tasks/queueTask.ts b/src/libs/tasks/queueTask.ts new file mode 100644 index 0000000..691a738 --- /dev/null +++ b/src/libs/tasks/queueTask.ts @@ -0,0 +1,130 @@ +import type { HonoRequest } from "hono"; +import { DateTime, type DurationLike } from "luxon"; + +import type { Env } from "~/types/env"; +import type { WatchStatus } from "~/types/title/watchStatus"; + +import { getAdminSdkCredentials } from "../gcloud/getAdminSdkCredentials"; +import { getGoogleAuthToken } from "../gcloud/getGoogleAuthToken"; +import { getCurrentDomain } from "../getCurrentDomain"; +import { buildAnilistRetryTaskId, buildNewEpisodeTaskId } from "./id"; +import type { QueueName } from "./queueName"; + +type QueueBody = { + anilist: { + deviceId: string; + watchStatus: WatchStatus | null; + titleId: number; + isRetrying: true; + }; + "new-episode": { aniListId: number; episodeNumber: number }; +}; + +type ScheduleConfig = + | { delay: DurationLike; epochTime: never } + | { epochTime: number; delay: never }; + +interface QueueTaskOptionalArgs { + taskId?: string; + scheduleConfig?: ScheduleConfig; + req?: HonoRequest; +} + +export async function queueTask( + env: Env, + queueName: QueueName, + body: QueueBody[QueueName], + { taskId, scheduleConfig, req }: QueueTaskOptionalArgs = {}, +) { + const domain = req + ? getCurrentDomain(req) + : "https://aniplay-v2.rururu.workers.dev"; + if (!domain) { + console.log("Skipping queue task due to local domain", queueName, body); + return; + } + + const adminSdkCredentials = getAdminSdkCredentials(env); + const { projectId } = adminSdkCredentials; + + await fetch( + `https://content-cloudtasks.googleapis.com/v2/projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks?alt=json`, + { + headers: { + Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + task: buildTask( + projectId, + queueName, + taskId, + scheduleConfig, + domain, + body, + req.header(), + ), + }), + method: "POST", + }, + ); +} + +function buildTask( + projectId: string, + queueName: QueueName, + taskId: string | undefined, + scheduleConfig: ScheduleConfig | undefined, + domain: string, + body: QueueBody[QueueName], + headers: Record, +) { + let scheduleTime: string | undefined; + if (scheduleConfig) { + const { delay, epochTime } = scheduleConfig; + if (epochTime) { + scheduleTime = DateTime.fromSeconds(epochTime).toUTC().toISO(); + } else if (delay) { + scheduleTime = DateTime.now().plus(delay).toUTC().toISO(); + } + } + + switch (queueName) { + case "new-episode": + const { aniListId } = body as QueueBody["new-episode"]; + taskId ??= buildNewEpisodeTaskId(aniListId); + + return { + name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`, + scheduleTime, + httpRequest: { + url: `${domain}/internal/new-episode`, + httpMethod: "POST", + body: JSON.stringify(body), + headers: { + "Content-Type": "application/json", + "X-Anilist-Token": headers["X-Anilist-Token"], + }, + }, + }; + case "anilist": + const { deviceId, titleId } = body as QueueBody["anilist"]; + taskId ??= buildAnilistRetryTaskId(deviceId, titleId); + + return { + name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`, + scheduleTime, + httpRequest: { + url: `${domain}/watch-status`, + httpMethod: "POST", + body: JSON.stringify(body), + headers: { + "Content-Type": "application/json", + "X-Anilist-Token": headers["X-Anilist-Token"], + }, + }, + }; + default: + throw new Error(`Unknown queue name: ${queueName}`); + } +} diff --git a/src/libs/tasks/removeTask.ts b/src/libs/tasks/removeTask.ts new file mode 100644 index 0000000..1211dd9 --- /dev/null +++ b/src/libs/tasks/removeTask.ts @@ -0,0 +1,24 @@ +import type { Env } from "~/types/env"; + +import { getAdminSdkCredentials } from "../gcloud/getAdminSdkCredentials"; +import { getGoogleAuthToken } from "../gcloud/getGoogleAuthToken"; +import type { QueueName } from "./queueName"; + +export async function removeTask( + env: Env, + queueName: QueueName, + taskId: string, +) { + const adminSdkCredentials = getAdminSdkCredentials(env); + const { projectId } = adminSdkCredentials; + + await fetch( + `https://content-cloudtasks.googleapis.com/v2/projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`, + { + headers: { + Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`, + }, + method: "DELETE", + }, + ); +} diff --git a/src/mocks/qstash.ts b/src/mocks/qstash.ts deleted file mode 100644 index 5141d06..0000000 --- a/src/mocks/qstash.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { SignatureError } from "@upstash/qstash"; - -import { mock } from "bun:test"; - -class MockQstashMessages { - delete = mock(); -} - -class MockQstashClient { - batchJSON = mock(); - publishJSON = mock().mockResolvedValue({ messageId: "123" }); - messages = new MockQstashMessages(); -} - -class MockQstashReceiver { - verify = mock(); -} - -mock.module("@upstash/qstash", () => ({ - Client: MockQstashClient, - Receiver: MockQstashReceiver, - SignatureError, -})); diff --git a/src/models/schema.ts b/src/models/schema.ts index a67eb1f..101fca4 100644 --- a/src/models/schema.ts +++ b/src/models/schema.ts @@ -36,11 +36,6 @@ export const keyValueTable = sqliteTable("key_value", { value: text("value").notNull(), }); -export const titleMessagesTable = sqliteTable("title_messages", { - titleId: integer("title_id").notNull().primaryKey(), - messageId: text("message_id").notNull(), -}); - /** Used to keep track of titles that haven't been released yet and the time when the first episode will be released is unknown */ export const unreleasedTitlesTable = sqliteTable("unreleased_titles", { titleId: integer("title_id").notNull().primaryKey(), @@ -51,6 +46,5 @@ export const tables = [ watchStatusTable, deviceTokensTable, keyValueTable, - titleMessagesTable, unreleasedTitlesTable, ]; diff --git a/src/models/titleMessages.ts b/src/models/titleMessages.ts deleted file mode 100644 index 9d6604f..0000000 --- a/src/models/titleMessages.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { eq } from "drizzle-orm"; - -import type { Env } from "~/types/env"; - -import { getDb } from "./db"; -import { titleMessagesTable } from "./schema"; - -export function setTitleMessage(env: Env, titleId: number, messageId: string) { - return getDb(env) - .insert(titleMessagesTable) - .values({ titleId, messageId }) - .onConflictDoUpdate({ - set: { messageId }, - target: [titleMessagesTable.titleId], - }); -} - -export function getTitleMessage( - env: Env, - titleId: number, -): Promise { - return getDb(env) - .select() - .from(titleMessagesTable) - .where(eq(titleMessagesTable.titleId, titleId)) - .then((results) => results[0]?.messageId); -} - -export function deleteTitleMessage(env: Env, titleId: number) { - return getDb(env) - .delete(titleMessagesTable) - .where(eq(titleMessagesTable.titleId, titleId)) - .run(); -} diff --git a/src/scripts/initializeNextEpisodeQueue.ts b/src/scripts/initializeNextEpisodeQueue.ts index 0fdd177..31a83fa 100644 --- a/src/scripts/initializeNextEpisodeQueue.ts +++ b/src/scripts/initializeNextEpisodeQueue.ts @@ -1,6 +1,5 @@ -import { Client } from "@upstash/qstash"; - import { fetchTitleFromAnilist } from "~/libs/anilist/getTitle"; +import { queueTask } from "~/libs/tasks/queueTask"; import { getDb } from "~/models/db"; import { watchStatusTable } from "~/models/schema"; @@ -85,15 +84,10 @@ async function triggerNextEpisodeRoute(titleId: number) { return success; }); } else { - return new Client({ token: process.env.QSTASH_TOKEN }) - .publishJSON({ - url: "https://aniplay-v2.rururu.workers.dev/internal/new-episode", - body: { - aniListId: titleId, - episodeNumber: mostRecentEpisodeNumber, - }, - retries: 3, - }) + return queueTask(process.env, "new-episode", { + aniListId: titleId, + episodeNumber: mostRecentEpisodeNumber, + }) .then(() => true) .catch((error) => { console.error( @@ -117,16 +111,15 @@ async function triggerNextEpisodeRoute(titleId: number) { } } - return new Client({ token: process.env.QSTASH_TOKEN }) - .publishJSON({ - url: "https://aniplay-v2.rururu.workers.dev/internal/new-episode", - body: { - aniListId: titleId, - episodeNumber: title.nextAiringEpisode.episode, - }, - retries: 3, - notBefore: title.nextAiringEpisode.airingAt, - }) + return queueTask( + process.env, + "new-episode", + { + aniListId: titleId, + episodeNumber: title.nextAiringEpisode.episode, + }, + { scheduleConfig: { epochTime: title.nextAiringEpisode.airingAt } }, + ) .then(() => true) .catch((error) => { console.error(