Задать вопрос
@Acaunt

Как сделать безопасную многопоточную очередь?

Написал свою реализацию многопоточной очереди основанную на паттерне односвязного списка.
у меня была мысль, что очередь можно разделить с блокировкой на два вида: то что работает с первым элементом и то что работает с последним элементом. И предполагал что можно разблокировать в нужных местах, но компилятор при запуске выдал исключение c текстом что разблокируется неиспользуемый мьютекс. После того как закомментировал все unlock() исключения не было

...
template<class Type>
class Queue {
private:
	// нода для создания односвязного списка
	struct Node {
		// умный указатель для контроля существования объекта
		std::shared_ptr<Type> object;

		// следующий узел
		std::unique_ptr<Node> next;
	};

public:
	// конструктор по умолчания
	Queue() = default;

	// конструктор перемещения
	Queue(Queue&& other) {
		*this = std::move(other)
	}

	// конструктор копирования пока удалён
	Queue(const Queue&) = delete;

public:
	// оператор перемещения
	Queue& operator=(Queue&& other) {
		// если слуайно передана эта же очередь выходим из функции
		if (this == &other) return *this;
		// закрываем мьюьтексы у обоих очередей
		std::scoped_lock(mtx_head, mtx_tail, other.mtx_head, other.mtx_tail);

		// перемещаем все элементы в другую очередь
		head = std::move(other.head);

		tail = other.tail;
		other.tail = nullptr;

		_size = other._size;
		other._size = 0;

		return *this;
	}

	// оператор копирования пока удалён
	Queue& operator=(const Queue&) = delete;

public:
	// метод для проверки на пустоту
	bool empty() {
		return _size == 0;
	}

	// метод для того чтобы узнать сколько имеется элементов в очереди
	size_t size() {
		return _size.load(std::memory_order_relaxed);
	}

public:
	// методы для получения первого и последнего элемента
	std::shared_ptr<Type> front() {
		// блокируем доступ всем методам которые могут изменить первый элемент
		std::lock_guard lock(mtx_head);

		return head ? head->object : nullptr;
	}

	const std::shared_ptr<Type> front() const {
		// блокируем доступ всем методам которые могут изменить первый элемент
		std::lock_guard lock(mtx_head);

		return head ? head->object : nullptr;
	}

	std::shared_ptr<Type> back() {
		// блокируем доступ всем методам которые могут изменить первый элемент
		std::lock_guard lock(mtx_tail);

		return tail ? tail->object : nullptr;
	}

	const std::shared_ptr<Type> back() const {
		// блокируем доступ всем методам которые могут изменить первый элемент
		std::lock_guard lock(mtx_tail);

		return tail ? tail->object : nullptr;
	}

public:
	// методы для добавления элементов
	std::shared_ptr<Type> push(Type&& object) {
		return add_element(std::move(object));
	}

	std::shared_ptr<Type> push(const Type& object) {
		return add_element(object);
	}

	template<class ... Args>
	std::shared_ptr<Type> emplace(Args&& ... args) {
		return add_element(std::forward<Args>(args) ...);
	}

private:
	// метод создания нового узла и объекта
	template<class ... Args>
	std::shared_ptr<Type> add_element(Args&& ... args) {
		// блокируем все методы которые могут изменить очередь
		std::scoped_lock lock(mtx_head, mtx_tail);

		// если элементов нет создаем новую ноду
		if (head == nullptr) {
			head = std::make_unique<Node>();

			tail = head.get();
		}
		//иначе мы проверяем не является ли первый и последний элемент одним элементом
		else switch (bool(head.get() != tail)) {
		case true:
			// тут я предпологал, что раз первый элемент и последний элемент не являются одним элементом,
			// то можно разблокировать доступ методам которые работают с первым элементом
			//mtx_head.unlock();

		case false:
			// создаем новую ноду
			tail->next = std::make_unique<Node>();

			tail = tail->next.get();
		}

		// создаем объект
		tail->object = std::make_shared<Type>(std::forward<Args>(args) ...);

		// увеличиваем счетчик элементов
		++_size;

		return tail->object;
	}

public:
	// метод для объединения одной очереди в конец другой
	void merge_back(Queue& other) {
		// если слуайно передана эта же очередь выходим из функции
		if (this == &other) return;

		// блокируем методы в этой и другой очереди
		std::scoped_lock lock(mtx_head, mtx_tail, other.mtx_head, other.mtx_tail);
		
		// если текущая очередь пустая то перемещаем другую очередь в эту
		if (head == nullptr) {
			head = std::move(other.head);
			tail = other.tail;
			other.tail = nullptr;
		}
		// проверяем что первый и последний элемент не являются одним элементом
		else switch (bool(head.get() != tail)) {
		case true:
			// тут предполагаю, что раз первый и последний элемент не являются одним элементом,
			// то можно разблокировать методы работающие с первым элементом
			//mtx_head.unlock();

		case false:
			// перемещаем все элементы другой очереди в конец другой
			tail->next = std::move(other.head);
			tail = other.tail;
			other.tail = nullptr;
		}

		// изменяем количество элементов
		_size += other._size;
		other._size = 0;
	}

	// Метод для объединения очередей в чередующем порядке
	void merge_interleave(Queue& other) {
		// если слуайно передана эта же очередь выходим из функции
		if (this == &other) return;

		// блокируем методы в этой и другой очереди
		std::scoped_lock lock(mtx_head, mtx_tail, other.mtx_head, other.mtx_tail);

		// если другая очередь пустая то выходим
		if (other._size == 0) return;
		// иначе если текущая очередь пустая то перемещаем все элементы другой очереди в текущую
		else if (_size == 0) {
			head = std::move(other.head);

			tail = other.tail;
			other.tail = nullptr;

			_size = other._size;
			other._size = 0;

			return;
		}

		// узел для слияния очередей
		std::unique_ptr<Node> front = std::make_unique<Node>();

		// последний элемент
		Node* back = front.get();

		// меняем количество элементов
		_size += other._size;

		// метод для перемещения объектов из узла в другой узел
		auto move = [&](std::unique_ptr<Node>& _front) {
			back->object = std::move(_front->object);
			_front = std::move(_front->next);
		};

		// метод для создания нового узла
		auto next = [&]() {
			back->next = std::make_unique<Node>();
			back = back->next.get();
		};

		// метод для слияния узлов
		auto interleave = [&]() {
			move(head);

			next();

			move(other.head);
		};

		// выполняем предворительное слияние
		if (head != nullptr && other.head != nullptr) interleave();

		// выполняем оставшиеся слияния
		while (head != nullptr && other.head != nullptr) {
			next();

			interleave();
		}

		// перемещаем полученные узлы в текущую очередь
		head = std::move(front);

		// если обе очереди были полностью пустые, то меняем указатель на последний элемент в полученной очереди
		if (head != nullptr && other.head != nullptr) {
			tail = back;
		}
		// иначе если другя очередь не пустая,
		// то выполняем перемещение оставшихся элементов текущей очереди в полученные узлы
		else if (other.head) {
			back->next = std::move(head);
		}
		// иначе проводим перемещение элементов из другой очереди в полученные узлы
		else {
			back->next = std::move(other.head);

			tail = other.tail;
		}

		// обнуляем информацию у другой очереди
		other._size = 0;

		other.tail = nullptr;
	}

public:
	// удаление первого элемента
	void pop() {
		// блокируем все методы
		std::scoped_lock lock(mtx_head, mtx_tail);

		// если очередь пустая выходим
		if (head == nullptr) return;
		// иначе проверяем являются ли первый и последний элемент одним элеентом
		else switch (bool(head.get() == tail)) {
		case true:
			// если да то обнуляем конец
			tail = nullptr;

		case false:
			// тут я предпологал разблокировать методы работающие с последним элементом
			//mtx_tail.unlock();

			// меняем первый элемент на следующий
			head = std::move(head->next);
		}

		// меняем количество элементов
		--_size;
	}

	// метод полной очистки очереди
	void clear() {
		// блокируем все методы
		std::scoped_lock lock(mtx_head, mtx_tail);

		// обнуляем всю информацию
		head.reset(nullptr);
		tail = nullptr;
		_size = 0;
	}

private:
	// начальный узел
	std::unique_ptr<Node> head;

	// конечный узел
	Node* tail = nullptr;

	// количество элементов
	std::atomic_size_t _size = 0;

	// мьютекс для ограничения методов связанных с первым элементом
	mutable std::mutex mtx_head;

	// мьютекс для ограничения методов связанных с последним элементом
	mutable std::mutex mtx_tail;
};


Я так понимаю либо мне нужно менять концепцию принципа работы, либо вместо двух мьютексов сделать только один. (или если можете то предложите свои мысли по этому поводу)
  • Вопрос задан
  • 59 просмотров
Подписаться 1 Простой Комментировать
Пригласить эксперта
Ответы на вопрос 2
wataru
@wataru Куратор тега C++
Разработчик на С++, экс-олимпиадник.
Самое простое, конечно, это иметь только один мьютекс на всю очередь и лочить его при любой работе с ней. Ваша концепция имеет много проблем. Например, а что если у вас только один элемент? Тогда он и начало и конец, но в зависимости от того, что вы там вызываете, вы будете блокировать только один из двух мьютексов.

Если вы гонитесь за эффективностью и параллельностью, то вам лучше написать lock-free структуру данных. Гуглите эти ключевые слова. Но это очень сложно.

Еще, как вариант, можно физически разбить очередь на 2 куска. Один для записи, другой для чтения. Т.е. у вас 2 очереди, каждая со своим локом. При доступе к голове вы блокируете мьютекс на чтение и читаете из очереди на чтение. При доступе к хвосту - на запись. Записывать тривиально - просто лочте мьютекс на запись и добавляйте элемент в очередь на запись. При чтении лочте очередь на чтение и читайте из нее. Но, если вы выяснили, что очередь на чтение оказалась пуста, то надо залочить еще и очередь на запись и поменять их местами (т.е. перекинуть всю очередь записи в чтение). И потом вернуть голову этой очереди.

Такой подход позволит более-менее параллелить чтение и запись, особенно, если очередь часто не пуста.

Лочить только один эелемент с каждого конца сложно - там очень много случаев. Лоча 2 половины отдельно вы можете не думать о другой большую часть времени.
Ответ написан
@coodi
Есть книга "C++ Concurrency in Action", там на примере очереди очень хорошо разжевывается. Можно прямо оттуда ее и взять
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы