@Blunker

Как правильно распараллелить?

Имеется такая задача.
Конвейер (pipeline).
На вход через случайные промежутки времени t < 1с подаются текстовые файлы очень большого размера и формируют очередь на обслуживание. Необходимо выделить из каждого файла все слова, удалить знаки препинания, привести к нижнему регистру, выполнить подсчёт статистики встречаемости слов, отсортировать слова по алфавиту и вывести результаты в файлы. Информировать пользователя о ходе выполнения задания. После выполнения показать краткую статистику на экране (10 самых встречаемых слов по каждому файлу).

Вот что получилось:
#include <iostream>
#include <string>
#include <fstream>
#include <sstream>
#include <vector>
#include <deque>
#include <map>
#include <thread>
#include <mutex>
#include <ctime>
#include <unistd.h>
#include <algorithm>
#include <signal.h>
#include <chrono>
#include <condition_variable>

typedef struct stage_tag
{
    std::unique_lock<std::mutex> mutex;
    std::condition_variable avail;
    std::condition_variable ready;

    bool data_ready;
    std::string data;
    std::vector<std::thread> thread;
    stage_tag *next;
} stage_t;

typedef struct pipe_tag
{
    std::mutex mutex;

    stage_t *head;
    stage_t *tail;

    int stages;
    int active;
} pipe_t;

int pipe_create(pipe_t*, int);
void pipe_stage(stage_t*);
int pipe_send(stage_t*, std::string);

int main(int argc, char **argv)
{

}

int pipe_create(pipe_t *pipe, int stages)
{
    int pipe_index;
    stage_t **link = &pipe->head, *new_stage = new stage_t, *stage;

    pipe->stages = stages;
    pipe->active = 0;

    for (pipe_index = 0; pipe_index <= stages; pipe_index++)
    {
        new_stage = new stage_t;
        new_stage->data_ready = 0;
        *link = new_stage;
        link = &new_stage->next;
    }

    *link = nullptr;
    pipe->tail = new_stage;

    for (stage = pipe->head; stage->next != nullptr; stage = stage->next)
    {
        stage->thread.push_back(std::thread(pipe_stage, stage));
    }

    return 0;
}

void pipe_stage(stage_t *stage)
{
    stage_t *next_stage = stage->next;

    stage->mutex.lock();

    for (; ;)
    {
        while (stage->data_ready != 1)
            stage->avail.wait(stage->mutex);

        pipe_send(next_stage, stage->data);

        stage->data_ready = 0;
        stage->ready.notify_all();
    }
}

int pipe_send(stage_t *stage, std::string data)
{
    stage->mutex.lock();

    while (stage->data_ready)
        stage->ready.wait(stage->mutex);

    stage->data = data;
    stage->data_ready = 1;

    stage->mutex.unlock();

    return 0;
}


Дальше ничего придумать не могу.
  • Вопрос задан
  • 375 просмотров
Пригласить эксперта
Ответы на вопрос 2
gbg
@gbg Куратор тега C++
Любые ответы на любые вопросы
Сначала решите внутреннюю задачу - про обработку текста и сбор статистики. А потом повесьте на нее конвеер.
Ответ написан
@vilgeforce
Раздолбай и программист
Создаете нужное число потоков. Каждый поток обращается к очереди заданий, выбирает оттуда свое (синхронизация потоков тут!) и выполняет его. Как выполнил - делает что там еще нужно и возвращается в начало описанного для треда проецесса. Все. Дополнительно можно прикрутить проверку аварийного завершения тредов и их восстановления.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы