custom queue
This commit is contained in:
@ -2,13 +2,15 @@ import { Keyboard, MessageContext } from "vk-io";
|
||||
import { balabola } from "../balabola_api";
|
||||
import { Composer } from "../composer";
|
||||
import { is, isHasText } from "../filters";
|
||||
import { balabolaQueue } from "../queue";
|
||||
import { throttledQueue } from "../utilities/throttledQueue";
|
||||
|
||||
export const composer = new Composer<MessageContext>();
|
||||
|
||||
const filter = composer
|
||||
.filter(is(isHasText), composer.compose());
|
||||
|
||||
const throttledBalabola = throttledQueue(1, 1000);
|
||||
|
||||
const selectStyleKeyboard = <C extends MessageContext>(ctx: C) => {
|
||||
return Keyboard.builder()
|
||||
.textButton({ label: '1', payload: { command: 'пр 0 ' + ctx.$match[1] } })
|
||||
@ -48,13 +50,13 @@ filter.hear(/^(?:пр)\s(1|2|3|4|5|6|7)\s(.*)?$/i, async ctx => {
|
||||
const query = ctx.$match[2];
|
||||
|
||||
await ctx.send('генерирую!!!');
|
||||
|
||||
const result = await balabolaQueue.add(() => balabola(query, BALABOLA_INTROS[+intro - 1]));
|
||||
const result = await throttledBalabola(() => balabola(query, BALABOLA_INTROS[+intro - 1]));
|
||||
await ctx.send(result);
|
||||
});
|
||||
|
||||
filter.use(async ctx => {
|
||||
if (ctx.isChat) return;
|
||||
const result = await balabolaQueue.add(() => balabola(ctx.text!, 0));
|
||||
await ctx.send('генерирую!!');
|
||||
const result = await throttledBalabola(() => balabola(ctx.text!, 0));
|
||||
await ctx.send(result);
|
||||
});
|
||||
|
11
src/queue.ts
11
src/queue.ts
@ -1,11 +0,0 @@
|
||||
import PQueue from 'p-queue';
|
||||
import { logger } from './logger';
|
||||
|
||||
export const balabolaQueue = new PQueue({
|
||||
concurrency: 1,
|
||||
interval: 2000,
|
||||
intervalCap: 1,
|
||||
carryoverConcurrencyCount: true
|
||||
});
|
||||
|
||||
balabolaQueue.on('add', () => logger.info('new balabola job added to queue'));
|
@ -1,5 +0,0 @@
|
||||
function isNumber(value: string | number): boolean {
|
||||
return ((value != null) &&
|
||||
(value !== '') &&
|
||||
!isNaN(Number(value.toString())));
|
||||
}
|
53
src/utilities/throttledQueue.ts
Normal file
53
src/utilities/throttledQueue.ts
Normal file
@ -0,0 +1,53 @@
|
||||
export function throttledQueue(
|
||||
maxRequestsPerInterval: number,
|
||||
interval: number,
|
||||
evenlySpaced = false,
|
||||
) {
|
||||
if (evenlySpaced) {
|
||||
interval = interval / maxRequestsPerInterval;
|
||||
maxRequestsPerInterval = 1;
|
||||
}
|
||||
const queue: Array<() => Promise<void>> = [];
|
||||
let lastIntervalStart = 0;
|
||||
let numRequestsPerInterval = 0;
|
||||
let timeout: NodeJS.Timeout | undefined;
|
||||
const dequeue = () => {
|
||||
const intervalEnd = lastIntervalStart + interval;
|
||||
const now = Date.now();
|
||||
if (now < intervalEnd) {
|
||||
timeout !== undefined && clearTimeout(timeout);
|
||||
timeout = setTimeout(dequeue, intervalEnd - now);
|
||||
return;
|
||||
}
|
||||
lastIntervalStart = now;
|
||||
numRequestsPerInterval = 0;
|
||||
for (const callback of queue.splice(0, maxRequestsPerInterval)) {
|
||||
numRequestsPerInterval++;
|
||||
void callback();
|
||||
}
|
||||
if (queue.length) {
|
||||
timeout = setTimeout(dequeue, interval);
|
||||
} else {
|
||||
timeout = undefined;
|
||||
}
|
||||
};
|
||||
|
||||
return <Return = unknown>(fn: () => Promise<Return> | Return): Promise<Return> => new Promise<Return>(
|
||||
(resolve, reject) => {
|
||||
const callback = () => Promise.resolve().then(fn).then(resolve).catch(reject);
|
||||
const now = Date.now();
|
||||
if (timeout === undefined && (now - lastIntervalStart) > interval) {
|
||||
lastIntervalStart = now;
|
||||
numRequestsPerInterval = 0;
|
||||
}
|
||||
if (numRequestsPerInterval++ < maxRequestsPerInterval) {
|
||||
void callback();
|
||||
} else {
|
||||
queue.push(callback);
|
||||
if (timeout === undefined) {
|
||||
timeout = setTimeout(dequeue, lastIntervalStart + interval - now);
|
||||
}
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
Reference in New Issue
Block a user