Всем привет!
Есть приложение, которое асинхронно выполняет множество запросов (парсинг). Раньше это всё висело в одном потоке и работало медленно, но стабильно. Чуть позже выполнение запросов было вынесено на отдельные потоки (worker threads), что значительно улучшило скорость работы, правда, со временем я стал замечать что каждый следующий запрос выполняется всё медленнее и медленнее, вплоть до ошибок по типу timeout или socket hung up.
Причем, это не проблема именно интернет канала, сайтов, используемых пакетов и т.д, так как если перезапустить приложение, то всё работает также быстро, как будто ничего и не было.
NodeJS 18 версии. Пакеты axios + socks-proxy-agent (до этого были другие, в основном всякие доработки axios).
Для работы всего этого был написан простенький класс который равномерно раздаёт запросы по воркерам:
ThreadPool.jsconst Worker = require('worker_threads').Worker;
const Mutex = require('async-mutex').Mutex;
class ThreadPool {
workers = [];
counter = 0;
promises = new Map();
mutes = new Mutex();
constructor(filename, threads = 1) {
for (let i = 0; i < threads; i++) {
const worker = new Worker(filename);
worker.on('error', (error) => this.workers = this.workers.filter(item => item !== worker));
worker.on('message', (message) => {
const id = message.id;
const data = message.data;
const promise = this.promises.get(id);
if (promise === undefined) return;
typeof data === 'string' ? promise.reject(data) : promise.resolve(data);
this.promises.delete(id);
});
this.workers.push(worker);
}
}
async exec(data) {
const id = await this.mutes.runExclusive(() => this.counter++);
const worker = this.workers[id % this.workers.length];
return await new Promise((resolve, reject) => {
this.promises.set(id, {resolve: resolve, reject: reject});
worker.postMessage({id: id, data: data});
});
}
}
И сам файл воркера:
ProxyWorker.jsconst ParentPort = require('worker_threads').parentPort;
const socksProxyAgent = require('socks-proxy-agent').SocksProxyAgent;
const axios = require('axios');
const httpAgents = new Map();
ParentPort.on('message', async (message) => {
try {
const config = message.data;
const id = config.proxy.id;
// Создаём только один экземпляр агента для каждого прокси и в будущем переиспользуем его.
let agents = httpAgents.get(id);
if (agents === undefined) {
const agent = new socksProxyAgent({
hostname: config.proxy.host,
port: config.proxy.port,
userId: config.proxy.username,
password: config.proxy.password,
timeout: 30000
});
agents = {
httpAgent: agent,
httpsAgent: agent
}
httpAgents.set(id, agents);
}
delete config.proxy;
config.httpAgent = agents.httpAgent;
config.httpsAgent = agents.httpsAgent;
const response = await axios.request(config);
ParentPort.postMessage({
id: message.id,
data: {
status: response.status,
statusText: response.statusText,
headers: response.headers,
data: response.data
}
});
} catch (error) {
ParentPort.postMessage({id: message.id, data: error.toString()});
}
});
P.s запросы вообще перестают выполнятся спустя около 100-150 тысяч запросов, при этом если замедлить работу то всё вроде как приходит в норму (но вряд ли надолго). Есть ощущение что не хватает ресурсов, или он не может их по каким либо причинам использовать, при этом на самом хосте их более чем достаточно.