fix: throws "failed to queue" error when task already exists
This commit is contained in:
@@ -27,6 +27,7 @@
|
||||
"graphql-request": "^7.1.0",
|
||||
"hono": "^4.5.11",
|
||||
"jose": "^5.8.0",
|
||||
"lodash.isequal": "^4.5.0",
|
||||
"lodash.mapkeys": "^4.6.0",
|
||||
"luxon": "^3.5.0",
|
||||
"zod": "^3.23.8"
|
||||
@@ -36,6 +37,7 @@
|
||||
"@cloudflare/workers-types": "^4.20240903.0",
|
||||
"@trivago/prettier-plugin-sort-imports": "^4.3.0",
|
||||
"@types/bun": "^1.1.8",
|
||||
"@types/lodash.isequal": "^4.5.8",
|
||||
"@types/lodash.mapkeys": "^4.6.9",
|
||||
"@types/luxon": "^3.4.2",
|
||||
"drizzle-kit": "^0.22.8",
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import type { HonoRequest } from "hono";
|
||||
import isEqual from "lodash.isequal";
|
||||
import { DateTime, type DurationLike } from "luxon";
|
||||
|
||||
import type { Env } from "~/types/env";
|
||||
@@ -48,6 +49,14 @@ export async function queueTask(
|
||||
const adminSdkCredentials = getAdminSdkCredentials(env);
|
||||
const { projectId } = adminSdkCredentials;
|
||||
|
||||
const task = buildTask(
|
||||
projectId,
|
||||
queueName,
|
||||
scheduleConfig,
|
||||
domain,
|
||||
body,
|
||||
req?.header(),
|
||||
);
|
||||
const res = await fetch(
|
||||
`https://content-cloudtasks.googleapis.com/v2/projects/${projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks?alt=json`,
|
||||
{
|
||||
@@ -55,25 +64,54 @@ export async function queueTask(
|
||||
Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify({
|
||||
task: buildTask(
|
||||
projectId,
|
||||
queueName,
|
||||
scheduleConfig,
|
||||
domain,
|
||||
body,
|
||||
req?.header(),
|
||||
),
|
||||
}),
|
||||
body: JSON.stringify({ task }),
|
||||
method: "POST",
|
||||
},
|
||||
);
|
||||
|
||||
if (!res.ok) {
|
||||
if (res.status === 409) {
|
||||
if (
|
||||
await checkIfTaskExists(
|
||||
env,
|
||||
queueName,
|
||||
task.name.split("/").at(-1)!,
|
||||
body,
|
||||
)
|
||||
) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
throw new FailedToQueueTaskError(res.status, await res.text());
|
||||
}
|
||||
}
|
||||
|
||||
async function checkIfTaskExists(
|
||||
env: Env,
|
||||
queueName: QueueName,
|
||||
taskId: string,
|
||||
expectedBody: QueueBody[QueueName],
|
||||
) {
|
||||
const adminSdkCredentials = getAdminSdkCredentials(env);
|
||||
|
||||
const body = await fetch(
|
||||
`https://content-cloudtasks.googleapis.com/v2/projects/${adminSdkCredentials.projectId}/locations/northamerica-northeast1/queues/${queueName}/tasks/${taskId}?responseView=FULL`,
|
||||
{
|
||||
headers: {
|
||||
Authorization: `Bearer ${await getGoogleAuthToken(adminSdkCredentials)}`,
|
||||
},
|
||||
},
|
||||
)
|
||||
.then((res) => res.json())
|
||||
.then(({ httpRequest: { body } }) => body);
|
||||
|
||||
return isEqual(
|
||||
JSON.parse(Buffer.from(body as string, "base64").toString()),
|
||||
expectedBody,
|
||||
);
|
||||
}
|
||||
|
||||
function buildTask(
|
||||
projectId: string,
|
||||
queueName: QueueName,
|
||||
|
||||
Reference in New Issue
Block a user