@quad69

Почему worker-threads постепенно зависают?

Всем привет!

Есть приложение, которое асинхронно выполняет множество запросов (парсинг). Раньше это всё висело в одном потоке и работало медленно, но стабильно. Чуть позже выполнение запросов было вынесено на отдельные потоки (worker threads), что значительно улучшило скорость работы, правда, со временем я стал замечать что каждый следующий запрос выполняется всё медленнее и медленнее, вплоть до ошибок по типу timeout или socket hung up.

Причем, это не проблема именно интернет канала, сайтов, используемых пакетов и т.д, так как если перезапустить приложение, то всё работает также быстро, как будто ничего и не было.

NodeJS 18 версии. Пакеты axios + socks-proxy-agent (до этого были другие, в основном всякие доработки axios).

Для работы всего этого был написан простенький класс который равномерно раздаёт запросы по воркерам:

ThreadPool.js
const Worker = require('worker_threads').Worker;
const Mutex = require('async-mutex').Mutex;

class ThreadPool {
    workers = [];
    counter = 0;
    promises = new Map();
    mutes = new Mutex();

    constructor(filename, threads = 1) {
        for (let i = 0; i < threads; i++) {
            const worker = new Worker(filename);

            worker.on('error', (error) => this.workers = this.workers.filter(item => item !== worker));
            worker.on('message', (message) => {
                const id = message.id;
                const data = message.data;
                const promise = this.promises.get(id);

                if (promise === undefined) return;

                typeof data === 'string' ? promise.reject(data) : promise.resolve(data);

                this.promises.delete(id);
            });

            this.workers.push(worker);
        }
    }

    async exec(data) {
        const id = await this.mutes.runExclusive(() => this.counter++);
        const worker = this.workers[id % this.workers.length];

        return await new Promise((resolve, reject) => {
            this.promises.set(id, {resolve: resolve, reject: reject});
            worker.postMessage({id: id, data: data});
        });
    }
}

И сам файл воркера:
ProxyWorker.js
const ParentPort = require('worker_threads').parentPort;
const socksProxyAgent = require('socks-proxy-agent').SocksProxyAgent;
const axios = require('axios');
const httpAgents = new Map();

ParentPort.on('message', async (message) => {
    try {
        const config = message.data;
        const id = config.proxy.id;

        // Создаём только один экземпляр агента для каждого прокси и в будущем переиспользуем его.
        let agents = httpAgents.get(id);

        if (agents === undefined) {
            const agent = new socksProxyAgent({
                hostname: config.proxy.host,
                port: config.proxy.port,
                userId: config.proxy.username,
                password: config.proxy.password,
                timeout: 30000
            });

            agents = {
                httpAgent: agent,
                httpsAgent: agent
            }

            httpAgents.set(id, agents);
        }

        delete config.proxy;

        config.httpAgent = agents.httpAgent;
        config.httpsAgent = agents.httpsAgent;

        const response = await axios.request(config);
        ParentPort.postMessage({
            id: message.id,
            data: {
                status: response.status,
                statusText: response.statusText,
                headers: response.headers,
                data: response.data
            }
        });
    } catch (error) {
        ParentPort.postMessage({id: message.id, data: error.toString()});
    }
});


P.s запросы вообще перестают выполнятся спустя около 100-150 тысяч запросов, при этом если замедлить работу то всё вроде как приходит в норму (но вряд ли надолго). Есть ощущение что не хватает ресурсов, или он не может их по каким либо причинам использовать, при этом на самом хосте их более чем достаточно.
  • Вопрос задан
  • 154 просмотра
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы