Задать вопрос
syxme
@syxme

EventLoop, потоки и блокировки, как правильно блокировать?

Есть класс сообщений eventloop'а
MessageQueue .hpp
class MessageQueue {
		private:
			using Runnable = std::function<void()>;
			std::vector<Runnable> pool;
			std::mutex m_mutex;
		public:
	        std::string tag = "";
			MessageQueue* next = nullptr;
			void postEnd(Runnable run);
			void nextTick();
			bool empty();
			MessageQueue* getNextFrameMessageQueue();
			MessageQueue();
			~MessageQueue();
	};
 MessageQueue::MessageQueue()
    {
    }
    

    MessageQueue::~MessageQueue()
    {
        
    }

    void MessageQueue::postEnd(Runnable run) 
    {
       m_mutex.lock();
        pool.push_back(run);
        m_mutex.unlock();
    }
    
    void MessageQueue::nextTick() 
    {

        m_mutex.lock();//TODO LOCK

        for (size_t i = 0; i < pool.size(); i++){
            pool.at(i)();
        }
        pool.clear();
        m_mutex.unlock();
    }
    
    bool MessageQueue::empty() 
    {
        return pool.empty();
    }
    
    MessageQueue* MessageQueue::getNextFrameMessageQueue() 
    {
        return next;
    }


Инициализация в главном потоке:
MessageQueue *message = new MessageQueue();
	message->tag = "MQ-11";
	message->next = new MessageQueue();
	message->next->tag = "MQ-22";
	message->next->next = message;
	nextFrame = message;


Цикл главного потока:
while (mWindow->isRunningLiveCycle()||!glfwWindowShouldClose(glWindow)) {

				mCurrentEvent = IS_WINDOWS_EVENTS;
				isFromWait = false;
				if (nextFrame->empty()) {
					glfwWaitEvents();
					isFromWait = true;
				}

				g_mutex.lock();
				currentFrame = nextFrame;
				nextFrame = nextFrame->next;
				g_mutex.unlock();

				if (!isFromWait){
					glfwPollEvents();
				}

				mCurrentEvent = IS_HANDLER_EVENTS;


				if (!messageQueueTimeout->empty()) {
					messageQueueTimeout->nextTick();
				}

				if (!currentFrame->empty()) {
					currentFrame->nextTick();
				}
				mWindow->layoutChanges();

				if (mWindow->isInvalidated()) {
					mWindow->renderFrame();
					mWindow->setNotIvalidated();
				}else{

				}
			}



Мой топорный ExecutorService

class ExecutorService  
	{
		
		private:
			int mThreadCount = 1;
			int mLimit = 99999;
			// vector<unique_ptr<condition_variable>> cv_vec;
			std::vector<Runnable*> tasks;
			std::vector<std::thread> th_vec;
			std::mutex m_mutex;
			std::condition_variable m_cv;
			std::mutex g_mutex;
			void threadCycle(int threadIndex);
			Runnable* getThreadTask( );
			bool interrupt = false;
		public:
			ExecutorService();
			ExecutorService(int threadCount):mThreadCount(threadCount) {
				if (mThreadCount <= 0) {
					mThreadCount = 1;
				}
				for (size_t i = 0; i < mThreadCount; i++) {
					// cv_vec.push_back(make_unique<condition_variable>());
					th_vec.push_back(std::thread(&ExecutorService::threadCycle, this, i));
				}
			}
			void setLimit(int limit){
				mLimit = limit;
			};
			void submit(Runnable* rnbl);
			~ExecutorService();

	};
void ExecutorService::threadCycle(int threadIndex) {
        std::unique_lock<std::mutex> lk(m_mutex);
        while (!interrupt){
            Runnable* response = getThreadTask();
            lk.unlock();
            if (response){
                response->run();
                delete response;
            }
             lk.lock();
            if (tasks.empty()){
             
                m_cv.wait(lk);
            }
        }
        
    }
    
    Runnable* ExecutorService::getThreadTask() {
        if (!tasks.empty()){
            g_mutex.lock();
            Runnable* response = tasks.at(0);
            tasks.erase(tasks.begin());
            g_mutex.unlock();
            return response;
        }
        return nullptr;
    }
    void ExecutorService::submit(Runnable* rnbl) {
        g_mutex.lock();
        if (tasks.size()>mLimit){
			tasks.erase(tasks.begin());
        }
        tasks.push_back(rnbl);
        g_mutex.unlock();
        // std::unique_lock<std::mutex> lck(m_mutex);
        m_cv.notify_all();
    
       
    }
    
  

    ExecutorService::ExecutorService() 
    {
        
    }
    
 
    ExecutorService::~ExecutorService() 
    {
		interrupt = true;
		m_cv.notify_all();
    }



ImageLoader

class ImageLoader {


protected:
	Bitmap* getBitmap(unsigned long id,string& url);
private:
	struct PhotoToLoad{
		unsigned long id;
		std::string url;
		ImageView* imageView;
	};
	Handler ui;
	ImageMemoryCache memoryCache;
	ExecutorService executorService = ExecutorService(4);
	std::map<ImageView*,unsigned long> imageViews;
	std::mutex mutex;
	Context* ctx;

	bool imageViewReused(PhotoToLoad& photoToLoad);


	class PhotosLoader:public Runnable{
	private:
		ImageLoader* parent;
		PhotoToLoad photoToLoad;
	public:
		PhotosLoader(ImageLoader* parent,PhotoToLoad photoToLoad):parent(parent),photoToLoad(std::move(photoToLoad)){

		};
		void run() override{
			if(parent->imageViewReused(photoToLoad)){
				return;
			}

			Bitmap* bmp = parent->getBitmap(photoToLoad.id, photoToLoad.url);
			if(parent->imageViewReused(photoToLoad)){
				delete bmp;
				return;
			}

			if (bmp) {
				parent->ui.post([img = photoToLoad,bmp,prnt = parent]() mutable {
					if (!prnt->imageViewReused(img)){
						img.imageView->setImageBitmap(bmp);
						prnt->memoryCache.put(img.id,bmp);
					}else{
					    delete bmp;
					}

				});

			}


		};
	};



public:
	ImageLoader(Context* context):ctx(context){

	};
	void DisplayImage(Drawable* defaultDrawable,unsigned long  id, string& url,ImageView* imageView);
	void DisplayImage(Drawable* defaultDrawable,string& id, string& url,ImageView* imageView);
};


void ImageLoader::DisplayImage(Drawable *defaultDrawable,string& id, string& url, ImageView *imageView) {
    DisplayImage(defaultDrawable, crc32(id),url,imageView);
}
void ImageLoader::DisplayImage(Drawable *defaultDrawable,unsigned long id, string& url, ImageView *imageView) {
//    memoryCache.clear();
	mutex.lock();
	imageViews[imageView] = id;
	mutex.unlock();
	if (url.empty()){
		imageView->setImageDrawable(defaultDrawable);
		return;
	}


//	memoryCache.clear();
	Bitmap* bitmap  = memoryCache.get(id);
	if (bitmap){
		imageView->setImageBitmap(bitmap);
	}else{

		imageView->setImageDrawable(defaultDrawable);
		executorService.submit(new PhotosLoader(this, {id,url,imageView}));
	}


}

bool ImageLoader::imageViewReused(PhotoToLoad& photoToLoad) {

	std::lock_guard<std::mutex> guard(mutex);
	if (imageViews.find(photoToLoad.imageView) == imageViews.end()){
		return true;
	}
	unsigned long id = imageViews[photoToLoad.imageView];
	if (id != photoToLoad.id)
		return true;
	return false;
}



Bitmap *ImageLoader::getBitmap(unsigned long id, string& url) {


	char* btmData = nullptr;
	int btmSize = 0;

	httpRequest().loading(&btmData,btmSize,url,"GET",Headers());
	if (btmSize!=0) {
		Bitmap *btm = BitmapFactory::decodeFileMemory(btmData, btmSize);
		//delete[] btmData;
		return btm;
	}else{
		return nullptr;
	}
}


ImageMemoryCache

class ImageMemoryCache {
private:
	map<unsigned long,unique_ptr<Bitmap>> cache;
	long size = 0;//current allocated size
	shared_timed_mutex mutex_;
	long limit = 4000000;//max memory in bytes
	long crearn = 2000000;//max memory in bytes

public:
	Bitmap* get(unsigned long id){
	    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
		if (cache.find(id) == cache.end() ){
			return nullptr;
		}
//		printf("-g:%lu\n",id);
		return cache[id].get();
	}
	void put(unsigned long id,Bitmap* bitmap){
//	    printf("-p:%lu\n",id);
        std::unique_lock<std::shared_timed_mutex> lock(mutex_);
		size+=bitmap->bufferSize;
		std::unique_ptr<Bitmap> uPtr{bitmap};
		cache[id] = std::move(uPtr);
	};
	void clear(){
	    std::unique_lock<std::shared_timed_mutex> lock(mutex_);
	    if (size>limit){
	        printf("CLEAR_CHACHE\n");

	        auto it = cache.begin();
	        while (it != cache.end()) {
	            if (size<crearn){
	                break;
	            }

	            if (!it->second->isUsed()){
	                size-=it->second->bufferSize;
//	                printf("-r:%lu\n",it->first);
	                it = cache.erase(it);
	            }else{
	                int sizeres= it->second->refCount;
	               // printf("refcount %i\n",it->second->refCount);
	                it++;
	            }

	        }
	        printf("CLEAR_END\n");
	    }

	}
};



Моя цель чистить ImageMemoryCache методом clear()
Но у меня постоянно какие-то траблы с блокировками, запутался я уже в этих мутексах, всё работает идиально, только если не очищать память. Буду рад если поможите, подскажите где лишние блокировки, например ExecutorService писал наугад, но вроде как работает стабильно, а возможно и нет ))
Спасибо)
  • Вопрос задан
  • 72 просмотра
Подписаться 1 Сложный Комментировать
Решения вопроса 1
@res2001
Developer, ex-admin
Что касается ImageMemoryCache: в методе get вы возвращаете ссылку на элемент мапы. Блокировка с мапы снимается при выходе из get, но вызывающий код имеет ссылку и может с этим объектом делать все что угодно.
Параллельно вы можете вызвать clear, который удалит элемент на который осталась ссылка в другом потоке.

По уму вы должны блокировать мапу до тех пор пока жива любая ссылка на содержимое мапы.
Второй вариант - создавать в get копии объектов из мапы и возвращать их. При этом изменение этого нового объекта никак не повлияет на объект в мапе. Ну тут можно что-нибудь придумать, сделать какой-то прокси объект и т.п.
Третий вариант - каждый объект в мапе защищать своим мьютексом.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы