Задать вопрос
@Elnurhan

Как получать значения из потоков без использования future?

Приветствую!
Я написал программу, которая считает суммарное количество слов в .log файлах в указанной директории в многопоточном режиме.
Первым аргументом в командной строке даётся путь к директории, в которой нужно искать .log файлы и считать слова в них.
Вторым аргументом даётся кол-во потоков.
Я написал следующий код для решения этой задачи

ThreadPool.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <boost/thread/condition_variable.hpp>
#include <boost/thread.hpp>

#include <future> // I don't how to work with boost future
#include <queue>
#include <vector>
#include <functional>

class ThreadPool
{
public:
    using Task = std::function<void()>; // Our task

    explicit ThreadPool(int num_threads)
    {
        start(num_threads);
    }

    ~ThreadPool()
    {
        stop();
    }

    template<class T>
    auto enqueue(T task)->std::future<decltype(task())>
    {
        // packaged_task wraps any Callable target
        auto wrapper = std::make_shared<std::packaged_task<decltype(task()) ()>>(std::move(task));

        {
            boost::unique_lock<boost::mutex> lock{ mutex_p };
            tasks_p.emplace([=] {
                (*wrapper)();
            });
        }

        event_p.notify_one();
        return wrapper->get_future();
    }

    //void enqueue(Task task)
    //{
    //  {
    //      boost::unique_lock<boost::mutex> lock { mutex_p };
    //      tasks_p.emplace(std::move(task));
    //      event_p.notify_one();
    //  }
    //}

private:
    std::vector<boost::thread> threads_p; // num of threads
    std::queue<Task>           tasks_p;   // Tasks to make
    boost::condition_variable  event_p; 
    boost::mutex               mutex_p;

    bool                       isStop = false;

    void start(int num_threads)
    {
        for (int i = 0; i < num_threads; ++i)
        {
            // Add to the end our thread
            threads_p.emplace_back([=] {
                while (true)
                {
                    // Task to do
                    Task task;

                    {
                        boost::unique_lock<boost::mutex> lock(mutex_p);

                        event_p.wait(lock, [=] { return isStop || !tasks_p.empty(); });

                        // If we make all tasks
                        if (isStop && tasks_p.empty())
                            break;

                        // Take new task from queue
                        task = std::move(tasks_p.front());
                        tasks_p.pop();
                    }

                    // Execute our task
                    task();
                }
            });
        }
    }

    void stop() noexcept
    {
        {
            boost::unique_lock<boost::mutex> lock(mutex_p);
            isStop = true;
            event_p.notify_all();
        }

        for (auto& thread : threads_p)
        {
            thread.join();
        }
    }
};

#endif


main.cpp
#include "ThreadPool.h"

#include <iostream>
#include <iomanip>
#include <Windows.h>

#include <vector>
#include <map>

#include <boost/filesystem.hpp>
#include <boost/thread.hpp>


namespace bfs = boost::filesystem;

int count_words(const std::string& filename)
{
    int counter = 0;
    std::ifstream file(filename);
    std::string buffer;
    while (file >> buffer)
    {
        ++counter;
    }
    
    return counter;
}

int main(int argc, const char* argv[])
{
    bfs::path path = argv[1];
    // If this path is exist and if this is dir
    if (bfs::exists(path) && bfs::is_directory(path))
    {
        // Number of threads. Default = 4
        int n = (argc == 3 ? atoi(argv[2]) : 4);
        ThreadPool pool(n);

        // Container to store all filenames and number of words inside them
        std::map<bfs::path, int> all_files_and_sums;
        
        // Iterate all files in dir
        for (auto& p : bfs::directory_iterator(path)) {
            // Takes only .txt files
            if (p.path().extension() == ".log") {
                // Future for taking value from here
                auto fut = pool.enqueue([&p, &all_files_and_sums]() {
                    // In this lambda function I count all words in file and return this value
                    int result = count_words(p.path().string());
                    std::cout << "TID " << GetCurrentThreadId() << "\n";
                    return result;
                });
                // "filename = words in this .txt file"
                all_files_and_sums[p.path()] = fut.get();
            }
        }

        int result = 0;

        for (auto& k : all_files_and_sums)
        {
            std::cout << k.first << "- " << k.second << "\n";
            result += k.second;
        }

        std::cout << "Result: " << result << "\n";
    }
    else
        std::perror("Dir is not exist");
}


Данное решение работает корректно. Но если в директории много .log файлов - программа работает медленно, а некоторые потоки (при большом кол-ве потоков) просто существуют и не делают ничего.
Я думаю, что проблема в future'ах. Как можно доставать значения из потоков без future?
  • Вопрос задан
  • 206 просмотров
Подписаться 1 Средний 4 комментария
Решения вопроса 1
@MarkusD Куратор тега C++
все время мелю чепуху :)
Если почитать твой код внимательно, то становится видно немало проблем.

[&p, &all_files_and_sums]
Если с захватом all_files_and_sums по ссылке я не спорю, то точно ли p продолжит существовать после выхода из итерации? Давай подумаем. А после выхода из цикла продолжит?
У меня вообще нет уверенности в том, что после смещения bfs::directory_iterator адрес возвращаемого им bfs::directory_entry изменится. Я бы предпочел захватывать копию bfs::directory_entry в лямбде.

У тебя нет ожидания окончания работы от пула потоков - нет синхронизации с завершением запланированных задач. Точнее... ну как нет... У тебя жесткая синхронизация через std::future::get[?]. По факту в этот момент у тебя блокируется главный поток до момента обработки запланированной задачи. У тебя всегда планируется только одна задача. А видимость конкурентной работы создается лишь потому что какой поток ее схватил, тот и работает. Видимо задачи у тебя быстро обрабатываются, раз ты глазами этого не увидел.

Тебе стоит сохранять сами std::future от задач. В момент планирования их результат еще не определен и get вызывать не надо. Надо дождаться завершения работы всех потоков в пуле и исчерпания всех задач. Для этого у тебя в пуле должны быть продуманы механизмы оповещения.
После обработки всех задач ты можешь вызывать std::future::get, получать результаты и производить свои операции над ними.
Альтернативно, ты можешь более тонко реагировать на завершение каждой задачи и появление в ее std::future результата. Это тоже можно сделать. Просто сделать это надо своими руками и продумав масштабируемость такого механизма.

И в дополнение. Зачем тебе boost? Ты пользуешься std::future и лямбдами, ты пишешь в рамках стандарта C++11. Тебе доступны и std::thread, и все примитивы барьерирования из std.
В твоем распоряжении вся Thread Support Library. А boost тут явно лишний.
Если переключишься на C++17, то тебе и boost::filesystem не будет нужна, т.к. станет доступна std::filesystem - Filesystem Library.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы