#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <boost/thread/condition_variable.hpp>
#include <boost/thread.hpp>
#include <future> // I don't how to work with boost future
#include <queue>
#include <vector>
#include <functional>
class ThreadPool
{
public:
using Task = std::function<void()>; // Our task
explicit ThreadPool(int num_threads)
{
start(num_threads);
}
~ThreadPool()
{
stop();
}
template<class T>
auto enqueue(T task)->std::future<decltype(task())>
{
// packaged_task wraps any Callable target
auto wrapper = std::make_shared<std::packaged_task<decltype(task()) ()>>(std::move(task));
{
boost::unique_lock<boost::mutex> lock{ mutex_p };
tasks_p.emplace([=] {
(*wrapper)();
});
}
event_p.notify_one();
return wrapper->get_future();
}
//void enqueue(Task task)
//{
// {
// boost::unique_lock<boost::mutex> lock { mutex_p };
// tasks_p.emplace(std::move(task));
// event_p.notify_one();
// }
//}
private:
std::vector<boost::thread> threads_p; // num of threads
std::queue<Task> tasks_p; // Tasks to make
boost::condition_variable event_p;
boost::mutex mutex_p;
bool isStop = false;
void start(int num_threads)
{
for (int i = 0; i < num_threads; ++i)
{
// Add to the end our thread
threads_p.emplace_back([=] {
while (true)
{
// Task to do
Task task;
{
boost::unique_lock<boost::mutex> lock(mutex_p);
event_p.wait(lock, [=] { return isStop || !tasks_p.empty(); });
// If we make all tasks
if (isStop && tasks_p.empty())
break;
// Take new task from queue
task = std::move(tasks_p.front());
tasks_p.pop();
}
// Execute our task
task();
}
});
}
}
void stop() noexcept
{
{
boost::unique_lock<boost::mutex> lock(mutex_p);
isStop = true;
event_p.notify_all();
}
for (auto& thread : threads_p)
{
thread.join();
}
}
};
#endif
#include "ThreadPool.h"
#include <iostream>
#include <iomanip>
#include <Windows.h>
#include <vector>
#include <map>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
namespace bfs = boost::filesystem;
int count_words(const std::string& filename)
{
int counter = 0;
std::ifstream file(filename);
std::string buffer;
while (file >> buffer)
{
++counter;
}
return counter;
}
int main(int argc, const char* argv[])
{
bfs::path path = argv[1];
// If this path is exist and if this is dir
if (bfs::exists(path) && bfs::is_directory(path))
{
// Number of threads. Default = 4
int n = (argc == 3 ? atoi(argv[2]) : 4);
ThreadPool pool(n);
// Container to store all filenames and number of words inside them
std::map<bfs::path, int> all_files_and_sums;
// Iterate all files in dir
for (auto& p : bfs::directory_iterator(path)) {
// Takes only .txt files
if (p.path().extension() == ".log") {
// Future for taking value from here
auto fut = pool.enqueue([&p, &all_files_and_sums]() {
// In this lambda function I count all words in file and return this value
int result = count_words(p.path().string());
std::cout << "TID " << GetCurrentThreadId() << "\n";
return result;
});
// "filename = words in this .txt file"
all_files_and_sums[p.path()] = fut.get();
}
}
int result = 0;
for (auto& k : all_files_and_sums)
{
std::cout << k.first << "- " << k.second << "\n";
result += k.second;
}
std::cout << "Result: " << result << "\n";
}
else
std::perror("Dir is not exist");
}
[&p, &all_files_and_sums]
all_files_and_sums
по ссылке я не спорю, то точно ли p
продолжит существовать после выхода из итерации? Давай подумаем. А после выхода из цикла продолжит?bfs::directory_iterator
адрес возвращаемого им bfs::directory_entry
изменится. Я бы предпочел захватывать копию bfs::directory_entry
в лямбде.std::future::get
[?]. По факту в этот момент у тебя блокируется главный поток до момента обработки запланированной задачи. У тебя всегда планируется только одна задача. А видимость конкурентной работы создается лишь потому что какой поток ее схватил, тот и работает. Видимо задачи у тебя быстро обрабатываются, раз ты глазами этого не увидел.std::future
от задач. В момент планирования их результат еще не определен и get
вызывать не надо. Надо дождаться завершения работы всех потоков в пуле и исчерпания всех задач. Для этого у тебя в пуле должны быть продуманы механизмы оповещения.std::future::get
, получать результаты и производить свои операции над ними.std::future
результата. Это тоже можно сделать. Просто сделать это надо своими руками и продумав масштабируемость такого механизма.std::future
и лямбдами, ты пишешь в рамках стандарта C++11. Тебе доступны и std::thread
, и все примитивы барьерирования из std
.boost::filesystem
не будет нужна, т.к. станет доступна std::filesystem
- Filesystem Library.