add queue support for archive delivery
@@ -1,4 +1,5 @@
 import { getConfig } from "./common/config.js";
+import { watchAudits } from "./common/queue.js";
 import { getPool } from "./db/index.js";
 
 export async function loadApps() {
@@ -14,5 +15,8 @@ export async function loadApps() {
   `, [
     type,
   ]);
+  if (type === 'core') {
+    watchAudits();
+  }
   await client.release();
 }

src/common/queue.js (new file, 182 lines)
@@ -0,0 +1,182 @@
import { getPool } from "../db/index.js";
import { sleep } from "./sleep.js";
import axios from 'axios';

// Back-off deadlines keyed by queue_url id, shared across all in-process runs.
const backOffUrls = {};

// Fetch up to batchSize due items of the given type, oldest deadline first.
async function getQueue(client, type, batchSize) {
  const now = new Date();
  const sql = `
    SELECT
      q.id,
      qu.id AS queue_url_id,
      qu.value AS url,
      q.headers,
      q.data,
      q.run_count
    FROM queue q
    INNER JOIN queue_type qt ON q.queue_type_id = qt.id
    INNER JOIN queue_url qu ON q.queue_url_id = qu.id
    WHERE qt.value = $1
      AND qu.next_run_ts <= $2
    ORDER BY qu.next_run_ts ASC, q.id ASC
    LIMIT $3;
  `;
  const data = [
    type,
    now,
    batchSize,
  ];
  const res = await client.query(sql, data);
  return res.rows;
}

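For orientation, a row returned by getQueue has roughly this shape (values illustrative; node-postgres returns BIGINT columns as strings):

// Illustrative item shape, as consumed by processBatch below:
const item = {
  id: '42',            // queue.id
  queue_url_id: '7',   // queue_url.id
  url: 'https://archive.example.com/api/v1/audit/put',
  headers: { Authorization: 'Bearer <token>' },
  data: { audit: { eventId: '…' } },
  run_count: '0',      // BIGINT arrives as a string
};
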
export async function putQueue(client, type, url, headers, data) {
  await client.query(`
    INSERT INTO queue_url (
      value
    ) VALUES (
      $1
    ) ON CONFLICT DO NOTHING;
  `, [
    url,
  ]);
  await client.query(`
    INSERT INTO queue (
      queue_type_id,
      queue_url_id,
      headers,
      data
    ) VALUES (
      (
        SELECT id
        FROM queue_type
        WHERE value = $1
      ),
      (
        SELECT id
        FROM queue_url
        WHERE value = $2
      ),
      $3,
      $4
    );
  `, [
    type,
    url,
    headers,
    data
  ]);
}

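A caller enqueues one delivery per call; this sketch mirrors the putQueue invocation the agreement and event handlers switch to later in this commit (the URL and token here are placeholders):

// Hypothetical caller: queue an audit payload for archive delivery.
const client = await getPool();
await putQueue(
  client,
  'audit',                                        // must match a queue_type row
  'https://archive.example.com/api/v1/audit/put', // deduplicated into queue_url
  { 'Authorization': 'Bearer <token>' },          // stored as queue.headers
  auditData,                                      // the payload, e.g. response.data.auditData
);
await client.release();
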
async function updateQueue(client, item, lastFail) {
  // Queue back off
  // ==============
  // - Start with 30 seconds
  // - Double for every run
  // - Max 120 minutes
  const delay = 30;
  const maxDelay = 120 * 60;
  // item.run_count is the number of completed runs (0 on the first failure),
  // so 2^run_count yields 30s, 60s, 120s, ... capped at maxDelay.
  const delaySeconds = Math.min(delay * Math.pow(2, item.run_count), maxDelay);
  const now = new Date();
  let nextRunTs = new Date(now.getTime() + delaySeconds * 1000);
  if (backOffUrls[item.queue_url_id]) {
    if (backOffUrls[item.queue_url_id] < nextRunTs) {
      backOffUrls[item.queue_url_id] = nextRunTs;
    } else {
      nextRunTs = backOffUrls[item.queue_url_id];
    }
  } else {
    backOffUrls[item.queue_url_id] = nextRunTs;
  }
  await client.query(`
    UPDATE queue_url SET
      next_run_ts = $1,
      updated_ts = $2
    WHERE id = $3
  `, [
    backOffUrls[item.queue_url_id].toISOString(),
    now.toISOString(),
    item.queue_url_id,
  ]);
  await client.query(`
    UPDATE queue SET
      run_count = $1,
      last_fail = $2,
      updated_ts = $3
    WHERE id = $4
  `, [
    parseInt(item.run_count) + 1,
    lastFail,
    now.toISOString(),
    item.id,
  ]);
}

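To make the back-off schedule concrete, a small illustration (not part of the commit) of the delays updateQueue computes:

// Illustration only: capped exponential back-off per completed run.
const delay = 30;           // seconds
const maxDelay = 120 * 60;  // 120 minutes
for (let runCount = 0; runCount <= 9; runCount++) {
  const s = Math.min(delay * Math.pow(2, runCount), maxDelay);
  console.log(`after run ${runCount}: retry in ${s}s`);
}
// 30s, 60s, 120s, 240s, 480s, 960s, 1920s, 3840s, then capped at 7200s
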
async function deleteQueue(client, id) {
  await client.query(`
    DELETE FROM queue
    WHERE id = $1
  `, [
    id,
  ]);
}

async function processBatch(client, type) {
  const batchSize = 100;
  const queueList = await getQueue(client, type, batchSize);
  const now = new Date();
  for (const item of queueList) {
    if (backOffUrls[item.queue_url_id] && backOffUrls[item.queue_url_id] > now)
      continue;
    try {
      let entry = 'unknown';
      if (item.data.audit) {
        const uuid = item.data.audit.eventId ?? item.data.audit.agreementId;
        const auditType = item.data.audit.eventId ? 'event' : 'agreement';
        entry = `${auditType}:${uuid}`;
      }
      console.log(`${type.toUpperCase()} - ${entry} (${item.run_count})`);
      let result;
      try {
        result = await axios.post(
          item.url,
          item.data,
          {
            headers: item.headers,
          }
        );
      } catch (e) {
        // axios throws on non-2xx responses and on network errors;
        // keep the error so the failure can be recorded below.
        result = e;
      }
      if (result?.status === 200) {
        await deleteQueue(client, item.id);
      } else {
        // Record the HTTP status (or error code) plus any error body.
        let lastFail = `${result.response?.status ?? result.code ?? result.status}`;
        if (result.response?.data?.error) {
          lastFail += ` - ${result.response.data.error}`;
        }
        await updateQueue(client, item, lastFail);
      }
    } catch (e) {
      console.error(e);
    }
  }
  return queueList.length;
}

async function watchQueue(client, type) {
  const repeat = 30 * 1000; // poll interval: 30 seconds, in milliseconds
  while (true) {
    const count = await processBatch(client, type);
    if (count === 0) {
      await sleep(repeat);
    }
  }
}

export async function watchAudits() {
  const client = await getPool();
  await watchQueue(client, 'audit');
  // Unreachable in practice: watchQueue loops forever, and a throw skips this.
  await client.release();
}
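
watchQueue relies on the sleep helper imported at the top of queue.js; src/common/sleep.js is not part of this diff, but presumably amounts to:

// Assumed implementation of src/common/sleep.js (not shown in this commit):
export function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}
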
src/db/migrations/000008 - queue.sql (new file, 45 lines)
@@ -0,0 +1,45 @@
-- DROP TABLE IF EXISTS public.queue_type CASCADE;
CREATE TABLE public.queue_type (
  id BIGSERIAL PRIMARY KEY,
  value TEXT NOT NULL UNIQUE,
  created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx__public__queue_type__value ON public.queue_type (value);
CREATE INDEX idx__public__queue_type__created_ts ON public.queue_type (created_ts);
CREATE INDEX idx__public__queue_type__updated_ts ON public.queue_type (updated_ts);

INSERT INTO public.queue_type (value) VALUES ('audit') ON CONFLICT DO NOTHING;

-- DROP TABLE IF EXISTS public.queue_url CASCADE;
CREATE TABLE public.queue_url (
  id BIGSERIAL PRIMARY KEY,
  value TEXT NOT NULL UNIQUE,
  next_run_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx__public__queue_url__value ON public.queue_url (value);
CREATE INDEX idx__public__queue_url__next_run_ts ON public.queue_url (next_run_ts);
CREATE INDEX idx__public__queue_url__created_ts ON public.queue_url (created_ts);
CREATE INDEX idx__public__queue_url__updated_ts ON public.queue_url (updated_ts);

-- DROP TABLE IF EXISTS public.queue CASCADE;
CREATE TABLE public.queue (
  id BIGSERIAL PRIMARY KEY,
  queue_type_id BIGINT NOT NULL REFERENCES public.queue_type(id),
  queue_url_id BIGINT NOT NULL REFERENCES public.queue_url(id),
  headers JSON,
  data JSON,
  run_count BIGINT NOT NULL DEFAULT 0,
  last_fail TEXT,
  last_run_ts TIMESTAMPTZ,
  created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX idx__public__queue__queue_type_id ON public.queue (queue_type_id);
CREATE INDEX idx__public__queue__run_count ON public.queue (run_count);
CREATE INDEX idx__public__queue__last_run_ts ON public.queue (last_run_ts);
CREATE INDEX idx__public__queue__created_ts ON public.queue (created_ts);
CREATE INDEX idx__public__queue__updated_ts ON public.queue (updated_ts);
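
Note that queue_url.value is UNIQUE and putQueue inserts it with ON CONFLICT DO NOTHING, so all pending items for one endpoint share a single queue_url row, and the next_run_ts back-off throttles the endpoint as a whole rather than each item. For example:

// Both inserts below reference the same queue_url row; a failed delivery
// for either item pushes next_run_ts back for both.
await putQueue(client, 'audit', url, headers, { audit: { eventId: 'a' } });
await putQueue(client, 'audit', url, headers, { audit: { eventId: 'b' } });
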
@@ -2,7 +2,7 @@ import pkg from '@jlinc/core';
 const { JlincAgreement, JlincAudit } = pkg;
 import { getPool } from "../../../db/index.js";
 import { entity } from "./entity.js";
-import axios from 'axios';
+import { putQueue } from "../../../common/queue.js";
 
 async function getAgreement(client, userId, id) {
   const sql = `
@@ -397,14 +397,14 @@ async function process(input, userId, _client, _agreement) {
       },
     }
     if (input.archive) {
-      await axios.post(
+      await putQueue(
+        client,
+        'audit',
         `${input.archive.url}/api/v1/audit/put`,
-        response.data.auditData,
         {
-          headers: {
-            'Authorization': `Bearer ${input.archive.key}`,
-          }
-        }
+          'Authorization': `Bearer ${input.archive.key}`,
+        },
+        response.data.auditData,
       )
     }
   } catch(e) {

@@ -2,7 +2,7 @@ import pkg from '@jlinc/core';
 const { JlincEvent, JlincAudit } = pkg;
 import { getPool } from "../../../db/index.js";
 import { entity } from "./entity.js";
-import axios from 'axios';
+import { putQueue } from "../../../common/queue.js";
 
 async function getEvent(client, userId, id, includeData, meta) {
   const dataSql = includeData
@@ -340,14 +340,14 @@ async function process(input, userId, _client, _event, _sender, meta) {
     if (meta) {
       response.data.auditData.meta = meta;
     }
-    await axios.post(
+    await putQueue(
+      client,
+      'audit',
       `${input.archive.url}/api/v1/audit/put`,
-      response.data.auditData,
       {
-        headers: {
-          'Authorization': `Bearer ${input.archive.key}`,
-        }
-      }
+        'Authorization': `Bearer ${input.archive.key}`,
+      },
+      response.data.auditData,
     )
   }
 } catch(e) {
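
After this commit, both the agreement and event handlers enqueue the archive delivery instead of POSTing it inline; stripped of diff markers, the effective call in both places is:

// Effective call after this change (both handlers):
await putQueue(
  client,
  'audit',
  `${input.archive.url}/api/v1/audit/put`,
  {
    'Authorization': `Bearer ${input.archive.key}`,
  },
  response.data.auditData,
)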