diff --git a/src/apps.js b/src/apps.js index 9e58891..31e52a6 100644 --- a/src/apps.js +++ b/src/apps.js @@ -1,4 +1,5 @@ import { getConfig } from "./common/config.js"; +import { watchAudits } from "./common/queue.js"; import { getPool } from "./db/index.js"; export async function loadApps() { @@ -14,5 +15,8 @@ export async function loadApps() { `, [ type, ]); + if (type === 'core') + watchAudits(); } + await client.release(); } diff --git a/src/common/queue.js b/src/common/queue.js new file mode 100644 index 0000000..2cd2e1b --- /dev/null +++ b/src/common/queue.js @@ -0,0 +1,182 @@ +import { getPool } from "../db/index.js"; +import { sleep } from "./sleep.js"; +import axios from 'axios'; + +const backOffUrls = {}; + +async function getQueue(client, type, batchSize) { + const now = new Date(); + const sql = ` + SELECT + q.id, + qu.id AS queue_url_id, + qu.value AS url, + q.headers, + q.data, + q.run_count + FROM queue q + INNER JOIN queue_type qt ON q.queue_type_id = qt.id + INNER JOIN queue_url qu ON q.queue_url_id = qu.id + WHERE qt.value = $1 + AND qu.next_run_ts <= $2 + ORDER BY qu.next_run_ts ASC, q.id ASC + LIMIT $3; + `; + const data = [ + type, + now, + batchSize, + ]; + const res = await client.query(sql, data); + return res.rows; +} + +export async function putQueue(client, type, url, headers, data) { + await client.query(` + INSERT INTO queue_url ( + value + ) VALUES ( + $1 + ) ON CONFLICT DO NOTHING; + `, [ + url, + ]); + await client.query(` + INSERT INTO queue ( + queue_type_id, + queue_url_id, + headers, + data + ) VALUES ( + ( + SELECT id + FROM queue_type + WHERE value = $1 + ), + ( + SELECT id + FROM queue_url + WHERE value = $2 + ), + $3, + $4 + ); + `, [ + type, + url, + headers, + data + ]); +} + +async function updateQueue(client, item, lastFail) { + // Queue back off + // ============== + // - Start with 30 seconds + // - Double for every run + // - Max 120 minutes + const delay = 30; + const maxDelay = 120 * 60; + const delaySeconds = Math.min(delay * Math.pow(2, item.run_count - 1), maxDelay); + const now = new Date(); + const nextRunTs = new Date(now.getTime() + delaySeconds * 1000); + if (backOffUrls[item.queue_url_id]) { + if (backOffUrls[item.queue_url_id] < nextRunTs) { + backOffUrls[item.queue_url_id] = nextRunTs; + } else { + nextRunTs = backOffUrls[item.queue_url_id]; + } + } else { + backOffUrls[item.queue_url_id] = nextRunTs; + } + await client.query(` + UPDATE queue_url SET + next_run_ts = $1, + updated_ts = $2 + WHERE id = $3 + `, [ + backOffUrls[item.queue_url_id].toISOString(), + now.toISOString(), + item.id, + ]); + await client.query(` + UPDATE queue SET + run_count = $1, + last_fail = $2, + updated_ts = $3 + WHERE id = $4 + `, [ + parseInt(item.run_count) + 1, + lastFail, + now.toISOString(), + item.id, + ]); +} + +async function deleteQueue(client, id) { + await client.query(` + DELETE FROM queue + WHERE id = $1 + `, [ + id, + ]); +} + +async function processBatch(client, type) { + const batchSize = 100; + const queueList = await getQueue(client, type, batchSize); + const now = new Date(); + for await (const item of queueList) { + if (backOffUrls[item.queue_url_id] && backOffUrls[item.queue_url_id] > now) + continue; + try { + let entry = 'unknown'; + if (item.data.audit) { + const uuid = item.data.audit.eventId ?? item.data.audit.agreementId; + const auditType = item.data.audit.eventId ? 'event' : 'agreement'; + entry = `${auditType}:${uuid}` + } + console.log(`${type.toUpperCase()} - ${entry} (${item.run_count})`); + let result; + try { + result = await axios.post( + item.url, + item.data, + { + headers: item.headers, + } + ) + } catch (e) { + result = e + } + if (result?.status === 200) { + await deleteQueue(client, item.id); + } else { + let lastFail = `${result.status}`; + if (result.response.data.error) { + lastFail += ` - ${result.response.data.error}` + } + await updateQueue(client, item, lastFail); + } + } catch (e) { + console.error(e); + } + } + return queueList.length; +} + +async function watchQueue(client, type) { + const repeat = 30 * 1000; // seconds + while (true) { + const count = await processBatch(client, type); + if (count === 0) { + await sleep(repeat); + } + } +} + +export async function watchAudits() { + const client = await getPool(); + await watchQueue(client, 'audit'); + await client.release(); +} diff --git a/src/db/migrations/000008 - queue.sql b/src/db/migrations/000008 - queue.sql new file mode 100644 index 0000000..18827a0 --- /dev/null +++ b/src/db/migrations/000008 - queue.sql @@ -0,0 +1,45 @@ +-- DROP TABLE IF EXISTS public.queue_type CASCADE; +CREATE TABLE public.queue_type ( + id BIGSERIAL PRIMARY KEY, + value TEXT NOT NULL UNIQUE, + created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_ts TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx__public__queue_type__value ON public.queue_type (value); +CREATE INDEX idx__public__queue_type__created_ts ON public.queue_type (created_ts); +CREATE INDEX idx__public__queue_type__updated_ts ON public.queue_type (updated_ts); + +INSERT INTO public.queue_type (value) VALUES ('audit') ON CONFLICT DO NOTHING; + +-- DROP TABLE IF EXISTS public.queue_url CASCADE; +CREATE TABLE public.queue_url ( + id BIGSERIAL PRIMARY KEY, + value TEXT NOT NULL UNIQUE, + next_run_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(), + created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_ts TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx__public__queue_url__value ON public.queue_url (value); +CREATE INDEX idx__public__queue_url__next_run_ts ON public.queue_url (next_run_ts); +CREATE INDEX idx__public__queue_url__created_ts ON public.queue_url (created_ts); +CREATE INDEX idx__public__queue_url__updated_ts ON public.queue_url (updated_ts); + + +-- DROP TABLE IF EXISTS public.queue CASCADE; +CREATE TABLE public.queue ( + id BIGSERIAL PRIMARY KEY, + queue_type_id BIGINT NOT NULL REFERENCES public.queue_type(id), + queue_url_id BIGINT NOT NULL REFERENCES public.queue_url(id), + headers JSON, + data JSON, + run_count BIGINT NOT NULL DEFAULT 0, + last_fail TEXT, + last_run_ts TIMESTAMPTZ, + created_ts TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_ts TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE INDEX idx__public__queue__queue_type_id ON public.queue (queue_type_id); +CREATE INDEX idx__public__queue__run_count ON public.queue (run_count); +CREATE INDEX idx__public__queue__last_run_ts ON public.queue (last_run_ts); +CREATE INDEX idx__public__queue__created_ts ON public.queue (created_ts); +CREATE INDEX idx__public__queue__updated_ts ON public.queue (updated_ts); diff --git a/src/modules/core/data/agreement.js b/src/modules/core/data/agreement.js index 8521154..91f57e5 100644 --- a/src/modules/core/data/agreement.js +++ b/src/modules/core/data/agreement.js @@ -2,7 +2,7 @@ import pkg from '@jlinc/core'; const { JlincAgreement, JlincAudit } = pkg; import { getPool } from "../../../db/index.js"; import { entity } from "./entity.js"; -import axios from 'axios'; +import { putQueue } from "../../../common/queue.js"; async function getAgreement(client, userId, id) { const sql = ` @@ -397,14 +397,14 @@ async function process(input, userId, _client, _agreement) { }, } if (input.archive) { - await axios.post( + await putQueue( + client, + 'audit', `${input.archive.url}/api/v1/audit/put`, - response.data.auditData, { - headers: { - 'Authorization': `Bearer ${input.archive.key}`, - } - } + 'Authorization': `Bearer ${input.archive.key}`, + }, + response.data.auditData, ) } } catch(e) { diff --git a/src/modules/core/data/event.js b/src/modules/core/data/event.js index 09a7a71..3e6adea 100644 --- a/src/modules/core/data/event.js +++ b/src/modules/core/data/event.js @@ -2,7 +2,7 @@ import pkg from '@jlinc/core'; const { JlincEvent, JlincAudit } = pkg; import { getPool } from "../../../db/index.js"; import { entity } from "./entity.js"; -import axios from 'axios'; +import { putQueue } from "../../../common/queue.js"; async function getEvent(client, userId, id, includeData, meta) { const dataSql = includeData @@ -340,14 +340,14 @@ async function process(input, userId, _client, _event, _sender, meta) { if (meta) { response.data.auditData.meta = meta; } - await axios.post( + await putQueue( + client, + 'audit', `${input.archive.url}/api/v1/audit/put`, - response.data.auditData, { - headers: { - 'Authorization': `Bearer ${input.archive.key}`, - } - } + 'Authorization': `Bearer ${input.archive.key}`, + }, + response.data.auditData, ) } } catch(e) {