Исходный код:
// Header.h
#pragma once
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <format>
#include <vector>
#include "Types.h"
#define IA_NEW( type ) new( type )
// Types.h
#pragma once
#include <cstdint>
#include <memory>
#include <atomic>
#include <array>
#define USE_TYPE( type ) using IA::type
namespace IA
{
using Uint8 = std::uint8_t;
using Int8 = std::int8_t;
using Uint16 = std::uint16_t;
using Int16 = std::int16_t;
using Uint32 = std::uint32_t;
using Int32 = std::int32_t;
using Uint64 = std::uint64_t;
using Int64 = std::int64_t;
using Bool = bool;
using MemPtr = void*;
using MemDiff = std::ptrdiff_t;
using Void = void;
template< class T > using TAtomic = std::atomic< T >;
template< class T > using UniquePtr = std::unique_ptr< T >;
template< class T > using SharedPtr = std::shared_ptr< T >;
template< class T, std::size_t N > using Array = std::array< T, N >;
template< class T, class... Args > UniquePtr< T > CreateUniquePtr( Args&&... args )
{
return std::make_unique< T >( std::forward< Args >( args )... );
}
template< class T, class... Args > UniquePtr< T > CreateSharedPtr( Args&&... args )
{
return std::make_shared< T >( std::forward< Args >( args )... );
}
}
USE_TYPE( Uint8 );
USE_TYPE( Int8 );
USE_TYPE( Uint16 );
USE_TYPE( Int16 );
USE_TYPE( Uint32 );
USE_TYPE( Int32 );
USE_TYPE( Uint64 );
USE_TYPE( Int64 );
USE_TYPE( Bool );
USE_TYPE( Void );
USE_TYPE( MemPtr );
USE_TYPE( MemDiff );
// Main.cpp
#include "Header.h"
#include <map>
#include <string>
#include <type_traits>
#include <functional>
class WorkerThread;
std::vector< WorkerThread > GThreadPool;
std::mutex CommandLine;
enum WorkerState : Uint8
{
eThreadUndefined = 0,
eThreadSuspended,
eThreadProcessing,
eThreadError,
eThreadTerminated,
eThreadSuccess,
};
std::string Map( WorkerState state )
{
static std::map< WorkerState, std::string > StateToStringTable =
{
{ eThreadUndefined, "Undefined" },
{ eThreadSuspended, "Suspended" },
{ eThreadProcessing, "Processing" },
{ eThreadError, "Error" },
{ eThreadTerminated, "Terminated" },
{ eThreadSuccess, "Success" },
};
return StateToStringTable[ state ];
}
struct WorkerThreadInfo
{
WorkerThreadInfo(WorkerState state, Uint32 id) : State(state), Id(id) { }
WorkerState State;
Uint32 Id;
};
class WorkerThread
{
public:
WorkerThread()
: m_thread( nullptr )
, m_state( eThreadUndefined )
, m_id( 0 )
{
}
WorkerThread( IA::UniquePtr< std::thread >&& thread, WorkerState state, Uint32 id )
: m_state( state )
, m_id( id )
, m_thread( std::move( thread ) )
{
}
~WorkerThread()
{
Terminate();
}
WorkerThread( const WorkerThread& ) = delete;
WorkerThread& operator = ( const WorkerThread& ) = delete;
WorkerThread( WorkerThread&& another ) noexcept
{
m_id = another.m_id;
m_thread = std::move( another.m_thread );
m_state = another.m_state;
m_isFinished.exchange( another.m_isFinished );
m_isRunning.exchange( another.m_isRunning );
}
WorkerThreadInfo GetInfo() const { return { m_state, m_id }; }
Void SetState( WorkerState state ) { m_state = state; }
WorkerState GetState() const { return m_state; }
Uint32 GetId() const { return m_id; }
Void SetId( Uint32 id ) { m_id = id; }
Bool IsRunning() const { return m_isRunning; }
Bool IsCompleted() const { return m_isFinished; }
Bool IsValid() const { return m_thread != nullptr; }
template< class TFunc, class... TArgs >
Void Run( TFunc&& func, TArgs&&... args )
{
m_thread = IA::CreateUniquePtr< std::thread >
(
[this]( TFunc&& f, TArgs&&... a )
{
this->m_state = eThreadProcessing;
std::unique_lock lock( CommandLine, std::defer_lock );
lock.lock();
std::cout << "Thread with id - " << this->m_id << " changed its state to " << Map(this->m_state) << '\n';
std::cout << "This - " << this << '\n';
lock.unlock();
this->m_isRunning = true;
f( std::forward< TArgs >( a )... );
this->m_state = eThreadSuccess;
lock.lock();
std::cout << "Thread with id - " << this->m_id << " changed its state to " << Map( this->m_state ) << '\n';
std::cout << "This - " << this << '\n';
lock.unlock();
this->m_isRunning = false;
this->m_isFinished = true;
},
std::forward< TFunc >( func ),
std::forward< TArgs >( args )...
);
}
Void Join() { m_thread->join(); }
Bool TryJoin() { if ( IsValid() && m_thread->joinable() ) { m_thread->join(); return true; } else return false; }
Void Terminate() { m_thread = nullptr; m_state = eThreadTerminated; m_isRunning = false; m_isFinished = true; }
const std::thread& GetThreadInternal() const { return *m_thread; }
const std::thread* const GetThreadInternalPtr() const { return m_thread.get(); }
private:
IA::UniquePtr< std::thread > m_thread;
WorkerState m_state;
Uint32 m_id;
IA::TAtomic< Bool > m_isRunning{ false };
IA::TAtomic< Bool > m_isFinished{ false };
};
WorkerThreadInfo* GetThreadInfo( Uint32 id )
{
auto it = std::find_if( GThreadPool.begin(), GThreadPool.end(), [ id ]( const WorkerThread& worker ) { return worker.GetId() == id; } );
return ( it == GThreadPool.end() ) ? nullptr : IA_NEW( WorkerThreadInfo ) { it->GetInfo() } ;
}
template< class TFunc, class ...TArgs > //requires std::is_invocable_v< TFunc, TArgs... >
Void RunThread( Uint32 id, TFunc&& func, TArgs&&... args )
{
GThreadPool.emplace_back();
WorkerThread& worker = GThreadPool.back();
worker.SetId( id );
worker.Run( std::forward< TFunc>( func ), std::forward< TArgs >( args )... );
}
Void FinishJobs()
{
for ( auto& worker : GThreadPool )
{
worker.TryJoin();
}
std::cout << "Jobs are finished.\n";
}
Void WriteToConsole( Uint32 what )
{
std::unique_lock lock( CommandLine );
std::this_thread::sleep_for( std::chrono::milliseconds( 500 ) );
std::cout << "Writing to console - " << what << '\n';
}
Void PrintThreadInfo( Uint32 id )
{
WorkerThreadInfo* info = GetThreadInfo( id );
if ( !info )
{
std::lock_guard lock( CommandLine );
std::cout << "Thread with id - " << id << " does is not found.\n";
}
else
{
std::lock_guard lock( CommandLine );
std::cout << "Id - " << info->Id << ", state - " << Map( info->State ) << '\n';
}
}
Int32 main()
{
std::cout << "Hello everyone!\n";
RunThread( 1, std::function( WriteToConsole ), 10 );
RunThread( 2, std::function( WriteToConsole ), 20 );
RunThread( 3, std::function( WriteToConsole ), 30 );
PrintThreadInfo( 1 );
PrintThreadInfo( 2 );
PrintThreadInfo( 3 );
FinishJobs();
PrintThreadInfo( 1 );
PrintThreadInfo( 2 );
PrintThreadInfo( 3 );
for ( const auto& worker : GThreadPool )
{
std::cout << "WorkerID - " << worker.GetId() << ", object address - " << &worker << '\n';
}
return 0;
}
Вывод программы на одном из запусков:
Hello everyone!
Id - 1, state - Undefined
Id - 2, state - Undefined
Id - 3, state - Undefined
Thread with id - 3722304989 changed its state to Processing
This - 0000020717724620
Writing to console - 10
Thread with id - 3722304989 changed its state to Success
This - 0000020717724620
Thread with id - 3722304989 changed its state to Processing
This - 0000020717728488
Writing to console - 20
Thread with id - 3722304989 changed its state to Success
This - 0000020717728488
Thread with id - 3 changed its state to Processing
This - 00000207177160A0
Writing to console - 30
Thread with id - 3 changed its state to Success
This - 00000207177160A0
Jobs are finished.
Id - 1, state - Undefined
Id - 2, state - Undefined
Id - 3, state - Success
WorkerID - 1, object address - 0000020717716070
WorkerID - 2, object address - 0000020717716088
WorkerID - 3, object address - 00000207177160A0
Описание работы:
Создаются 3 потока ( инкапсулируются с помощью WorkerThread ), каждый из которых выполняет определенную функцию с переданными аргументами. Каждый поток имеет свой ID (не системный) и WorkerStatus, которые должны меняться в процессе исполнения.
Id устанавливается
до начала запуска потока. lambda функция должна изменять private-члены объекта WorkerThread, ссылка на который передается с помощью lambda-closure. Однако в процессе работы программы она либо крашится ( Heap corruction ), либо представляет результат выше.
lambda захватывает правильно только последний поток.
Нет идей, как это исправить.
Возможно, у меня неверное понимание принципов работы lambda функций.
Подскажите, в чем причина такого поведения программы.
Проект - Visual Studio 2019 - Cmake project, C++20
Заранее спасибо.