feat: schedule next airing episode
happens when new title is saved, or when new episode internal route is run successfully
This commit is contained in:
5
drizzle/0006_sticky_donald_blake.sql
Normal file
5
drizzle/0006_sticky_donald_blake.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
CREATE TABLE `title_messages` (
|
||||
`title_id` integer NOT NULL,
|
||||
`message_id` text NOT NULL,
|
||||
PRIMARY KEY(`message_id`, `title_id`)
|
||||
);
|
||||
150
drizzle/meta/0006_snapshot.json
Normal file
150
drizzle/meta/0006_snapshot.json
Normal file
@@ -0,0 +1,150 @@
|
||||
{
|
||||
"version": "6",
|
||||
"dialect": "sqlite",
|
||||
"id": "779bdeb8-3d3b-4429-8260-2ef628d0baa0",
|
||||
"prevId": "bca1f597-6db1-4bf8-ab6b-a95c10d3f6a7",
|
||||
"tables": {
|
||||
"device_tokens": {
|
||||
"name": "device_tokens",
|
||||
"columns": {
|
||||
"device_id": {
|
||||
"name": "device_id",
|
||||
"type": "text",
|
||||
"primaryKey": true,
|
||||
"notNull": true,
|
||||
"autoincrement": false
|
||||
},
|
||||
"token": {
|
||||
"name": "token",
|
||||
"type": "text",
|
||||
"primaryKey": false,
|
||||
"notNull": true,
|
||||
"autoincrement": false
|
||||
},
|
||||
"username": {
|
||||
"name": "username",
|
||||
"type": "text",
|
||||
"primaryKey": false,
|
||||
"notNull": false,
|
||||
"autoincrement": false
|
||||
},
|
||||
"last_connected_at": {
|
||||
"name": "last_connected_at",
|
||||
"type": "text",
|
||||
"primaryKey": false,
|
||||
"notNull": false,
|
||||
"autoincrement": false,
|
||||
"default": "(CURRENT_TIMESTAMP)"
|
||||
}
|
||||
},
|
||||
"indexes": {
|
||||
"device_tokens_token_unique": {
|
||||
"name": "device_tokens_token_unique",
|
||||
"columns": ["token"],
|
||||
"isUnique": true
|
||||
}
|
||||
},
|
||||
"foreignKeys": {},
|
||||
"compositePrimaryKeys": {},
|
||||
"uniqueConstraints": {}
|
||||
},
|
||||
"key_value": {
|
||||
"name": "key_value",
|
||||
"columns": {
|
||||
"key": {
|
||||
"name": "key",
|
||||
"type": "text",
|
||||
"primaryKey": true,
|
||||
"notNull": true,
|
||||
"autoincrement": false
|
||||
},
|
||||
"value": {
|
||||
"name": "value",
|
||||
"type": "text",
|
||||
"primaryKey": false,
|
||||
"notNull": true,
|
||||
"autoincrement": false
|
||||
}
|
||||
},
|
||||
"indexes": {},
|
||||
"foreignKeys": {},
|
||||
"compositePrimaryKeys": {},
|
||||
"uniqueConstraints": {}
|
||||
},
|
||||
"title_messages": {
|
||||
"name": "title_messages",
|
||||
"columns": {
|
||||
"title_id": {
|
||||
"name": "title_id",
|
||||
"type": "integer",
|
||||
"primaryKey": false,
|
||||
"notNull": true,
|
||||
"autoincrement": false
|
||||
},
|
||||
"message_id": {
|
||||
"name": "message_id",
|
||||
"type": "text",
|
||||
"primaryKey": false,
|
||||
"notNull": true,
|
||||
"autoincrement": false
|
||||
}
|
||||
},
|
||||
"indexes": {},
|
||||
"foreignKeys": {},
|
||||
"compositePrimaryKeys": {
|
||||
"title_messages_title_id_message_id_pk": {
|
||||
"columns": ["message_id", "title_id"],
|
||||
"name": "title_messages_title_id_message_id_pk"
|
||||
}
|
||||
},
|
||||
"uniqueConstraints": {}
|
||||
},
|
||||
"watch_status": {
|
||||
"name": "watch_status",
|
||||
"columns": {
|
||||
"device_id": {
|
||||
"name": "device_id",
|
||||
"type": "text",
|
||||
"primaryKey": false,
|
||||
"notNull": true,
|
||||
"autoincrement": false
|
||||
},
|
||||
"title_id": {
|
||||
"name": "title_id",
|
||||
"type": "integer",
|
||||
"primaryKey": false,
|
||||
"notNull": true,
|
||||
"autoincrement": false
|
||||
}
|
||||
},
|
||||
"indexes": {},
|
||||
"foreignKeys": {
|
||||
"watch_status_device_id_device_tokens_device_id_fk": {
|
||||
"name": "watch_status_device_id_device_tokens_device_id_fk",
|
||||
"tableFrom": "watch_status",
|
||||
"tableTo": "device_tokens",
|
||||
"columnsFrom": ["device_id"],
|
||||
"columnsTo": ["device_id"],
|
||||
"onDelete": "no action",
|
||||
"onUpdate": "no action"
|
||||
}
|
||||
},
|
||||
"compositePrimaryKeys": {
|
||||
"watch_status_device_id_title_id_pk": {
|
||||
"columns": ["device_id", "title_id"],
|
||||
"name": "watch_status_device_id_title_id_pk"
|
||||
}
|
||||
},
|
||||
"uniqueConstraints": {}
|
||||
}
|
||||
},
|
||||
"enums": {},
|
||||
"_meta": {
|
||||
"schemas": {},
|
||||
"tables": {},
|
||||
"columns": {}
|
||||
},
|
||||
"internal": {
|
||||
"indexes": {}
|
||||
}
|
||||
}
|
||||
@@ -43,6 +43,13 @@
|
||||
"when": 1725293569918,
|
||||
"tag": "0005_shiny_scarecrow",
|
||||
"breakpoints": true
|
||||
},
|
||||
{
|
||||
"idx": 6,
|
||||
"version": "6",
|
||||
"when": 1725836922065,
|
||||
"tag": "0006_sticky_donald_blake",
|
||||
"breakpoints": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
import { zValidator } from "@hono/zod-validator";
|
||||
import { Hono } from "hono";
|
||||
import { Client } from "@upstash/qstash";
|
||||
import { Hono, type HonoRequest } from "hono";
|
||||
import { env } from "hono/adapter";
|
||||
import mapKeys from "lodash.mapkeys";
|
||||
import { DateTime } from "luxon";
|
||||
import { z } from "zod";
|
||||
|
||||
import { Case, changeStringCase } from "~/libs/changeStringCase";
|
||||
import type { AdminSdkCredentials } from "~/libs/fcm/getGoogleAuthToken";
|
||||
import { sendFcmMessage } from "~/libs/fcm/sendFcmMessage";
|
||||
import { getCurrentDomain } from "~/libs/getCurrentDomain";
|
||||
import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode";
|
||||
import { verifyQstashHeader } from "~/libs/qstash/verifyQstashHeader";
|
||||
import { readEnvVariable } from "~/libs/readEnvVariable";
|
||||
import { getTokensSubscribedToTitle } from "~/models/token";
|
||||
@@ -48,6 +49,7 @@ app.post(
|
||||
`${domain}/episodes/${aniListId}`,
|
||||
).then((res) => res.json<EpisodesResponseSchema>());
|
||||
if (!success) {
|
||||
await scheduleRetry(readEnvVariable(c.env, "QSTASH_TOKEN"), c.req);
|
||||
return c.json(ErrorResponse, { status: 500 });
|
||||
}
|
||||
|
||||
@@ -56,6 +58,7 @@ app.post(
|
||||
(episode) => episode.number === episodeNumber,
|
||||
);
|
||||
if (!episode) {
|
||||
await scheduleRetry(readEnvVariable(c.env, "QSTASH_TOKEN"), c.req);
|
||||
return c.json(ErrorResponse, { status: 404 });
|
||||
}
|
||||
|
||||
@@ -73,6 +76,7 @@ app.post(
|
||||
},
|
||||
).then((res) => res.json<FetchUrlResponse>());
|
||||
if (!fetchUrlSuccess) {
|
||||
await scheduleRetry(readEnvVariable(c.env, "QSTASH_TOKEN"), c.req);
|
||||
return c.json(ErrorResponse, { status: 500 });
|
||||
}
|
||||
|
||||
@@ -103,8 +107,23 @@ app.post(
|
||||
}),
|
||||
);
|
||||
|
||||
await maybeScheduleNextAiringEpisode(
|
||||
env<Env, typeof c>(c, "workerd"),
|
||||
c.req,
|
||||
aniListId,
|
||||
);
|
||||
|
||||
return c.json(SuccessResponse, 200);
|
||||
},
|
||||
);
|
||||
|
||||
async function scheduleRetry(qstashToken: string, req: HonoRequest) {
|
||||
return new Client({ token: qstashToken }).publishJSON({
|
||||
body: await req.text(),
|
||||
url: req.url,
|
||||
retries: 0,
|
||||
delay: "1h",
|
||||
});
|
||||
}
|
||||
|
||||
export default app;
|
||||
|
||||
@@ -4,7 +4,6 @@ import { updateDeviceLastConnectedAt } from "~/models/token";
|
||||
|
||||
export const maybeUpdateLastConnectedAt = createMiddleware(async (c, next) => {
|
||||
const deviceId = await c.req.header("X-Aniplay-Device-Id");
|
||||
console.log("deviceId", deviceId);
|
||||
if (!deviceId) {
|
||||
return next();
|
||||
}
|
||||
|
||||
@@ -7,7 +7,11 @@ import { getTestDb } from "~/libs/test/getTestDb";
|
||||
import { getTestEnv } from "~/libs/test/getTestEnv";
|
||||
import { resetTestDb } from "~/libs/test/resetTestDb";
|
||||
import { server } from "~/mocks";
|
||||
import { deviceTokensTable, watchStatusTable } from "~/models/schema";
|
||||
import {
|
||||
deviceTokensTable,
|
||||
titleMessagesTable,
|
||||
watchStatusTable,
|
||||
} from "~/models/schema";
|
||||
|
||||
server.listen();
|
||||
|
||||
@@ -96,6 +100,9 @@ describe("requests the /watch-status route", () => {
|
||||
await db
|
||||
.insert(deviceTokensTable)
|
||||
.values({ deviceId: "123", token: "asd" });
|
||||
await db
|
||||
.insert(titleMessagesTable)
|
||||
.values({ titleId: 10, messageId: "123" });
|
||||
|
||||
const res = await app.request(
|
||||
"/watch-status",
|
||||
@@ -122,6 +129,9 @@ describe("requests the /watch-status route", () => {
|
||||
await db
|
||||
.insert(deviceTokensTable)
|
||||
.values({ deviceId: "123", token: "asd" });
|
||||
await db
|
||||
.insert(titleMessagesTable)
|
||||
.values({ titleId: -1, messageId: "123" });
|
||||
|
||||
const res = await app.request(
|
||||
"/watch-status",
|
||||
@@ -148,6 +158,9 @@ describe("requests the /watch-status route", () => {
|
||||
await db
|
||||
.insert(deviceTokensTable)
|
||||
.values({ deviceId: "123", token: "asd" });
|
||||
await db
|
||||
.insert(titleMessagesTable)
|
||||
.values({ titleId: 139518, messageId: "123" });
|
||||
|
||||
const res = await app.request("/watch-status", {
|
||||
method: "POST",
|
||||
|
||||
@@ -1,8 +1,11 @@
|
||||
import { OpenAPIHono, createRoute, z } from "@hono/zod-openapi";
|
||||
import { Client } from "@upstash/qstash";
|
||||
import { env } from "hono/adapter";
|
||||
|
||||
import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode";
|
||||
import { verifyQstashHeader } from "~/libs/qstash/verifyQstashHeader";
|
||||
import { readEnvVariable } from "~/libs/readEnvVariable";
|
||||
import { deleteTitleMessage, getTitleMessage } from "~/models/titleMessages";
|
||||
import { setWatchStatus } from "~/models/watchStatus";
|
||||
import type { Env } from "~/types/env";
|
||||
import {
|
||||
@@ -71,6 +74,7 @@ app.openapi(route, async (c) => {
|
||||
isRetrying = false,
|
||||
} = await c.req.json<typeof UpdateWatchStatusRequest._type>();
|
||||
const aniListToken = c.req.header("X-AniList-Token");
|
||||
const client = new Client({ token: readEnvVariable(c.env, "QSTASH_TOKEN") });
|
||||
|
||||
if (isRetrying) {
|
||||
if (!(await verifyQstashHeader(env<Env, typeof c>(c, "workerd"), c.req))) {
|
||||
@@ -84,6 +88,20 @@ app.openapi(route, async (c) => {
|
||||
Number(titleId),
|
||||
watchStatus,
|
||||
);
|
||||
if (wasAdded) {
|
||||
await maybeScheduleNextAiringEpisode(
|
||||
env<Env, typeof c>(c, "workerd"),
|
||||
c.req,
|
||||
titleId,
|
||||
);
|
||||
} else if (wasDeleted) {
|
||||
const messageId = await getTitleMessage(
|
||||
env<Env, typeof c>(c, "workerd"),
|
||||
titleId,
|
||||
);
|
||||
await client.messages.delete(messageId);
|
||||
await deleteTitleMessage(env<Env, typeof c>(c, "workerd"), titleId);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(new Error("Error setting watch status", { cause: error }));
|
||||
console.error(error);
|
||||
@@ -101,19 +119,12 @@ app.openapi(route, async (c) => {
|
||||
console.error(
|
||||
new Error("Failed to update watch status on Anilist", { cause: error }),
|
||||
);
|
||||
await import("@upstash/qstash")
|
||||
.then(
|
||||
({ Client }) =>
|
||||
new Client({ token: readEnvVariable(c.env, "QSTASH_TOKEN") }),
|
||||
)
|
||||
.then((client) =>
|
||||
client.publishJSON({
|
||||
url: c.req.url,
|
||||
body: { deviceId, watchStatus, titleId, isRetrying: true },
|
||||
retries: 0,
|
||||
delay: 60,
|
||||
}),
|
||||
);
|
||||
client.publishJSON({
|
||||
url: c.req.url,
|
||||
body: { deviceId, watchStatus, titleId, isRetrying: true },
|
||||
retries: 0,
|
||||
delay: 60,
|
||||
});
|
||||
}
|
||||
|
||||
return c.json(SuccessResponse, { status: 200 });
|
||||
|
||||
28
src/libs/anilist/getNextEpisodeAiringAt.ts
Normal file
28
src/libs/anilist/getNextEpisodeAiringAt.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import { graphql } from "gql.tada";
|
||||
import { GraphQLClient } from "graphql-request";
|
||||
|
||||
const GetNextEpisodeAiringAtQuery = graphql(`
|
||||
query GetNextEpisodeAiringAt($id: Int!) {
|
||||
Media(id: $id) {
|
||||
nextAiringEpisode {
|
||||
episode
|
||||
timeUntilAiring
|
||||
}
|
||||
}
|
||||
}
|
||||
`);
|
||||
|
||||
export function getNextEpisodeTimeUntilAiring(aniListId: number) {
|
||||
const client = new GraphQLClient("https://graphql.anilist.co/");
|
||||
|
||||
return client
|
||||
.request(GetNextEpisodeAiringAtQuery, { id: aniListId })
|
||||
.then((data) => {
|
||||
const nextAiring = data!.Media!.nextAiringEpisode;
|
||||
if (!nextAiring) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return nextAiring;
|
||||
});
|
||||
}
|
||||
32
src/libs/maybeScheduleNextAiringEpisode.ts
Normal file
32
src/libs/maybeScheduleNextAiringEpisode.ts
Normal file
@@ -0,0 +1,32 @@
|
||||
import { Client } from "@upstash/qstash";
|
||||
import type { HonoRequest } from "hono";
|
||||
|
||||
import { setTitleMessage } from "~/models/titleMessages";
|
||||
import type { Env } from "~/types/env";
|
||||
|
||||
import { getNextEpisodeTimeUntilAiring } from "./anilist/getNextEpisodeAiringAt";
|
||||
import { getCurrentDomain } from "./getCurrentDomain";
|
||||
|
||||
export async function maybeScheduleNextAiringEpisode(
|
||||
env: Env,
|
||||
req: HonoRequest,
|
||||
aniListId: number,
|
||||
) {
|
||||
const nextAiring = await getNextEpisodeTimeUntilAiring(aniListId);
|
||||
if (!nextAiring) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { timeUntilAiring, episode: nextEpisode } = nextAiring;
|
||||
const client = new Client({ token: env.QSTASH_TOKEN });
|
||||
|
||||
const domain = getCurrentDomain(req);
|
||||
const { messageId } = await client.publishJSON({
|
||||
url: `${domain}/internal/new-episode`,
|
||||
body: { aniListId, episode: nextEpisode },
|
||||
retries: 0,
|
||||
delay: timeUntilAiring,
|
||||
contentBasedDeduplication: true,
|
||||
});
|
||||
await setTitleMessage(env, aniListId, messageId);
|
||||
}
|
||||
16
src/mocks/anilist/nextAiringEpisode.ts
Normal file
16
src/mocks/anilist/nextAiringEpisode.ts
Normal file
@@ -0,0 +1,16 @@
|
||||
import { HttpResponse, graphql } from "msw";
|
||||
|
||||
export function getAnilistNextAiringEpisode() {
|
||||
return graphql.query(
|
||||
"GetNextEpisodeAiringAt",
|
||||
({ variables: { titleId } }) => {
|
||||
return HttpResponse.json({
|
||||
data: {
|
||||
Media: {
|
||||
nextAiringEpisode: null,
|
||||
},
|
||||
},
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
@@ -3,6 +3,7 @@ import { getAnifySources } from "./anify/sources";
|
||||
import { getAnifyTitle } from "./anify/title";
|
||||
import { deleteAnilistMediaListEntry } from "./anilist/deleteMediaListEntry";
|
||||
import { getAnilistMediaListEntry } from "./anilist/mediaListEntry";
|
||||
import { getAnilistNextAiringEpisode } from "./anilist/nextAiringEpisode";
|
||||
import { getAnilistSearchResults } from "./anilist/search";
|
||||
import { getAnilistTitle } from "./anilist/title";
|
||||
import { updateAnilistWatchStatus } from "./anilist/updateWatchStatus";
|
||||
@@ -14,6 +15,7 @@ import { mockFcmMessageResponse } from "./fcm";
|
||||
export const handlers = [
|
||||
deleteAnilistMediaListEntry(),
|
||||
getAnilistMediaListEntry(),
|
||||
getAnilistNextAiringEpisode(),
|
||||
getAnilistSearchResults(),
|
||||
getAnilistTitle(),
|
||||
updateAnilistWatchStatus(),
|
||||
|
||||
@@ -2,9 +2,14 @@ import { SignatureError } from "@upstash/qstash";
|
||||
|
||||
import { mock } from "bun:test";
|
||||
|
||||
class MockQstashMessages {
|
||||
delete = mock();
|
||||
}
|
||||
|
||||
class MockQstashClient {
|
||||
batchJSON = mock();
|
||||
publishJSON = mock();
|
||||
publishJSON = mock().mockResolvedValue({ messageId: "123" });
|
||||
messages = new MockQstashMessages();
|
||||
}
|
||||
|
||||
class MockQstashReceiver {
|
||||
|
||||
@@ -34,4 +34,20 @@ export const keyValueTable = sqliteTable("key_value", {
|
||||
value: text("value").notNull(),
|
||||
});
|
||||
|
||||
export const tables = [watchStatusTable, deviceTokensTable, keyValueTable];
|
||||
export const titleMessagesTable = sqliteTable(
|
||||
"title_messages",
|
||||
{
|
||||
titleId: integer("title_id").notNull(),
|
||||
messageId: text("message_id").notNull(),
|
||||
},
|
||||
(table) => ({
|
||||
pk: primaryKey({ columns: [table.titleId, table.messageId] }),
|
||||
}),
|
||||
);
|
||||
|
||||
export const tables = [
|
||||
watchStatusTable,
|
||||
deviceTokensTable,
|
||||
keyValueTable,
|
||||
titleMessagesTable,
|
||||
];
|
||||
|
||||
31
src/models/titleMessages.ts
Normal file
31
src/models/titleMessages.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { eq } from "drizzle-orm";
|
||||
|
||||
import type { Env } from "~/types/env";
|
||||
|
||||
import { getDb } from "./db";
|
||||
import { titleMessagesTable } from "./schema";
|
||||
|
||||
export function setTitleMessage(env: Env, titleId: number, messageId: string) {
|
||||
return getDb(env)
|
||||
.insert(titleMessagesTable)
|
||||
.values({ titleId, messageId })
|
||||
.onConflictDoUpdate({
|
||||
set: { messageId },
|
||||
target: [titleMessagesTable.titleId],
|
||||
});
|
||||
}
|
||||
|
||||
export function getTitleMessage(env: Env, titleId: number) {
|
||||
return getDb(env)
|
||||
.select()
|
||||
.from(titleMessagesTable)
|
||||
.where(eq(titleMessagesTable.titleId, titleId))
|
||||
.then((results) => results[0].messageId);
|
||||
}
|
||||
|
||||
export function deleteTitleMessage(env: Env, titleId: number) {
|
||||
return getDb(env)
|
||||
.delete(titleMessagesTable)
|
||||
.where(eq(titleMessagesTable.titleId, titleId))
|
||||
.run();
|
||||
}
|
||||
Reference in New Issue
Block a user