refactor: replace qstash with Google Cloud Tasks

This commit is contained in:
2024-10-05 14:06:57 -04:00
parent 85712ff0cf
commit 44ffa703b9
21 changed files with 354 additions and 207 deletions

View File

@@ -1,24 +0,0 @@
import { Client } from "@upstash/qstash";
import { deleteTitleMessage, getTitleMessage } from "~/models/titleMessages";
import type { Env } from "~/types/env";
import { readEnvVariable } from "./readEnvVariable";
export async function deleteMessageIdForTitle(env: Env, titleId: number) {
const messageId = await getTitleMessage(env, titleId);
if (!messageId) {
return;
}
try {
const client = new Client({ token: readEnvVariable(env, "QSTASH_TOKEN") });
await client.messages.delete(messageId);
} catch (error) {
if (!error.message.includes("not found")) {
throw error;
}
}
await deleteTitleMessage(env, titleId);
}

View File

@@ -1,7 +1,5 @@
import { Client } from "@upstash/qstash";
import type { HonoRequest } from "hono";
import { setTitleMessage } from "~/models/titleMessages";
import {
addUnreleasedTitle,
removeUnreleasedTitle,
@@ -9,8 +7,8 @@ import {
import type { Env } from "~/types/env";
import { getNextEpisodeTimeUntilAiring } from "./anilist/getNextEpisodeAiringAt";
import { deleteMessageIdForTitle } from "./deleteMessageIdForTitle";
import { getCurrentDomain } from "./getCurrentDomain";
import { queueTask } from "./tasks/queueTask";
export async function maybeScheduleNextAiringEpisode(
env: Env,
@@ -26,24 +24,17 @@ export async function maybeScheduleNextAiringEpisode(
if (!nextAiring) {
if (status === "NOT_YET_RELEASED") {
await addUnreleasedTitle(env, aniListId);
} else {
await deleteMessageIdForTitle(env, aniListId);
}
return;
}
const { airingAt, episode: nextEpisode } = nextAiring;
const client = new Client({ token: env.QSTASH_TOKEN });
const { messageId } = await client.publishJSON({
url: `${domain}/internal/new-episode`,
body: { aniListId, episodeNumber: nextEpisode },
retries: 3,
notBefore: airingAt,
});
await Promise.allSettled([
setTitleMessage(env, aniListId, messageId),
removeUnreleasedTitle(env, aniListId),
]);
await queueTask(
env,
"new-episode",
{ aniListId, episodeNumber: nextEpisode },
{ req, scheduleConfig: { epochTime: airingAt } },
);
await removeUnreleasedTitle(env, aniListId);
}

View File

@@ -1,35 +0,0 @@
import { Receiver, SignatureError } from "@upstash/qstash";
import type { HonoRequest } from "hono";
import type { Env } from "~/types/env";
export async function verifyQstashHeader(
env: Env,
req: HonoRequest,
): Promise<boolean> {
const signature = req.header("Upstash-Signature");
if (!signature) {
return Promise.resolve(false);
}
try {
const receiver = new Receiver({
currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY,
});
return await receiver.verify({
body: await req.text(),
signature,
url: req.url.startsWith("http://localhost")
? req.url
: req.url.replace("http://", "https://"),
});
} catch (error) {
if (error instanceof SignatureError) {
return Promise.resolve(false);
}
throw error;
}
}

7
src/libs/tasks/id.ts Normal file
View File

@@ -0,0 +1,7 @@
export function buildNewEpisodeTaskId(aniListId: number) {
return `${aniListId}`;
}
export function buildAnilistRetryTaskId(deviceId: string, titleId: number) {
return `${deviceId}-${titleId}`;
}

View File

@@ -0,0 +1 @@
export type QueueName = "anilist" | "new-episode";

130
src/libs/tasks/queueTask.ts Normal file
View File

@@ -0,0 +1,130 @@
import type { HonoRequest } from "hono";
import { DateTime, type DurationLike } from "luxon";
import type { Env } from "~/types/env";
import type { WatchStatus } from "~/types/title/watchStatus";
import { getAdminSdkCredentials } from "../gcloud/getAdminSdkCredentials";
import { getGoogleAuthToken } from "../gcloud/getGoogleAuthToken";
import { getCurrentDomain } from "../getCurrentDomain";
import { buildAnilistRetryTaskId, buildNewEpisodeTaskId } from "./id";
import type { QueueName } from "./queueName";
type QueueBody = {
anilist: {
deviceId: string;
watchStatus: WatchStatus | null;
titleId: number;
isRetrying: true;
};
"new-episode": { aniListId: number; episodeNumber: number };
};
type ScheduleConfig =
| { delay: DurationLike; epochTime: never }
| { epochTime: number; delay: never };
interface QueueTaskOptionalArgs {
taskId?: string;
scheduleConfig?: ScheduleConfig;
req?: HonoRequest;
}
export async function queueTask(
env: Env,
queueName: QueueName,
body: QueueBody[QueueName],
{ taskId, scheduleConfig, req }: QueueTaskOptionalArgs = {},
) {
const domain = req
? getCurrentDomain(req)
: "https://aniplay-v2.rururu.workers.dev";
if (!domain) {
console.log("Skipping queue task due to local domain", queueName, body);
return;
}
const adminSdkCredentials = getAdminSdkCredentials(env);
const { projectId } = adminSdkCredentials;
await fetch(
`https://content-cloudtasks.googleapis.com/v2/projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks?alt=json`,
{
headers: {
Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
task: buildTask(
projectId,
queueName,
taskId,
scheduleConfig,
domain,
body,
req.header(),
),
}),
method: "POST",
},
);
}
function buildTask(
projectId: string,
queueName: QueueName,
taskId: string | undefined,
scheduleConfig: ScheduleConfig | undefined,
domain: string,
body: QueueBody[QueueName],
headers: Record<string, string>,
) {
let scheduleTime: string | undefined;
if (scheduleConfig) {
const { delay, epochTime } = scheduleConfig;
if (epochTime) {
scheduleTime = DateTime.fromSeconds(epochTime).toUTC().toISO();
} else if (delay) {
scheduleTime = DateTime.now().plus(delay).toUTC().toISO();
}
}
switch (queueName) {
case "new-episode":
const { aniListId } = body as QueueBody["new-episode"];
taskId ??= buildNewEpisodeTaskId(aniListId);
return {
name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`,
scheduleTime,
httpRequest: {
url: `${domain}/internal/new-episode`,
httpMethod: "POST",
body: JSON.stringify(body),
headers: {
"Content-Type": "application/json",
"X-Anilist-Token": headers["X-Anilist-Token"],
},
},
};
case "anilist":
const { deviceId, titleId } = body as QueueBody["anilist"];
taskId ??= buildAnilistRetryTaskId(deviceId, titleId);
return {
name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`,
scheduleTime,
httpRequest: {
url: `${domain}/watch-status`,
httpMethod: "POST",
body: JSON.stringify(body),
headers: {
"Content-Type": "application/json",
"X-Anilist-Token": headers["X-Anilist-Token"],
},
},
};
default:
throw new Error(`Unknown queue name: ${queueName}`);
}
}

View File

@@ -0,0 +1,24 @@
import type { Env } from "~/types/env";
import { getAdminSdkCredentials } from "../gcloud/getAdminSdkCredentials";
import { getGoogleAuthToken } from "../gcloud/getGoogleAuthToken";
import type { QueueName } from "./queueName";
export async function removeTask(
env: Env,
queueName: QueueName,
taskId: string,
) {
const adminSdkCredentials = getAdminSdkCredentials(env);
const { projectId } = adminSdkCredentials;
await fetch(
`https://content-cloudtasks.googleapis.com/v2/projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`,
{
headers: {
Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
},
method: "DELETE",
},
);
}