diff --git a/.gitignore b/.gitignore index f31104c..3d2ad7e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,10 @@ node_modules dist +.mf .wrangler .dev.vars *.db *.db-* -.env \ No newline at end of file +.env +src/libs/anilist/anilist-do.ts +.idea/ChatHistory_schema_v3.xml diff --git a/drizzle.config.ts b/drizzle.config.ts index 3144e97..973d585 100644 --- a/drizzle.config.ts +++ b/drizzle.config.ts @@ -1,12 +1,16 @@ +import { config } from "dotenv"; import { defineConfig } from "drizzle-kit"; +config({ path: ".dev.vars" }); + export default defineConfig({ schema: "./src/models/schema.ts", out: "./drizzle", - driver: "turso", + driver: "d1-http", dialect: "sqlite", dbCredentials: { - url: process.env.TURSO_URL, - authToken: process.env.TURSO_AUTH_TOKEN, + accountId: process.env.CLOUDFLARE_ACCOUNT_ID!, + databaseId: process.env.CLOUDFLARE_DATABASE_ID!, + token: process.env.CLOUDFLARE_D1_TOKEN!, }, }); diff --git a/package.json b/package.json index 979642f..a0c8637 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,6 @@ "@hono/swagger-ui": "^0.5.1", "@hono/zod-openapi": "^0.19.5", "@hono/zod-validator": "^0.2.2", - "@libsql/client": "0.15.4", "drizzle-orm": "^0.44.7", "gql.tada": "^1.8.10", "graphql": "^16.12.0", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1e2e658..bcd48dd 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -22,9 +22,6 @@ importers: "@hono/zod-validator": specifier: ^0.2.2 version: 0.2.2(hono@4.10.4)(zod@3.25.76) - "@libsql/client": - specifier: 0.15.4 - version: 0.15.4 drizzle-orm: specifier: ^0.44.7 version: 0.44.7(@cloudflare/workers-types@4.20251014.0)(@libsql/client@0.15.4)(bun-types@1.3.1(@types/react@19.2.2)) @@ -4997,10 +4994,12 @@ snapshots: transitivePeerDependencies: - bufferutil - utf-8-validate + optional: true "@libsql/core@0.15.15": dependencies: js-base64: 3.7.8 + optional: true "@libsql/darwin-arm64@0.5.22": optional: true @@ -5017,8 +5016,10 @@ snapshots: transitivePeerDependencies: - bufferutil - utf-8-validate + optional: true - "@libsql/isomorphic-fetch@0.3.1": {} + "@libsql/isomorphic-fetch@0.3.1": + optional: true "@libsql/isomorphic-ws@0.1.5": dependencies: @@ -5027,6 +5028,7 @@ snapshots: transitivePeerDependencies: - bufferutil - utf-8-validate + optional: true "@libsql/linux-arm-gnueabihf@0.5.22": optional: true @@ -5058,7 +5060,8 @@ snapshots: outvariant: 1.4.3 strict-event-emitter: 0.5.1 - "@neon-rs/load@0.0.4": {} + "@neon-rs/load@0.0.4": + optional: true "@nodelib/fs.scandir@2.1.5": dependencies: @@ -5262,6 +5265,7 @@ snapshots: "@types/ws@8.18.1": dependencies: "@types/node": 24.9.2 + optional: true "@vitest/expect@3.2.4": dependencies: @@ -5520,7 +5524,8 @@ snapshots: csstype@3.2.2: {} - data-uri-to-buffer@4.0.1: {} + data-uri-to-buffer@4.0.1: + optional: true debug@4.4.3: dependencies: @@ -5538,7 +5543,8 @@ snapshots: delayed-stream@1.0.0: {} - detect-libc@2.0.2: {} + detect-libc@2.0.2: + optional: true detect-libc@2.1.2: {} @@ -5762,6 +5768,7 @@ snapshots: dependencies: node-domexception: 1.0.0 web-streams-polyfill: 3.3.3 + optional: true fill-range@7.1.1: dependencies: @@ -5791,6 +5798,7 @@ snapshots: formdata-polyfill@4.0.10: dependencies: fetch-blob: 3.2.0 + optional: true fsevents@2.3.3: optional: true @@ -5979,7 +5987,8 @@ snapshots: jose@5.10.0: {} - js-base64@3.7.8: {} + js-base64@3.7.8: + optional: true js-tokens@4.0.0: {} @@ -6016,6 +6025,7 @@ snapshots: "@libsql/linux-x64-gnu": 0.5.22 "@libsql/linux-x64-musl": 0.5.22 "@libsql/win32-x64-msvc": 0.5.22 + optional: true lilconfig@3.1.3: {} @@ -6169,6 +6179,7 @@ snapshots: data-uri-to-buffer: 4.0.1 fetch-blob: 3.2.0 formdata-polyfill: 4.0.10 + optional: true npm-run-path@5.3.0: dependencies: @@ -6242,7 +6253,8 @@ snapshots: prettier@3.6.2: {} - promise-limit@2.7.0: {} + promise-limit@2.7.0: + optional: true psl@1.15.0: dependencies: @@ -6595,7 +6607,8 @@ snapshots: - tsx - yaml - web-streams-polyfill@3.3.3: {} + web-streams-polyfill@3.3.3: + optional: true web-streams-polyfill@4.0.0-beta.3: {} @@ -6701,7 +6714,8 @@ snapshots: ws@8.18.0: {} - ws@8.18.3: {} + ws@8.18.3: + optional: true y18n@5.0.8: {} diff --git a/src/controllers/auth/anilist/index.ts b/src/controllers/auth/anilist/index.ts index 6b12a6a..4ab0856 100644 --- a/src/controllers/auth/anilist/index.ts +++ b/src/controllers/auth/anilist/index.ts @@ -159,7 +159,7 @@ app.openapi(route, async (c) => { mediaListEntry.status, ); if (wasAdded) { - await maybeScheduleNextAiringEpisode(c.req, media.id); + await maybeScheduleNextAiringEpisode(media.id); } } diff --git a/src/controllers/internal/new-episode/index.ts b/src/controllers/internal/new-episode/index.ts index 2a99f92..1b62246 100644 --- a/src/controllers/internal/new-episode/index.ts +++ b/src/controllers/internal/new-episode/index.ts @@ -2,6 +2,7 @@ import { zValidator } from "@hono/zod-validator"; import { Hono } from "hono"; import { z } from "zod"; +import { getEpisodesFromAniwatch } from "~/controllers/episodes/getByAniListId/aniwatch"; import { fetchEpisodeUrl } from "~/controllers/episodes/getEpisodeUrl"; import { getAdminSdkCredentials } from "~/libs/gcloud/getAdminSdkCredentials"; import { sendFcmMessage } from "~/libs/gcloud/sendFcmMessage"; @@ -16,6 +17,46 @@ import { const app = new Hono(); +export async function onNewEpisode(aniListId: number, episodeNumber: number) { + console.log( + `On new episode, aniListId: ${aniListId}, episodeNumber: ${episodeNumber}`, + ); + + if (!(await isWatchingTitle(aniListId))) { + console.log(`Title ${aniListId} is no longer being watched`); + return { success: true, result: { isNoLongerWatching: true } }; + } + + const episodes = await getEpisodesFromAniwatch(aniListId); + const fetchUrlResult = await fetchEpisodeUrl({ aniListId, episodeNumber }); + if (!fetchUrlResult) { + console.error(`Failed to fetch episode URL for episode`); + return { success: false, message: "Failed to fetch episode URL" }; + } + + const tokens = await getTokensSubscribedToTitle(aniListId); + + await Promise.allSettled( + tokens.map(async (token) => { + return sendFcmMessage(getAdminSdkCredentials(), { + token, + data: { + type: "new_episode", + episodes: JSON.stringify(episodes), + episodeStreamInfo: JSON.stringify(fetchUrlResult), + aniListId: aniListId.toString(), + episodeNumber: episodeNumber.toString(), + }, + android: { priority: "high" }, + }); + }), + ); + + await maybeScheduleNextAiringEpisode(aniListId); + + return SuccessResponse; +} + app.post( "/", zValidator( @@ -30,48 +71,13 @@ app.post( aniListId: number; episodeNumber: number; }>(); - console.log( - `Internal new episode route, aniListId: ${aniListId}, episodeNumber: ${episodeNumber}`, - ); - if (!(await isWatchingTitle(aniListId))) { - console.log(`Title ${aniListId} is no longer being watched`); - return c.json( - { success: true, result: { isNoLongerWatching: true } }, - 200, - ); + const result = await onNewEpisode(aniListId, episodeNumber, c.req); + if (result.success) { + return c.json(result, 200); + } else { + return c.json(result, 500); } - - const fetchUrlResult = await fetchEpisodeUrl({ aniListId, episodeNumber }); - if (!fetchUrlResult) { - console.error(`Failed to fetch episode URL for episode`); - return c.json( - { success: false, message: "Failed to fetch episode URL" }, - 500, - ); - } - - const tokens = await getTokensSubscribedToTitle(aniListId); - - await Promise.allSettled( - tokens.map(async (token) => { - return sendFcmMessage(getAdminSdkCredentials(), { - token, - data: { - type: "new_episode", - episodes: JSON.stringify(episodes), - episodeStreamInfo: JSON.stringify(fetchUrlResult), - aniListId: aniListId.toString(), - episodeNumber: episodeNumber.toString(), - }, - android: { priority: "high" }, - }); - }), - ); - - await maybeScheduleNextAiringEpisode(c.req, aniListId); - - return c.json(SuccessResponse, 200); }, ); diff --git a/src/controllers/internal/upcoming-titles/anilist.ts b/src/controllers/internal/upcoming-titles/anilist.ts index 1dcf845..41f5673 100644 --- a/src/controllers/internal/upcoming-titles/anilist.ts +++ b/src/controllers/internal/upcoming-titles/anilist.ts @@ -87,7 +87,7 @@ export async function getUpcomingTitlesFromAnilist(req: HonoRequest) { await Promise.all( Array.from(plannedToWatchTitles).map((titleId) => - maybeScheduleNextAiringEpisode(req, titleId), + maybeScheduleNextAiringEpisode(titleId), ), ); diff --git a/src/controllers/maybeUpdateLastConnectedAt.ts b/src/controllers/maybeUpdateLastConnectedAt.ts index e2ce8dd..b56bd47 100644 --- a/src/controllers/maybeUpdateLastConnectedAt.ts +++ b/src/controllers/maybeUpdateLastConnectedAt.ts @@ -8,6 +8,6 @@ export const maybeUpdateLastConnectedAt = createMiddleware(async (c, next) => { return next(); } - await updateDeviceLastConnectedAt(c.env, deviceId); + await updateDeviceLastConnectedAt(deviceId); return next(); }); diff --git a/src/controllers/watch-status/index.ts b/src/controllers/watch-status/index.ts index 4960154..1b74ce7 100644 --- a/src/controllers/watch-status/index.ts +++ b/src/controllers/watch-status/index.ts @@ -1,6 +1,7 @@ import { OpenAPIHono, createRoute, z } from "@hono/zod-openapi"; import type { HonoRequest } from "hono"; +import { AnilistUpdateType } from "~/libs/anilist/updateType.ts"; import { maybeScheduleNextAiringEpisode } from "~/libs/maybeScheduleNextAiringEpisode"; import { buildNewEpisodeTaskId } from "~/libs/tasks/id"; import { queueTask } from "~/libs/tasks/queueTask"; @@ -76,9 +77,9 @@ export async function updateWatchStatus( watchStatus, ); if (wasAdded) { - await maybeScheduleNextAiringEpisode(req, titleId); + await maybeScheduleNextAiringEpisode(titleId); } else if (wasDeleted) { - await removeTask("new-episode", buildNewEpisodeTaskId(titleId)); + await removeTask("NEW_EPISODE", buildNewEpisodeTaskId(titleId)); } } @@ -115,13 +116,12 @@ app.openapi(route, async (c) => { } await queueTask( - "anilist-updates", + "ANILIST_UPDATES", { deviceId, watchStatus, titleId, - isRetrying: true, - nameSuffix: "watch-status", + updateType: AnilistUpdateType.UpdateWatchStatus, }, { req: c.req, scheduleConfig: { delay: { minute: 1 } } }, ); diff --git a/src/index.ts b/src/index.ts index b57fbc7..5921007 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,6 +2,10 @@ import { swaggerUI } from "@hono/swagger-ui"; import { OpenAPIHono } from "@hono/zod-openapi"; import { maybeUpdateLastConnectedAt } from "~/controllers/maybeUpdateLastConnectedAt"; +import type { QueueName } from "~/libs/tasks/queueName.ts"; + +import { onNewEpisode } from "./controllers/internal/new-episode"; +import type { QueueBody } from "./libs/tasks/queueTask"; const app = new OpenAPIHono(); @@ -65,6 +69,25 @@ app.doc("/openapi.json", { app.get("/docs", swaggerUI({ url: "/openapi.json" })); -export default app; +export default { + ...app, + async queue(batch) { + switch (batch.queue as QueueName) { + case "ANILIST_UPDATES": + batch.retryAll(); + break; + case "NEW_EPISODE": + for (const message of (batch as MessageBatch) + .messages) { + await onNewEpisode( + message.body.aniListId, + message.body.episodeNumber, + ); + message.ack(); + } + break; + } + }, +} satisfies ExportedHandler; export { AnilistDurableObject as AnilistDo } from "~/libs/anilist/anilist-do.ts"; diff --git a/src/libs/anilist/anilist-do.ts b/src/libs/anilist/anilist-do.ts index 454ab49..5fb18d6 100644 --- a/src/libs/anilist/anilist-do.ts +++ b/src/libs/anilist/anilist-do.ts @@ -1,10 +1,22 @@ import { DurableObject, env } from "cloudflare:workers"; -import { GraphQLClient } from "graphql-request"; +import { graphql, type ResultOf } from "gql.tada"; +import { print } from "graphql"; import { z } from "zod"; -import { GetTitleQuery, _fetchTitleFromAnilist } from "~/libs/anilist/getTitle"; import { sleep } from "~/libs/sleep.ts"; import type { Title } from "~/types/title"; +import { MediaFragment } from "~/types/title/mediaFragment"; + +const GetTitleQuery = graphql( + ` + query GetTitle($id: Int!) { + Media(id: $id) { + ...Media + } + } + `, + [MediaFragment], +); const nextAiringEpisodeSchema = z.nullable( z.object({ @@ -29,12 +41,13 @@ export class AnilistDurableObject extends DurableObject { switch (operationName) { case "GetTitle": { const { variables } = body; - const cache = await this.state.storage.get(variables.id); - if (cache) { - return new Response(JSON.stringify(cache), { - headers: { "Content-Type": "application/json" }, - }); - } + const storageKey = variables.id; + const cache = await this.state.storage.get(storageKey); + // if (cache) { + // return new Response(JSON.stringify(cache), { + // headers: { "Content-Type": "application/json" }, + // }); + // } const anilistResponse = await this.fetchTitleFromAnilist( variables.id, @@ -45,7 +58,7 @@ export class AnilistDurableObject extends DurableObject { ); const airingAt = (nextAiringEpisode?.airingAt ?? 0) * 1000; - await this.state.storage.put(variables.id, anilistResponse); + await this.state.storage.put(storageKey, anilistResponse); if (airingAt) { await this.state.storage.setAlarm(airingAt); await this.state.storage.put(`alarm:${variables.id}`, airingAt); @@ -63,7 +76,6 @@ export class AnilistDurableObject extends DurableObject { async alarm() { const now = Date.now(); const alarms = await this.state.storage.list({ prefix: "alarm:" }); - console.log("alarm", now, alarms); for (const [id, ttl] of Object.entries(alarms)) { if (now >= ttl) { await this.state.storage.delete(id); @@ -75,30 +87,70 @@ export class AnilistDurableObject extends DurableObject { id: number, token?: string | undefined, ): Promise { - const client = new GraphQLClient("https://graphql.anilist.co/"); - const headers = new Headers(); + const headers: any = { + "Content-Type": "application/json", + }; + if (token) { - headers.append("Authorization", `Bearer ${token}`); + headers["Authorization"] = `Bearer ${token}`; } - return client - .request(GetTitleQuery, { id }, headers) - .then((data) => data?.Media ?? undefined) - .catch((error) => { - if (error.message.includes("Not Found")) { - return undefined; - } - if (error.response?.status === 429) { - console.log( - "429, retrying in", - error.response.headers.get("Retry-After"), - ); - return sleep( - Number(error.response.headers.get("Retry-After")!) * 1000, - ).then(() => this.fetchTitleFromAnilist(id, token)); - } + const response = await fetch("http://localhost:3000/proxy", { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify({ + url: "https://graphql.anilist.co/", + method: "POST", + headers, // Pass the original headers here + data: { + operationName: "GetTitle", + query: print(GetTitleQuery), + variables: { id }, + }, + }), + }); - throw error; - }); + // 1. Handle Rate Limiting (429) + if (response.status === 429) { + const retryAfter = response.headers.get("Retry-After"); + console.log("429, retrying in", retryAfter); + + await sleep(Number(retryAfter || 1) * 1000); // specific fallback or ensure logic + return this.fetchTitleFromAnilist(id, token); + } + + // 2. Handle HTTP Errors (like 404 or 500) + if (!response.ok) { + // If it is specifically a 404 Not Found HTTP status + if (response.status === 404) { + return undefined; + } + + console.error(JSON.stringify(await response.json(), null, 2)); + // Throw for other HTTP errors to be caught by caller + throw new Error(`HTTP Error: ${response.status} ${response.statusText}`); + } + + // 3. Parse JSON + // We cast this to ResultOf<typeof GetTitleQuery> to maintain Tada type safety + const result = (await response.json().then((json) => json.data)) as { + data?: ResultOf<typeof GetTitleQuery>; + errors?: any[]; + }; + + // 4. Handle GraphQL Specific Errors (Anilist might return 200 OK but include errors) + if (result.errors && result.errors.length > 0) { + const errorMessage = JSON.stringify(result.errors); + + if (errorMessage.includes("Not Found")) { + return undefined; + } + + throw new Error(`GraphQL Error: ${errorMessage}`); + } + + return result.data?.Media ?? undefined; } } diff --git a/src/libs/anilist/getTitle.ts b/src/libs/anilist/getTitle.ts index 9e78b64..7ef409e 100644 --- a/src/libs/anilist/getTitle.ts +++ b/src/libs/anilist/getTitle.ts @@ -1,19 +1,6 @@ import { env } from "cloudflare:workers"; -import { graphql } from "gql.tada"; import type { Title } from "~/types/title"; -import { MediaFragment } from "~/types/title/mediaFragment"; - -export const GetTitleQuery = graphql( - ` - query GetTitle($id: Int!) { - Media(id: $id) { - ...Media - } - } - `, - [MediaFragment], -); export async function fetchTitleFromAnilist( id: number, diff --git a/src/libs/anilist/updateType.ts b/src/libs/anilist/updateType.ts new file mode 100644 index 0000000..350e245 --- /dev/null +++ b/src/libs/anilist/updateType.ts @@ -0,0 +1,3 @@ +export enum AnilistUpdateType { + UpdateWatchStatus, +} diff --git a/src/libs/maybeScheduleNextAiringEpisode.ts b/src/libs/maybeScheduleNextAiringEpisode.ts index f819ea4..19abc8d 100644 --- a/src/libs/maybeScheduleNextAiringEpisode.ts +++ b/src/libs/maybeScheduleNextAiringEpisode.ts @@ -1,4 +1,3 @@ -import type { HonoRequest } from "hono"; import { DateTime } from "luxon"; import { @@ -7,18 +6,9 @@ import { } from "~/models/unreleasedTitles"; import { getNextEpisodeTimeUntilAiring } from "./anilist/getNextEpisodeAiringAt"; -import { getCurrentDomain } from "./getCurrentDomain"; import { queueTask } from "./tasks/queueTask"; -export async function maybeScheduleNextAiringEpisode( - req: HonoRequest, - aniListId: number, -) { - const domain = getCurrentDomain(req); - if (!domain) { - return; - } - +export async function maybeScheduleNextAiringEpisode(aniListId: number) { const { nextAiring, status } = await getNextEpisodeTimeUntilAiring(aniListId); if ( !nextAiring || @@ -33,9 +23,9 @@ export async function maybeScheduleNextAiringEpisode( const { airingAt, episode: nextEpisode } = nextAiring; await queueTask( - "new-episode", + "NEW_EPISODE", { aniListId, episodeNumber: nextEpisode }, - { req, scheduleConfig: { epochTime: airingAt } }, + { scheduleConfig: { epochTime: airingAt } }, ); await removeUnreleasedTitle(aniListId); } diff --git a/src/libs/tasks/queueName.ts b/src/libs/tasks/queueName.ts index a5a1a17..8426105 100644 --- a/src/libs/tasks/queueName.ts +++ b/src/libs/tasks/queueName.ts @@ -1 +1,5 @@ -export type QueueName = "anilist-updates" | "new-episode"; +type QueueKeys<T> = { + [K in keyof T]: T[K] extends Queue ? K : never; +}[keyof T]; + +export type QueueName = QueueKeys<Cloudflare.Env>; diff --git a/src/libs/tasks/queueTask.ts b/src/libs/tasks/queueTask.ts index c961ccc..c0be4b6 100644 --- a/src/libs/tasks/queueTask.ts +++ b/src/libs/tasks/queueTask.ts @@ -1,26 +1,20 @@ import { env as cloudflareEnv } from "cloudflare:workers"; import type { HonoRequest } from "hono"; -import isEqual from "lodash.isequal"; -import { DateTime, type DurationLike } from "luxon"; +import { DateTime, Duration, type DurationLike } from "luxon"; +import { AnilistUpdateType } from "~/libs/anilist/updateType.ts"; import type { WatchStatus } from "~/types/title/watchStatus"; -import { FailedToQueueTaskError } from "../errors/FailedToQueueTask"; -import { getAdminSdkCredentials } from "../gcloud/getAdminSdkCredentials"; -import { getGoogleAuthToken } from "../gcloud/getGoogleAuthToken"; -import { getCurrentDomain } from "../getCurrentDomain"; -import { buildAnilistRetryTaskId, buildNewEpisodeTaskId } from "./id"; import type { QueueName } from "./queueName"; -type QueueBody = { - "anilist-updates": { +export type QueueBody = { + ANILIST_UPDATES: { deviceId: string; watchStatus: WatchStatus | null; titleId: number; - isRetrying: true; - nameSuffix: string; + updateType: AnilistUpdateType; }; - "new-episode": { aniListId: number; episodeNumber: number }; + NEW_EPISODE: { aniListId: number; episodeNumber: number }; }; type ScheduleConfig = @@ -39,163 +33,75 @@ export async function queueTask( body: QueueBody[QueueName], { scheduleConfig, req, env = cloudflareEnv }: QueueTaskOptionalArgs = {}, ) { - const domain = req - ? getCurrentDomain(req) - : "https://aniplay-v2.rururu.workers.dev"; - if (!domain) { - console.log("Skipping queue task due to local domain", queueName, body); - return; - } - - const adminSdkCredentials = getAdminSdkCredentials(env); - const { projectId } = adminSdkCredentials; - - const task = buildTask( - projectId, + const { scheduleTime, headers } = buildTask( queueName, scheduleConfig, - domain, body, req?.header(), ); - const { res } = await queueCloudTask(task); - - if (!res.ok) { - if (res.status === 409) { - if ( - await checkIfTaskExists( - env, - queueName, - task.name.split("/").at(-1)!, - body, - ) - ) { - return; - } else { - const hashedTaskName = await import("node:crypto").then( - ({ createHash }) => - createHash("sha256") - .update(task.name.split("/").at(-1)!) - .digest("hex"), - ); - console.log("task name", hashedTaskName); - const { res } = await queueCloudTask({ - ...task, - name: - task.name.split("/").slice(0, -1).join("/") + "/" + hashedTaskName, - }); - if (!res.ok) { - if (await checkIfTaskExists(env, queueName, hashedTaskName, body)) { - return; - } - - throw new FailedToQueueTaskError(res.status, await res.text()); - } + const contentType = + headers["Content-Type"] === "application/json" ? "json" : "text"; + if (!env) { + const Cloudflare = await import("cloudflare").then( + ({ Cloudflare }) => Cloudflare, + ); + const client = new Cloudflare({ apiToken: env.CLOUDFLARE_TOKEN }); + let queueId: string | null = null; + const queues = await client.queues.list({ + account_id: env.CLOUDFLARE_ACCOUNT_ID, + }); + for await (const queue of queues) { + if (queueId == queue.queue_name) { + queueId = queue.queue_id!; } } + if (!queueId) { + throw new Error(`Queue ${queueName} not found`); + } - throw new FailedToQueueTaskError(res.status, await res.text()); - } - - async function queueCloudTask(task: object) { - const res = await fetch( - `https://content-cloudtasks.googleapis.com/v2/projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks?alt=json`, + await client.queues.messages.push(queueId, { + body: { body, headers }, + content_type: contentType, + delay_seconds: scheduleTime, + account_id: env.CLOUDFLARE_ACCOUNT_ID, + }); + } else { + await env[queueName].send( + { body, headers }, { - headers: { - Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ task }), - method: "POST", + contentType, + delaySeconds: scheduleTime, }, ); - return { res }; } } - -async function checkIfTaskExists( - env: Cloudflare.Env, - queueName: QueueName, - taskId: string, - expectedBody: QueueBody[QueueName], -) { - const adminSdkCredentials = getAdminSdkCredentials(env); - - const body = await fetch( - `https://content-cloudtasks.googleapis.com/v2/projects/${adminSdkCredentials.projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}?responseView=FULL`, - { - headers: { - Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`, - }, - }, - ) - .then((res) => res.json()) - .then(({ httpRequest }) => httpRequest?.body); - - return ( - body && - isEqual( - JSON.parse(Buffer.from(body as string, "base64").toString()), - expectedBody, - ) - ); -} - function buildTask( - projectId: string, queueName: QueueName, scheduleConfig: ScheduleConfig | undefined, - domain: string, body: QueueBody[QueueName], headers: Record<string, string> | undefined, ) { - let scheduleTime: string | undefined; + let scheduleTime: number = 0; if (scheduleConfig) { const { delay, epochTime } = scheduleConfig; if (epochTime) { - scheduleTime = DateTime.fromSeconds(epochTime).toUTC().toISO(); + scheduleTime = DateTime.fromSeconds(epochTime) + .diffNow("second") + .as("second"); } else if (delay) { - scheduleTime = DateTime.now().plus(delay).toUTC().toISO(); + scheduleTime = Duration.fromDurationLike(delay).as("second"); } } - let taskId: string; switch (queueName) { - case "new-episode": - const { aniListId } = body as QueueBody["new-episode"]; - taskId = buildNewEpisodeTaskId(aniListId); - + case "ANILIST_UPDATES": + case "NEW_EPISODE": return { - name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`, + body, scheduleTime, - httpRequest: { - url: `${domain}/internal/new-episode`, - httpMethod: "POST", - body: Buffer.from(JSON.stringify(body)).toString("base64"), - headers: { - "Content-Type": "application/json", - "X-Anilist-Token": headers?.["X-Anilist-Token"], - }, - }, - }; - case "anilist-updates": - const { deviceId, titleId, nameSuffix } = - body as QueueBody[typeof queueName]; - taskId = buildAnilistRetryTaskId(deviceId, titleId, nameSuffix); - - return { - name: `projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}`, - scheduleTime, - httpRequest: { - url: `${domain}/watch-status`, - httpMethod: "POST", - body: Buffer.from( - JSON.stringify({ ...body, nameSuffix: undefined }), - ).toString("base64"), - headers: { - "Content-Type": "application/json", - "X-Anilist-Token": headers?.["X-Anilist-Token"], - }, + headers: { + "Content-Type": "application/json", + "X-Anilist-Token": headers?.["X-Anilist-Token"], }, }; default: diff --git a/src/models/db.ts b/src/models/db.ts index f1bc739..03f45eb 100644 --- a/src/models/db.ts +++ b/src/models/db.ts @@ -1,20 +1,15 @@ -import { createClient } from "@libsql/client"; +// import { createClient } from "@libsql/client"; import { env as cloudflareEnv } from "cloudflare:workers"; -import { drizzle } from "drizzle-orm/libsql"; +import { drizzle } from "drizzle-orm/d1"; type Db = ReturnType<typeof drizzle>; -let db: Db | null = null; +// let db: Db | null = null; export function getDb(env: Cloudflare.Env = cloudflareEnv): Db { - if (db) { - return db; - } + // if (db) { + // return db; + // } - const client = createClient({ - url: env.TURSO_URL, - authToken: env.TURSO_AUTH_TOKEN, - }); - - db = drizzle(client); + const db = drizzle(env.DB, { logger: true }); return db; } diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index 85b36f5..6a2c85b 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -1,5 +1,5 @@ /* eslint-disable */ -// Generated by Wrangler by running `wrangler types` (hash: ebfd0a114715273ef48ef9f20e119f14) +// Generated by Wrangler by running `wrangler types` (hash: 55b6bc5eef0b3709210a3c57164a6bda) // Runtime types generated with workerd@1.20251011.0 2024-09-23 nodejs_compat declare namespace Cloudflare { interface GlobalProps { @@ -11,7 +11,14 @@ declare namespace Cloudflare { TURSO_AUTH_TOKEN: string; ENABLE_ANIFY: string; ADMIN_SDK_JSON: string; + CLOUDFLARE_TOKEN: string; + CLOUDFLARE_D1_TOKEN: string; + CLOUDFLARE_ACCOUNT_ID: string; + CLOUDFLARE_DATABASE_ID: string; ANILIST_DO: DurableObjectNamespace<import("./src/index").AnilistDo>; + DB: D1Database; + ANILIST_UPDATES: Queue; + NEW_EPISODE: Queue; } } interface Env extends Cloudflare.Env {} diff --git a/wrangler.toml b/wrangler.toml index 052d7b7..4ab4a13 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -17,4 +17,29 @@ class_name = "AnilistDo" [[migrations]] tag = "v1" +new_classes = ["AniwatchApiContainer"] + +[[migrations]] +tag = "<v2>" +deleted_classes = ["AniwatchApiContainer"] new_classes = ["AnilistDo"] + +[[queues.producers]] +queue = "anilist-updates" +binding = "ANILIST_UPDATES" + +[[queues.producers]] +queue = "new-episode" +binding = "NEW_EPISODE" + +[[queues.consumers]] +queue = "anilist-updates" + +[[queues.consumers]] +queue = "new-episode" + +[[d1_databases]] +binding = "DB" +database_name = "aniplay" +database_id = "5083d01d-7444-4336-a629-7c3e2002b13d" +migrations_dir = "drizzle"