Задать вопрос
Serhioromano
@Serhioromano
Web Developer

Какое решение архитектурное будет лучшим для запуска процессов на Яваскрипт?

Я пишу АПИ на яваскрит и `bun`. Моя задача - это приложение которое будет опрашивать разные устройства, например раз в секунду. Пользователь заходит на приложение, создает подключение к устройству, и запекает его. Начинается опрос. Код который опрашивает просто сохраняет результаты в Redis базе денных. Другая точка доступа АПИ эти данные читает и выдает пользователю если это нужно. Опрос продолжается пока пользователь не его не остановит.

Мне нужно хорошее решение в точке АПИ где пользователь запускает процесс, и останавливает. Получается мне нужно запустить процесс и его хендлер сохранить. В другой точке доступа этим управлять типа, остановить, перегрузить, ...

Получается мне нужно запустить независимый процесс. Я посмотрел на Workers в Bun но мне показалось это не совсем то что нужно. Какие есть технические решения для запуска этой задачи именно на яваскрипт.

Ну например у меня устройство MQTT. и запускает опрос устройства кнопкой старт на сайте. Мой АПИ должен запустить подписаться на топики и венуть код HTTP200 при этом сохранив ссылку на этот процесс по ID. Все процесс сидит себе и ждет сообщений, как получит, так в базу запишет. Если пользователь нажал кнопку остановить опрос, то я беру ссылку на этот процесс, и останавливаю его.

Моет текущее решение выглядит вот так

index.ts

import { Elysia, t } from "elysia";

const app = new Elysia()
const workers: Worker[] = [];

app.get("/w_start/:id", (ctx) => {
    if (workers[ctx.params.id] === undefined) {
        const worker = new Worker(new URL(`./worker1.ts`, import.meta.url));
        workers[ctx.params.id] = worker;
    }
    workers[ctx.params.id].postMessage({
        s: true,
        id: ctx.params.id
    })
    return { success: true }
}, {
    params: t.Object({
        id: t.Number()
    })
})

app.get("/w_stopall", (ctx) => {
    workers.forEach((w, index) => {
        w.postMessage({
            s: false,
            id: index,
        });
    });
    return { success: true }
})

app.get("/w_stop/:id", (ctx) => {
    if (workers[ctx.params.id] !== undefined) {
        workers[ctx.params.id].postMessage({
            s: false,
            id: ctx.params.id
        })
    }
    return { success: true }
}, {
    params: t.Object({
        id: t.Number()
    })
})

app.listen(3000);

console.log(
    `Workers ${app.server?.hostname}:${app.server?.port}`
);


worker1.ts

import { sleep } from "bun";

// prevents TS errors
declare var self: Worker;

const path = "./file.txt";
var start:boolean = false

self.onmessage = (event) => {
    console.log("From main:", event.data);
    let {s, id} = event.data
    start = s
    if(start) {
        endlessAsyncLoop(id);
        console.log('Loop has started and this message will display immediately.');
    }
};

async function endlessAsyncLoop(id: Number) {
    let counter = 0;
    while (start) {
        await Bun.sleep(1000)
        await Bun.write(path, `${counter++}`);
        console.log(`${id} - ${counter}`)
    }
}


Это все работает. Но я в самом начале пути, когда можно решить большинство проблем просто посоветовавшись с умными людьми. Это будет работать в долгосрочной перспективе?
  • Вопрос задан
  • 399 просмотров
Подписаться 1 Простой 21 комментарий
Помогут разобраться в теме Все курсы
  • Нетология
    Веб-разработчик с нуля: профессия с выбором специализации
    14 месяцев
    Далее
  • Академия Eduson
    Fullstack-разработчик на JavaScript
    11 месяцев
    Далее
  • Skillbox
    JavaScript
    3 месяца
    Далее
Решения вопроса 1
szQocks
@szQocks
Похоже вы пытаетесь сделать наблюдатель, вот простой псевдо пример того как это можно сделать через bullmq
в примере показан наблюдатель который желательно должен работать каждые 1-3 секунды, если ваш наблюдатель должен работать не так часто, лучше за место простых задач использовать отложенные и в коде просто убрать строчку Bun.sleep

import IORedis from 'ioredis'; // можно заюзать ioredis и простой redis, разница там небольшая, всё будет работать
import { Elysia, t } from "elysia";
import { Queue, Worker } from 'bullmq';

const connection = new IORedis('localhost:6379', { maxRetriesPerRequest: null });

const app = new Elysia()

const projectQueue = new Queue('project', {
    connection: {
      ...connection,
      enableOfflineQueue: false
    },
    defaultJobOptions: {
      attempts: 10,
      backoff: {
        type: 'fixed',
        delay: 2000,
        jitter: 0.5
      },
      removeOnComplete: true,
      removeOnFail: {
        age: 24 * 3600, // 24 hours
        count: 200 // max 200 fails
      }
    }
});

const projectWorker = new Worker('project', async ({ data }) => {

    const projectId = data.projectId;
    const delay = data.delay;
    // здесь делаем все свои манипуляции для отслеживания, отправляем запрос куда-либо или в базу данных и читаем, измеяем данные и т.д

    // если данные нужно отслеживать каждые пару секунд, 1-3 к примеру, то проще воспользоваться конструкцией которая написана ниже
    await Bun.sleep(delay);

    return { projectId, delay }
}, { connection, autorun: false, concurrency: 1000 });

projectWorker.on('completed', (job , { projectId, delay }) => {
    projectQueue.add(
        'project', 
        { projectId,  delay }, 
        { jobId: `${projectId}` }
    );
});

projectWorker.on('failed', (job, error, prev) => {
    console.log(error);
});

app.get("/w_start/:id", (ctx) => {

    const projectId = ctx.params.id;
    const delay = ctx.params.delay  // как часто нужно отслеживать, каждую например каждую секунду
    
    projectQueue.add(
        'project', 
        { projectId, delay }, 
        { jobId: `${projectId}` }
    );

    return { success: true }
}, {
    params: t.Object({
        id: t.Number()
    })
})

app.get("/w_stopall", async (ctx) => {
    
    const cancelled = await projectWorker.cancelAllJobs();
    console.log(cancelled);

    return { success: true }
})

app.get("/w_stop/:id", async (ctx) => {
    
    const cancelled = await projectWorker.cancelJob(`${ctx.params.id}`);
    console.log(cancelled)

    return { success: true }
}, {
    params: t.Object({
        id: t.Number()
    })
})

app.listen(3000, () => {
  projectWorker.run();
});


console.log(
    `Workers ${app.server?.hostname}:${app.server?.port}`
);


это будет работать даже если узлы масштабировать до нескольких
Ответ написан
Пригласить эксперта
Ответы на вопрос 1
@Everything_is_bad
как именно запуситть процесс так чтобы мой скрипт прекратил работу, а процесс нет
отдельные worker'ы на основе очереди задач

Если пользователь нажал кнопку остановить опрос, то я беру ссылку на этот процесс, и останавливаю его.
например простейшая реализация - флаги, допустим в том же редисе, процесс периодически его проверяет, пользователь нажал "остановить", изменяешь флаг, при следующей проверке, процесс видит его изменение и прекращает работу
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы