reorganize worker management and queue management to be better

This commit is contained in:
Ean Milligan 2025-05-01 23:19:08 -04:00
parent 6ab0923d71
commit 7053c25719
10 changed files with 284 additions and 231 deletions

View File

@ -0,0 +1,5 @@
let currentWorkers = 0;
export const addWorker = () => currentWorkers++;
export const removeWorker = () => currentWorkers--;
export const getWorkerCnt = () => currentWorkers;

26
src/artigen/managers/manager.d.ts vendored Normal file
View File

@ -0,0 +1,26 @@
import { DiscordenoMessage } from '@discordeno';
import { RollModifiers } from 'src/mod.d.ts';
// QueuedRoll is the structure to track rolls we could not immediately handle
interface BaseQueuedRoll {
rollCmd: string;
modifiers: RollModifiers;
originalCommand: string;
}
export interface ApiQueuedRoll extends BaseQueuedRoll {
apiRoll: true;
api: {
resolve: (value: Response | PromiseLike<Response>) => void;
channelId: bigint;
userId: bigint;
};
}
export interface DDQueuedRoll extends BaseQueuedRoll {
apiRoll: false;
dd: {
myResponse: DiscordenoMessage;
originalMessage: DiscordenoMessage;
};
}
export type QueuedRoll = ApiQueuedRoll | DDQueuedRoll;

View File

@ -2,35 +2,36 @@ import { log, LogTypes as LT } from '@Log4Deno';
import config from '~config';
import { currentWorkers, handleRollWorker } from 'artigen/managers/workerManager.ts';
import { getWorkerCnt } from 'artigen/managers/countManager.ts';
import { QueuedRoll } from 'artigen/managers/manager.d.ts';
import { handleRollRequest } from 'artigen/managers/workerManager.ts';
import { infoColor2, rollingEmbed } from 'src/commandUtils.ts';
import { QueuedRoll } from 'src/mod.d.ts';
import utils from 'src/utils.ts';
const rollQueue: Array<QueuedRoll> = [];
// Runs the roll or queues it depending on how many workers are currently running
export const queueRoll = (rq: QueuedRoll) => {
if (rq.apiRoll) {
handleRollWorker(rq);
} else if (!rollQueue.length && currentWorkers < config.limits.maxWorkers) {
handleRollWorker(rq);
export const sendRollRequest = (rollRequest: QueuedRoll) => {
if (rollRequest.apiRoll) {
handleRollRequest(rollRequest);
} else if (!rollQueue.length && getWorkerCnt() < config.limits.maxWorkers) {
handleRollRequest(rollRequest);
} else {
rollQueue.push(rq);
rq.dd.m
rollQueue.push(rollRequest);
rollRequest.dd.myResponse
.edit({
embeds: [
{
color: infoColor2,
title: `${config.name} currently has its hands full and has queued your roll.`,
description: `There are currently ${currentWorkers + rollQueue.length} rolls ahead of this roll.
description: `There are currently ${getWorkerCnt() + rollQueue.length} rolls ahead of this roll.
The results for this roll will replace this message when it is done.`,
},
],
})
.catch((e: Error) => utils.commonLoggers.messageEditError('rollQueue.ts:197', rq.dd.m, e));
.catch((e: Error) => utils.commonLoggers.messageEditError('rollQueue.ts:197', rollRequest.dd.myResponse, e));
}
};
@ -38,15 +39,15 @@ The results for this roll will replace this message when it is done.`,
setInterval(() => {
log(
LT.LOG,
`Checking rollQueue for items, rollQueue length: ${rollQueue.length}, currentWorkers: ${currentWorkers}, config.limits.maxWorkers: ${config.limits.maxWorkers}`,
`Checking rollQueue for items, rollQueue length: ${rollQueue.length}, currentWorkers: ${getWorkerCnt()}, config.limits.maxWorkers: ${config.limits.maxWorkers}`,
);
if (rollQueue.length && currentWorkers < config.limits.maxWorkers) {
const temp = rollQueue.shift();
if (temp && !temp.apiRoll) {
temp.dd.m.edit(rollingEmbed).catch((e: Error) => utils.commonLoggers.messageEditError('rollQueue.ts:208', temp.dd.m, e));
handleRollWorker(temp);
} else if (temp && temp.apiRoll) {
handleRollWorker(temp);
if (rollQueue.length && getWorkerCnt() < config.limits.maxWorkers) {
const rollRequest = rollQueue.shift();
if (rollRequest && !rollRequest.apiRoll) {
rollRequest.dd.myResponse.edit(rollingEmbed).catch((e: Error) => utils.commonLoggers.messageEditError('rollQueue.ts:208', rollRequest.dd.myResponse, e));
handleRollRequest(rollRequest);
} else if (rollRequest && rollRequest.apiRoll) {
handleRollRequest(rollRequest);
}
}
}, 1000);

View File

@ -0,0 +1,164 @@
import { DiscordenoMessage, sendDirectMessage, sendMessage } from '@discordeno';
import { log, LogTypes as LT } from '@Log4Deno';
import config from '~config';
import { DEVMODE } from '~flags';
import { loggingEnabled } from 'artigen/rollUtils.ts';
import { SolvedRoll } from 'artigen/solver.d.ts';
import { removeWorker } from 'artigen/managers/countManager.ts';
import { QueuedRoll } from 'artigen/managers/manager.d.ts';
import dbClient from 'db/client.ts';
import { queries } from 'db/common.ts';
import stdResp from 'endpoints/stdResponses.ts';
import { generateCountDetailsEmbed, generateDMFailed, generateRollEmbed } from 'src/commandUtils.ts';
import utils from 'src/utils.ts';
export const onWorkerComplete = async (workerMessage: MessageEvent<SolvedRoll>, workerTimeout: number, rollRequest: QueuedRoll) => {
let apiErroredOut = false;
try {
removeWorker();
clearTimeout(workerTimeout);
// gmModifiers used to create gmEmbed (basically just turn off the gmRoll)
const gmModifiers = JSON.parse(JSON.stringify(rollRequest.modifiers));
gmModifiers.gmRoll = false;
const returnMsg = workerMessage.data;
loggingEnabled && log(LT.LOG, `Roll came back from worker: ${returnMsg.line1.length} |&| ${returnMsg.line2.length} |&| ${returnMsg.line3.length} `);
loggingEnabled && log(LT.LOG, `Roll came back from worker: ${returnMsg.line1} |&| ${returnMsg.line2} |&| ${returnMsg.line3} `);
const pubEmbedDetails = await generateRollEmbed(
rollRequest.apiRoll ? rollRequest.api.userId : rollRequest.dd.originalMessage.authorId,
returnMsg,
rollRequest.modifiers,
);
const gmEmbedDetails = await generateRollEmbed(
rollRequest.apiRoll ? rollRequest.api.userId : rollRequest.dd.originalMessage.authorId,
returnMsg,
gmModifiers,
);
const countEmbed = generateCountDetailsEmbed(returnMsg.counts);
loggingEnabled && log(LT.LOG, `Embeds are generated: ${JSON.stringify(pubEmbedDetails)} |&| ${JSON.stringify(gmEmbedDetails)}`);
// If there was an error, report it to the user in hopes that they can determine what they did wrong
if (returnMsg.error) {
if (rollRequest.apiRoll) {
rollRequest.api.resolve(stdResp.InternalServerError(returnMsg.errorMsg));
} else {
rollRequest.dd.myResponse.edit({ embeds: [pubEmbedDetails.embed] });
}
if (rollRequest.apiRoll || (DEVMODE && config.logRolls)) {
// If enabled, log rolls so we can see what went wrong
dbClient
.execute(queries.insertRollLogCmd(rollRequest.apiRoll ? 1 : 0, 1), [
rollRequest.originalCommand,
returnMsg.errorCode,
rollRequest.apiRoll ? null : rollRequest.dd.myResponse.id,
])
.catch((e) => utils.commonLoggers.dbError('rollQueue.ts:82', 'insert into', e));
}
} else {
let newMsg: DiscordenoMessage | void = undefined;
// Determine if we are to send a GM roll or a normal roll
if (rollRequest.modifiers.gmRoll) {
if (rollRequest.apiRoll) {
newMsg = await sendMessage(rollRequest.api.channelId, {
content: rollRequest.modifiers.apiWarn,
embeds: [pubEmbedDetails.embed],
}).catch(() => {
apiErroredOut = true;
rollRequest.api.resolve(stdResp.InternalServerError('Message failed to send - location 0.'));
});
} else {
// Send the public embed to correct channel
rollRequest.dd.myResponse.edit({ embeds: [pubEmbedDetails.embed] });
}
if (!apiErroredOut) {
// And message the full details to each of the GMs, alerting roller of every GM that could not be messaged
rollRequest.modifiers.gms.forEach(async (gm) => {
log(LT.LOG, `Messaging GM ${gm}`);
// Attempt to DM the GM and send a warning if it could not DM a GM
await sendDirectMessage(BigInt(gm.substring(2, gm.length - 1)), {
embeds: rollRequest.modifiers.count ? [gmEmbedDetails.embed, countEmbed] : [gmEmbedDetails.embed],
})
.then(async () => {
// Check if we need to attach a file and send it after the initial details sent
if (gmEmbedDetails.hasAttachment) {
await sendDirectMessage(BigInt(gm.substring(2, gm.length - 1)), {
file: gmEmbedDetails.attachment,
}).catch(() => {
if (newMsg && rollRequest.apiRoll) {
newMsg.reply(generateDMFailed(gm));
} else if (!rollRequest.apiRoll) {
rollRequest.dd.originalMessage.reply(generateDMFailed(gm));
}
});
}
})
.catch(() => {
if (rollRequest.apiRoll && newMsg) {
newMsg.reply(generateDMFailed(gm));
} else if (!rollRequest.apiRoll) {
rollRequest.dd.originalMessage.reply(generateDMFailed(gm));
}
});
});
}
} else {
// Not a gm roll, so just send normal embed to correct channel
if (rollRequest.apiRoll) {
newMsg = await sendMessage(rollRequest.api.channelId, {
content: rollRequest.modifiers.apiWarn,
embeds: rollRequest.modifiers.count ? [pubEmbedDetails.embed, countEmbed] : [pubEmbedDetails.embed],
}).catch(() => {
apiErroredOut = true;
rollRequest.api.resolve(stdResp.InternalServerError('Message failed to send - location 1.'));
});
} else {
newMsg = await rollRequest.dd.myResponse.edit({
embeds: rollRequest.modifiers.count ? [pubEmbedDetails.embed, countEmbed] : [pubEmbedDetails.embed],
});
}
if (pubEmbedDetails.hasAttachment && newMsg) {
// Attachment requires you to send a new message
newMsg.reply({
file: pubEmbedDetails.attachment,
});
}
}
if (rollRequest.apiRoll && !apiErroredOut) {
dbClient
.execute(queries.insertRollLogCmd(1, 0), [rollRequest.originalCommand, returnMsg.errorCode, newMsg ? newMsg.id : null])
.catch((e) => utils.commonLoggers.dbError('rollQueue.ts:155', 'insert into', e));
rollRequest.api.resolve(
stdResp.OK(
JSON.stringify(
rollRequest.modifiers.count
? {
counts: countEmbed,
details: pubEmbedDetails,
}
: {
details: pubEmbedDetails,
},
),
),
);
}
}
} catch (e) {
log(LT.ERROR, `Unhandled rollRequest Error: ${JSON.stringify(e)}`);
if (rollRequest.apiRoll && !apiErroredOut) {
rollRequest.api.resolve(stdResp.InternalServerError(JSON.stringify(e)));
}
}
};

View File

@ -0,0 +1,13 @@
import { log, LogTypes as LT } from '@Log4Deno';
import { loggingEnabled } from 'artigen/rollUtils.ts';
import { QueuedRoll } from 'artigen/managers/manager.d.ts';
export const onWorkerReady = (rollWorker: Worker, rollRequest: QueuedRoll) => {
loggingEnabled && log(LT.LOG, `Sending roll to worker: ${rollRequest.rollCmd}, ${JSON.stringify(rollRequest.modifiers)}`);
rollWorker.postMessage({
rollCmd: rollRequest.rollCmd,
modifiers: rollRequest.modifiers,
});
};

View File

@ -0,0 +1,37 @@
import { SolvedRoll } from 'artigen/solver.d.ts';
import { removeWorker } from 'artigen/managers/countManager.ts';
import { QueuedRoll } from 'artigen/managers/manager.d.ts';
import stdResp from 'endpoints/stdResponses.ts';
import { generateRollEmbed } from 'src/commandUtils.ts';
import utils from 'src/utils.ts';
import { RollModifiers } from 'src/mod.d.ts';
export const terminateWorker = async (rollWorker: Worker, rollRequest: QueuedRoll) => {
rollWorker.terminate();
removeWorker();
if (rollRequest.apiRoll) {
rollRequest.api.resolve(stdResp.RequestTimeout('Roll took too long to process, try breaking roll down into simpler parts'));
} else {
rollRequest.dd.myResponse
.edit({
embeds: [
(
await generateRollEmbed(
rollRequest.dd.originalMessage.authorId,
<SolvedRoll> {
error: true,
errorCode: 'TooComplex',
errorMsg: 'Error: Roll took too long to process, try breaking roll down into simpler parts',
},
<RollModifiers> {},
)
).embed,
],
})
.catch((e) => utils.commonLoggers.messageEditError('rollQueue.ts:51', rollRequest.dd.myResponse, e));
}
};

View File

@ -1,192 +1,23 @@
import { DiscordenoMessage, sendDirectMessage, sendMessage } from '@discordeno';
import { log, LogTypes as LT } from '@Log4Deno';
import config from '~config';
import { DEVMODE } from '~flags';
import { loggingEnabled } from 'artigen/rollUtils.ts';
import { SolvedRoll } from 'artigen/solver.d.ts';
import { addWorker } from 'artigen/managers/countManager.ts';
import { QueuedRoll } from 'artigen/managers/manager.d.ts';
import dbClient from 'db/client.ts';
import { queries } from 'db/common.ts';
import stdResp from 'endpoints/stdResponses.ts';
import { generateCountDetailsEmbed, generateDMFailed, generateRollEmbed } from 'src/commandUtils.ts';
import { QueuedRoll, RollModifiers } from 'src/mod.d.ts';
import utils from 'src/utils.ts';
export let currentWorkers = 0;
import { onWorkerComplete } from 'artigen/managers/worker/complete.ts';
import { onWorkerReady } from 'artigen/managers/worker/ready.ts';
import { terminateWorker } from 'artigen/managers/worker/terminate.ts';
export const handleRollRequest = (rollRequest: QueuedRoll) => {
// Handle setting up and calling the rollWorker
export const handleRollWorker = (rq: QueuedRoll) => {
currentWorkers++;
// gmModifiers used to create gmEmbed (basically just turn off the gmRoll)
const gmModifiers = JSON.parse(JSON.stringify(rq.modifiers));
gmModifiers.gmRoll = false;
addWorker();
const rollWorker = new Worker(new URL('../rollWorker.ts', import.meta.url).href, { type: 'module' });
const workerTimeout = setTimeout(() => terminateWorker(rollWorker, rollRequest), config.limits.workerTimeout);
const workerTimeout = setTimeout(async () => {
rollWorker.terminate();
currentWorkers--;
if (rq.apiRoll) {
rq.api.resolve(stdResp.RequestTimeout('Roll took too long to process, try breaking roll down into simpler parts'));
} else {
rq.dd.m
.edit({
embeds: [
(
await generateRollEmbed(
rq.dd.message.authorId,
<SolvedRoll> {
error: true,
errorCode: 'TooComplex',
errorMsg: 'Error: Roll took too long to process, try breaking roll down into simpler parts',
},
<RollModifiers> {},
)
).embed,
],
})
.catch((e) => utils.commonLoggers.messageEditError('rollQueue.ts:51', rq.dd.m, e));
}
}, config.limits.workerTimeout);
rollWorker.addEventListener('message', async (workerMessage) => {
// Handle events from the worker
rollWorker.addEventListener('message', (workerMessage) => {
if (workerMessage.data === 'ready') {
loggingEnabled && log(LT.LOG, `Sending roll to worker: ${rq.rollCmd}, ${JSON.stringify(rq.modifiers)}`);
rollWorker.postMessage({
rollCmd: rq.rollCmd,
modifiers: rq.modifiers,
});
return;
}
let apiErroredOut = false;
try {
currentWorkers--;
clearTimeout(workerTimeout);
const returnMsg = workerMessage.data;
loggingEnabled && log(LT.LOG, `Roll came back from worker: ${returnMsg.line1.length} |&| ${returnMsg.line2.length} |&| ${returnMsg.line3.length} `);
loggingEnabled && log(LT.LOG, `Roll came back from worker: ${returnMsg.line1} |&| ${returnMsg.line2} |&| ${returnMsg.line3} `);
const pubEmbedDetails = await generateRollEmbed(rq.apiRoll ? rq.api.userId : rq.dd.message.authorId, returnMsg, rq.modifiers);
const gmEmbedDetails = await generateRollEmbed(rq.apiRoll ? rq.api.userId : rq.dd.message.authorId, returnMsg, gmModifiers);
const countEmbed = generateCountDetailsEmbed(returnMsg.counts);
loggingEnabled && log(LT.LOG, `Embeds are generated: ${JSON.stringify(pubEmbedDetails)} |&| ${JSON.stringify(gmEmbedDetails)}`);
// If there was an error, report it to the user in hopes that they can determine what they did wrong
if (returnMsg.error) {
if (rq.apiRoll) {
rq.api.resolve(stdResp.InternalServerError(returnMsg.errorMsg));
} else {
rq.dd.m.edit({ embeds: [pubEmbedDetails.embed] });
}
if (rq.apiRoll || (DEVMODE && config.logRolls)) {
// If enabled, log rolls so we can see what went wrong
dbClient
.execute(queries.insertRollLogCmd(rq.apiRoll ? 1 : 0, 1), [rq.originalCommand, returnMsg.errorCode, rq.apiRoll ? null : rq.dd.m.id])
.catch((e) => utils.commonLoggers.dbError('rollQueue.ts:82', 'insert into', e));
}
} else {
let n: DiscordenoMessage | void = undefined;
// Determine if we are to send a GM roll or a normal roll
if (rq.modifiers.gmRoll) {
if (rq.apiRoll) {
n = await sendMessage(rq.api.channelId, {
content: rq.modifiers.apiWarn,
embeds: [pubEmbedDetails.embed],
}).catch(() => {
apiErroredOut = true;
rq.api.resolve(stdResp.InternalServerError('Message failed to send - location 0.'));
});
} else {
// Send the public embed to correct channel
rq.dd.m.edit({ embeds: [pubEmbedDetails.embed] });
}
if (!apiErroredOut) {
// And message the full details to each of the GMs, alerting roller of every GM that could not be messaged
rq.modifiers.gms.forEach(async (gm) => {
log(LT.LOG, `Messaging GM ${gm}`);
// Attempt to DM the GM and send a warning if it could not DM a GM
await sendDirectMessage(BigInt(gm.substring(2, gm.length - 1)), {
embeds: rq.modifiers.count ? [gmEmbedDetails.embed, countEmbed] : [gmEmbedDetails.embed],
})
.then(async () => {
// Check if we need to attach a file and send it after the initial details sent
if (gmEmbedDetails.hasAttachment) {
await sendDirectMessage(BigInt(gm.substring(2, gm.length - 1)), {
file: gmEmbedDetails.attachment,
}).catch(() => {
if (n && rq.apiRoll) {
n.reply(generateDMFailed(gm));
} else if (!rq.apiRoll) {
rq.dd.message.reply(generateDMFailed(gm));
}
});
}
})
.catch(() => {
if (rq.apiRoll && n) {
n.reply(generateDMFailed(gm));
} else if (!rq.apiRoll) {
rq.dd.message.reply(generateDMFailed(gm));
}
});
});
}
} else {
// Not a gm roll, so just send normal embed to correct channel
if (rq.apiRoll) {
n = await sendMessage(rq.api.channelId, {
content: rq.modifiers.apiWarn,
embeds: rq.modifiers.count ? [pubEmbedDetails.embed, countEmbed] : [pubEmbedDetails.embed],
}).catch(() => {
apiErroredOut = true;
rq.api.resolve(stdResp.InternalServerError('Message failed to send - location 1.'));
});
} else {
n = await rq.dd.m.edit({
embeds: rq.modifiers.count ? [pubEmbedDetails.embed, countEmbed] : [pubEmbedDetails.embed],
});
}
if (pubEmbedDetails.hasAttachment && n) {
// Attachment requires you to send a new message
n.reply({
file: pubEmbedDetails.attachment,
});
}
}
if (rq.apiRoll && !apiErroredOut) {
dbClient
.execute(queries.insertRollLogCmd(1, 0), [rq.originalCommand, returnMsg.errorCode, n ? n.id : null])
.catch((e) => utils.commonLoggers.dbError('rollQueue.ts:155', 'insert into', e));
rq.api.resolve(
stdResp.OK(
JSON.stringify(
rq.modifiers.count
? {
counts: countEmbed,
details: pubEmbedDetails,
}
: {
details: pubEmbedDetails,
},
),
),
);
}
}
} catch (e) {
log(LT.ERROR, `Unhandled RQ Error: ${JSON.stringify(e)}`);
if (rq.apiRoll && !apiErroredOut) {
rq.api.resolve(stdResp.InternalServerError(JSON.stringify(e)));
}
return onWorkerReady(rollWorker, rollRequest);
}
onWorkerComplete(workerMessage, workerTimeout, rollRequest);
});
};

View File

@ -4,7 +4,7 @@ import { log, LogTypes as LT } from '@Log4Deno';
import config from '~config';
import { DEVMODE } from '~flags';
import { queueRoll } from 'artigen/managers/queueManager.ts';
import { sendRollRequest } from 'artigen/managers/queueManager.ts';
import dbClient from 'db/client.ts';
import { queries } from 'db/common.ts';
@ -52,9 +52,9 @@ export const roll = async (message: DiscordenoMessage, args: string[], command:
// Rejoin all of the args and send it into the solver, if solver returns a falsy item, an error object will be substituted in
const rollCmd = message.content.substring(config.prefix.length);
queueRoll({
sendRollRequest({
apiRoll: false,
dd: { m, message },
dd: { myResponse: m, originalMessage: message },
rollCmd,
modifiers,
originalCommand,

View File

@ -3,7 +3,7 @@ import { log, LogTypes as LT } from '@Log4Deno';
import config from '~config';
import { queueRoll } from 'artigen/managers/queueManager.ts';
import { sendRollRequest } from 'artigen/managers/queueManager.ts';
import dbClient from 'db/client.ts';
import { queries } from 'db/common.ts';
@ -97,7 +97,7 @@ export const apiRoll = async (query: Map<string, string>, apiUserid: bigint): Pr
};
return new Promise<Response>((resolve) => {
queueRoll({
sendRollRequest({
apiRoll: true,
api: { resolve, channelId: BigInt(query.get('channel') || '0'), userId: BigInt(query.get('user') || '') },
rollCmd,

26
src/mod.d.ts vendored
View File

@ -1,5 +1,3 @@
import { DiscordenoMessage } from '@discordeno';
// EmojiConf is used as a structure for the emojis stored in config.ts
export type EmojiConf = {
name: string;
@ -26,29 +24,7 @@ export type RollModifiers = {
apiWarn: string;
};
// QueuedRoll is the structure to track rolls we could not immediately handle
interface BaseQueuedRoll {
rollCmd: string;
modifiers: RollModifiers;
originalCommand: string;
}
export interface ApiQueuedRoll extends BaseQueuedRoll {
apiRoll: true;
api: {
resolve: (value: Response | PromiseLike<Response>) => void;
channelId: bigint;
userId: bigint;
};
}
export interface DDQueuedRoll extends BaseQueuedRoll {
apiRoll: false;
dd: {
m: DiscordenoMessage;
message: DiscordenoMessage;
};
}
export type QueuedRoll = ApiQueuedRoll | DDQueuedRoll;
// PastCommandCount is used in calculating the hourly rate of commands
export type PastCommandCount = {
command: string;
count: number;