Recosh
@Recosh
Программист студент

Запись в 1 переменную с двух потоков. Как правильно обработать очередь в python?

К примеру есть такая упрощённая конструкция с запуском 2х потоков и одновременной перезаписью одной переменной, в итоге образуется рандом в последней строчке вывода.
import threading as th

lenAllData = 1000000
queueData = []

def threadAddQueue():
    global queueData
    # Симулируем заполнение очереди
    for i in range(lenAllData):
        queueData.append(i)

def threadProcQueue():
    global queueData
    queueProceed = []
    # симулируем разбор очереди
    while True:
        if len(queueData) > 0:
            queueProceed = queueProceed + queueData
            queueData = []
            print(f'Length {len(queueProceed)}, last value {queueProceed[-1]}')

# Запускаем и получаем рандом в выводе
if __name__ == '__main__':
    th1 = th.Thread(target=threadAddQueue)
    th1.start()

    th2 = th.Thread(target=threadProcQueue)
    th2.start()

Пример вывода

Length 33021, last value 33020
Length 52147, last value 52147
Length 71771, last value 71772
Length 106174, last value 106175
Length 143665, last value 143667
Length 184920, last value 184922
Length 223584, last value 223587
Length 242001, last value 242004
Length 277876, last value 277879
Length 313757, last value 313761
Length 331471, last value 351468
Length 374527, last value 394524
Length 393301, last value 413299
Length 408631, last value 428629
Length 425010, last value 464092
Length 441459, last value 502028
Length 479035, last value 557692
Length 498091, last value 597529
Length 536072, last value 652986
Length 572724, last value 709981
Length 609640, last value 764660
Length 646480, last value 824379
Length 682897, last value 881066
Length 724371, last value 943361
Length 761276, last value 997796


И при этом нельзя ставить в паузу первый поток на долго, так как из за этого зависнет другая часть программы.
Как правильно обработать очередь не приостанавливая первый поток и не потерять данные? При этом мне нужен паттерн похожий на глобальные переменные и потоки (не asyncio)
  • Вопрос задан
  • 775 просмотров
Решения вопроса 1
Recosh
@Recosh Автор вопроса
Программист студент
В общем решил так:
import threading as th
from collections import deque

lenAllData = 1000000

class ClassForQueue:
    queueData = deque()

def threadAddQueue():
    # Симулируем заполнение очереди
    print('Start added to Queue')
    for i in range(lenAllData):
        ClassForQueue.queueData.append(i)
    print('All added to Queue')


def threadProcQueue():
    # симулируем разбор очереди
    result = deque()
    while True:
        if len(ClassForQueue.queueData):
            result.append(ClassForQueue.queueData.popleft())
        if result[-1] == lenAllData - 1:
            print(f'final! Length {len(result)}, last value {result[-1]}')
            break

# Запускаем и получаем адекватные данные в выводе
if __name__ == '__main__':
    th1 = th.Thread(target=threadAddQueue)
    th1.start()

    th2 = th.Thread(target=threadProcQueue)
    th2.start()

Как выяснилось, очереди сильно тормозили потоки, так как мне не требуется шарить данные больше чем в 1 поток, то решил опробовать deque, стало работать ещё быстрее чем в исходном примере
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 2
@bbkmzzzz
На помощь придет Queue
Ответ написан
firedragon
@firedragon
Не джун-мидл-сеньор, а трус-балбес-бывалый.
Стандартное решение во всех языках это либо лок после получения данных, либо lockfree примитивы. В последнем проекте я использовал тупой поток для получения данных и опросники которые просто возвращают текущее значение. Это позволило поднять rps c жалких 5 до 400
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы