From 336701a84b457907858f3fa11acce69cbb7dc264 Mon Sep 17 00:00:00 2001 From: Rushil Perera Date: Mon, 9 Sep 2024 03:53:34 -0500 Subject: [PATCH] feat: schedule next airing episode happens when new title is saved, or when new episode internal route is run successfully --- drizzle/0006_sticky_donald_blake.sql | 5 + drizzle/meta/0006_snapshot.json | 150 ++++++++++++++++++ drizzle/meta/_journal.json | 7 + src/controllers/internal/new-episode/index.ts | 23 ++- src/controllers/maybeUpdateLastConnectedAt.ts | 1 - src/controllers/watch-status/index.spec.ts | 15 +- src/controllers/watch-status/index.ts | 37 +++-- src/libs/anilist/getNextEpisodeAiringAt.ts | 28 ++++ src/libs/maybeScheduleNextAiringEpisode.ts | 32 ++++ src/mocks/anilist/nextAiringEpisode.ts | 16 ++ src/mocks/handlers.ts | 2 + src/mocks/qstash.ts | 7 +- src/models/schema.ts | 18 ++- src/models/titleMessages.ts | 31 ++++ 14 files changed, 353 insertions(+), 19 deletions(-) create mode 100644 drizzle/0006_sticky_donald_blake.sql create mode 100644 drizzle/meta/0006_snapshot.json create mode 100644 src/libs/anilist/getNextEpisodeAiringAt.ts create mode 100644 src/libs/maybeScheduleNextAiringEpisode.ts create mode 100644 src/mocks/anilist/nextAiringEpisode.ts create mode 100644 src/models/titleMessages.ts diff --git a/drizzle/0006_sticky_donald_blake.sql b/drizzle/0006_sticky_donald_blake.sql new file mode 100644 index 0000000..19a118d --- /dev/null +++ b/drizzle/0006_sticky_donald_blake.sql @@ -0,0 +1,5 @@ +CREATE TABLE `title_messages` ( + `title_id` integer NOT NULL, + `message_id` text NOT NULL, + PRIMARY KEY(`message_id`, `title_id`) +); diff --git a/drizzle/meta/0006_snapshot.json b/drizzle/meta/0006_snapshot.json new file mode 100644 index 0000000..1e7fe2b --- /dev/null +++ b/drizzle/meta/0006_snapshot.json @@ -0,0 +1,150 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "779bdeb8-3d3b-4429-8260-2ef628d0baa0", + "prevId": "bca1f597-6db1-4bf8-ab6b-a95c10d3f6a7", + "tables": { + "device_tokens": { + "name": "device_tokens", + "columns": { + "device_id": { + "name": "device_id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "token": { + "name": "token", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "username": { + "name": "username", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "last_connected_at": { + "name": "last_connected_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "(CURRENT_TIMESTAMP)" + } + }, + "indexes": { + "device_tokens_token_unique": { + "name": "device_tokens_token_unique", + "columns": ["token"], + "isUnique": true + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "key_value": { + "name": "key_value", + "columns": { + "key": { + "name": "key", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {} + }, + "title_messages": { + "name": "title_messages", + "columns": { + "title_id": { + "name": "title_id", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "message_id": { + "name": "message_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": { + "title_messages_title_id_message_id_pk": { + "columns": ["message_id", "title_id"], + "name": "title_messages_title_id_message_id_pk" + } + }, + "uniqueConstraints": {} + }, + "watch_status": { + "name": "watch_status", + "columns": { + "device_id": { + "name": "device_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "title_id": { + "name": "title_id", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "watch_status_device_id_device_tokens_device_id_fk": { + "name": "watch_status_device_id_device_tokens_device_id_fk", + "tableFrom": "watch_status", + "tableTo": "device_tokens", + "columnsFrom": ["device_id"], + "columnsTo": ["device_id"], + "onDelete": "no action", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "watch_status_device_id_title_id_pk": { + "columns": ["device_id", "title_id"], + "name": "watch_status_device_id_title_id_pk" + } + }, + "uniqueConstraints": {} + } + }, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json index bf9643f..876c11d 100644 --- a/drizzle/meta/_journal.json +++ b/drizzle/meta/_journal.json @@ -43,6 +43,13 @@ "when": 1725293569918, "tag": "0005_shiny_scarecrow", "breakpoints": true + }, + { + "idx": 6, + "version": "6", + "when": 1725836922065, + "tag": "0006_sticky_donald_blake", + "breakpoints": true } ] } diff --git a/src/controllers/internal/new-episode/index.ts b/src/controllers/internal/new-episode/index.ts index 8649db7..b17c289 100644 --- a/src/controllers/internal/new-episode/index.ts +++ b/src/controllers/internal/new-episode/index.ts @@ -1,14 +1,15 @@ import { zValidator } from "@hono/zod-validator"; -import { Hono } from "hono"; +import { Client } from "@upstash/qstash"; +import { Hono, type HonoRequest } from "hono"; import { env } from "hono/adapter"; import mapKeys from "lodash.mapkeys"; -import { DateTime } from "luxon"; import { z } from "zod"; import { Case, changeStringCase } from "~/libs/changeStringCase"; import type { AdminSdkCredentials } from "~/libs/fcm/getGoogleAuthToken"; import { sendFcmMessage } from "~/libs/fcm/sendFcmMessage"; import { getCurrentDomain } from "~/libs/getCurrentDomain"; +import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode"; import { verifyQstashHeader } from "~/libs/qstash/verifyQstashHeader"; import { readEnvVariable } from "~/libs/readEnvVariable"; import { getTokensSubscribedToTitle } from "~/models/token"; @@ -48,6 +49,7 @@ app.post( `${domain}/episodes/${aniListId}`, ).then((res) => res.json()); if (!success) { + await scheduleRetry(readEnvVariable(c.env, "QSTASH_TOKEN"), c.req); return c.json(ErrorResponse, { status: 500 }); } @@ -56,6 +58,7 @@ app.post( (episode) => episode.number === episodeNumber, ); if (!episode) { + await scheduleRetry(readEnvVariable(c.env, "QSTASH_TOKEN"), c.req); return c.json(ErrorResponse, { status: 404 }); } @@ -73,6 +76,7 @@ app.post( }, ).then((res) => res.json()); if (!fetchUrlSuccess) { + await scheduleRetry(readEnvVariable(c.env, "QSTASH_TOKEN"), c.req); return c.json(ErrorResponse, { status: 500 }); } @@ -103,8 +107,23 @@ app.post( }), ); + await maybeScheduleNextAiringEpisode( + env(c, "workerd"), + c.req, + aniListId, + ); + return c.json(SuccessResponse, 200); }, ); +async function scheduleRetry(qstashToken: string, req: HonoRequest) { + return new Client({ token: qstashToken }).publishJSON({ + body: await req.text(), + url: req.url, + retries: 0, + delay: "1h", + }); +} + export default app; diff --git a/src/controllers/maybeUpdateLastConnectedAt.ts b/src/controllers/maybeUpdateLastConnectedAt.ts index 42d1f66..e2ce8dd 100644 --- a/src/controllers/maybeUpdateLastConnectedAt.ts +++ b/src/controllers/maybeUpdateLastConnectedAt.ts @@ -4,7 +4,6 @@ import { updateDeviceLastConnectedAt } from "~/models/token"; export const maybeUpdateLastConnectedAt = createMiddleware(async (c, next) => { const deviceId = await c.req.header("X-Aniplay-Device-Id"); - console.log("deviceId", deviceId); if (!deviceId) { return next(); } diff --git a/src/controllers/watch-status/index.spec.ts b/src/controllers/watch-status/index.spec.ts index c278b26..4aefa64 100644 --- a/src/controllers/watch-status/index.spec.ts +++ b/src/controllers/watch-status/index.spec.ts @@ -7,7 +7,11 @@ import { getTestDb } from "~/libs/test/getTestDb"; import { getTestEnv } from "~/libs/test/getTestEnv"; import { resetTestDb } from "~/libs/test/resetTestDb"; import { server } from "~/mocks"; -import { deviceTokensTable, watchStatusTable } from "~/models/schema"; +import { + deviceTokensTable, + titleMessagesTable, + watchStatusTable, +} from "~/models/schema"; server.listen(); @@ -96,6 +100,9 @@ describe("requests the /watch-status route", () => { await db .insert(deviceTokensTable) .values({ deviceId: "123", token: "asd" }); + await db + .insert(titleMessagesTable) + .values({ titleId: 10, messageId: "123" }); const res = await app.request( "/watch-status", @@ -122,6 +129,9 @@ describe("requests the /watch-status route", () => { await db .insert(deviceTokensTable) .values({ deviceId: "123", token: "asd" }); + await db + .insert(titleMessagesTable) + .values({ titleId: -1, messageId: "123" }); const res = await app.request( "/watch-status", @@ -148,6 +158,9 @@ describe("requests the /watch-status route", () => { await db .insert(deviceTokensTable) .values({ deviceId: "123", token: "asd" }); + await db + .insert(titleMessagesTable) + .values({ titleId: 139518, messageId: "123" }); const res = await app.request("/watch-status", { method: "POST", diff --git a/src/controllers/watch-status/index.ts b/src/controllers/watch-status/index.ts index 5fadcc3..6230ea1 100644 --- a/src/controllers/watch-status/index.ts +++ b/src/controllers/watch-status/index.ts @@ -1,8 +1,11 @@ import { OpenAPIHono, createRoute, z } from "@hono/zod-openapi"; +import { Client } from "@upstash/qstash"; import { env } from "hono/adapter"; +import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode"; import { verifyQstashHeader } from "~/libs/qstash/verifyQstashHeader"; import { readEnvVariable } from "~/libs/readEnvVariable"; +import { deleteTitleMessage, getTitleMessage } from "~/models/titleMessages"; import { setWatchStatus } from "~/models/watchStatus"; import type { Env } from "~/types/env"; import { @@ -71,6 +74,7 @@ app.openapi(route, async (c) => { isRetrying = false, } = await c.req.json(); const aniListToken = c.req.header("X-AniList-Token"); + const client = new Client({ token: readEnvVariable(c.env, "QSTASH_TOKEN") }); if (isRetrying) { if (!(await verifyQstashHeader(env(c, "workerd"), c.req))) { @@ -84,6 +88,20 @@ app.openapi(route, async (c) => { Number(titleId), watchStatus, ); + if (wasAdded) { + await maybeScheduleNextAiringEpisode( + env(c, "workerd"), + c.req, + titleId, + ); + } else if (wasDeleted) { + const messageId = await getTitleMessage( + env(c, "workerd"), + titleId, + ); + await client.messages.delete(messageId); + await deleteTitleMessage(env(c, "workerd"), titleId); + } } catch (error) { console.error(new Error("Error setting watch status", { cause: error })); console.error(error); @@ -101,19 +119,12 @@ app.openapi(route, async (c) => { console.error( new Error("Failed to update watch status on Anilist", { cause: error }), ); - await import("@upstash/qstash") - .then( - ({ Client }) => - new Client({ token: readEnvVariable(c.env, "QSTASH_TOKEN") }), - ) - .then((client) => - client.publishJSON({ - url: c.req.url, - body: { deviceId, watchStatus, titleId, isRetrying: true }, - retries: 0, - delay: 60, - }), - ); + client.publishJSON({ + url: c.req.url, + body: { deviceId, watchStatus, titleId, isRetrying: true }, + retries: 0, + delay: 60, + }); } return c.json(SuccessResponse, { status: 200 }); diff --git a/src/libs/anilist/getNextEpisodeAiringAt.ts b/src/libs/anilist/getNextEpisodeAiringAt.ts new file mode 100644 index 0000000..25bf04a --- /dev/null +++ b/src/libs/anilist/getNextEpisodeAiringAt.ts @@ -0,0 +1,28 @@ +import { graphql } from "gql.tada"; +import { GraphQLClient } from "graphql-request"; + +const GetNextEpisodeAiringAtQuery = graphql(` + query GetNextEpisodeAiringAt($id: Int!) { + Media(id: $id) { + nextAiringEpisode { + episode + timeUntilAiring + } + } + } +`); + +export function getNextEpisodeTimeUntilAiring(aniListId: number) { + const client = new GraphQLClient("https://graphql.anilist.co/"); + + return client + .request(GetNextEpisodeAiringAtQuery, { id: aniListId }) + .then((data) => { + const nextAiring = data!.Media!.nextAiringEpisode; + if (!nextAiring) { + return null; + } + + return nextAiring; + }); +} diff --git a/src/libs/maybeScheduleNextAiringEpisode.ts b/src/libs/maybeScheduleNextAiringEpisode.ts new file mode 100644 index 0000000..a323c86 --- /dev/null +++ b/src/libs/maybeScheduleNextAiringEpisode.ts @@ -0,0 +1,32 @@ +import { Client } from "@upstash/qstash"; +import type { HonoRequest } from "hono"; + +import { setTitleMessage } from "~/models/titleMessages"; +import type { Env } from "~/types/env"; + +import { getNextEpisodeTimeUntilAiring } from "./anilist/getNextEpisodeAiringAt"; +import { getCurrentDomain } from "./getCurrentDomain"; + +export async function maybeScheduleNextAiringEpisode( + env: Env, + req: HonoRequest, + aniListId: number, +) { + const nextAiring = await getNextEpisodeTimeUntilAiring(aniListId); + if (!nextAiring) { + return; + } + + const { timeUntilAiring, episode: nextEpisode } = nextAiring; + const client = new Client({ token: env.QSTASH_TOKEN }); + + const domain = getCurrentDomain(req); + const { messageId } = await client.publishJSON({ + url: `${domain}/internal/new-episode`, + body: { aniListId, episode: nextEpisode }, + retries: 0, + delay: timeUntilAiring, + contentBasedDeduplication: true, + }); + await setTitleMessage(env, aniListId, messageId); +} diff --git a/src/mocks/anilist/nextAiringEpisode.ts b/src/mocks/anilist/nextAiringEpisode.ts new file mode 100644 index 0000000..4f86683 --- /dev/null +++ b/src/mocks/anilist/nextAiringEpisode.ts @@ -0,0 +1,16 @@ +import { HttpResponse, graphql } from "msw"; + +export function getAnilistNextAiringEpisode() { + return graphql.query( + "GetNextEpisodeAiringAt", + ({ variables: { titleId } }) => { + return HttpResponse.json({ + data: { + Media: { + nextAiringEpisode: null, + }, + }, + }); + }, + ); +} diff --git a/src/mocks/handlers.ts b/src/mocks/handlers.ts index 913d9bc..7213149 100644 --- a/src/mocks/handlers.ts +++ b/src/mocks/handlers.ts @@ -3,6 +3,7 @@ import { getAnifySources } from "./anify/sources"; import { getAnifyTitle } from "./anify/title"; import { deleteAnilistMediaListEntry } from "./anilist/deleteMediaListEntry"; import { getAnilistMediaListEntry } from "./anilist/mediaListEntry"; +import { getAnilistNextAiringEpisode } from "./anilist/nextAiringEpisode"; import { getAnilistSearchResults } from "./anilist/search"; import { getAnilistTitle } from "./anilist/title"; import { updateAnilistWatchStatus } from "./anilist/updateWatchStatus"; @@ -14,6 +15,7 @@ import { mockFcmMessageResponse } from "./fcm"; export const handlers = [ deleteAnilistMediaListEntry(), getAnilistMediaListEntry(), + getAnilistNextAiringEpisode(), getAnilistSearchResults(), getAnilistTitle(), updateAnilistWatchStatus(), diff --git a/src/mocks/qstash.ts b/src/mocks/qstash.ts index aed561f..5141d06 100644 --- a/src/mocks/qstash.ts +++ b/src/mocks/qstash.ts @@ -2,9 +2,14 @@ import { SignatureError } from "@upstash/qstash"; import { mock } from "bun:test"; +class MockQstashMessages { + delete = mock(); +} + class MockQstashClient { batchJSON = mock(); - publishJSON = mock(); + publishJSON = mock().mockResolvedValue({ messageId: "123" }); + messages = new MockQstashMessages(); } class MockQstashReceiver { diff --git a/src/models/schema.ts b/src/models/schema.ts index b5f141f..8439da5 100644 --- a/src/models/schema.ts +++ b/src/models/schema.ts @@ -34,4 +34,20 @@ export const keyValueTable = sqliteTable("key_value", { value: text("value").notNull(), }); -export const tables = [watchStatusTable, deviceTokensTable, keyValueTable]; +export const titleMessagesTable = sqliteTable( + "title_messages", + { + titleId: integer("title_id").notNull(), + messageId: text("message_id").notNull(), + }, + (table) => ({ + pk: primaryKey({ columns: [table.titleId, table.messageId] }), + }), +); + +export const tables = [ + watchStatusTable, + deviceTokensTable, + keyValueTable, + titleMessagesTable, +]; diff --git a/src/models/titleMessages.ts b/src/models/titleMessages.ts new file mode 100644 index 0000000..120ba3a --- /dev/null +++ b/src/models/titleMessages.ts @@ -0,0 +1,31 @@ +import { eq } from "drizzle-orm"; + +import type { Env } from "~/types/env"; + +import { getDb } from "./db"; +import { titleMessagesTable } from "./schema"; + +export function setTitleMessage(env: Env, titleId: number, messageId: string) { + return getDb(env) + .insert(titleMessagesTable) + .values({ titleId, messageId }) + .onConflictDoUpdate({ + set: { messageId }, + target: [titleMessagesTable.titleId], + }); +} + +export function getTitleMessage(env: Env, titleId: number) { + return getDb(env) + .select() + .from(titleMessagesTable) + .where(eq(titleMessagesTable.titleId, titleId)) + .then((results) => results[0].messageId); +} + +export function deleteTitleMessage(env: Env, titleId: number) { + return getDb(env) + .delete(titleMessagesTable) + .where(eq(titleMessagesTable.titleId, titleId)) + .run(); +}