def long_worker_thread(event: threading.Event):
... # тут начальная подготовка. имей ввиду, что цикл начнётся с ожидания
while not event.wait(1.0): # ждём пока пройдёт заданное время - или пока event не будет взведено
... # тут работу работаем - но не слишком долго, чтобы проверки event.wait() делались регулярно!
... # завершение. Произойдёт, если был сделан break, или если event было взведено
stop_worker = threading.Event()
thread = threading.Thread(target=long_worker_thread, args=(stop_worker,))
thread.start()
... # что-то делаем пока поток крутится
stop_worker.set() # ожидание в потоке прервётся немедленно, не дожидаясь конца интервала
thread.join() # поэтому можно спокойно дождаться, пока поток не закончит работу - это будет быстро
p = multiprocessing.Process(target=gen,args=(lst,))
p.start()
p.join()
), когда он завершится. Это мало чем отличается от просто вызова gen() в твоём коде, безо всякого мультипроцессинга.import threading
import time
class Data:
def __init__(self):
self.x: int = 0
self.y: int = 0
do_sleep = False
run = True
def reader(d: Data):
while run:
x, y = d.x, d.y
# по идее это условие не должно выполниться никогда
if (x != 0) != (y != 0):
print(f'Got x={x} and y={y}')
else:
print(f'OK {x}', end='\x08\x08\x08\x08')
def writer(d: Data):
while run:
if d.x == 0:
d.x = 1
if do_sleep: pass
d.y = 1
else:
d.x = 0
if do_sleep: pass
d.y = 0
do_sleep = False
instance = Data()
reader_thread = threading.Thread(target=reader, args=(instance,), daemon=True)
writer_thread = threading.Thread(target=writer, args=(instance,), daemon=True)
reader_thread.start()
writer_thread.start()
try:
input()
finally:
run = False
reader_thread.join()
writer_thread.join()
if do_sleep: pass
закомментировать, то в консоли высвечивается только OK - иными словами, присваивание двух полей выполняется достаточно быстро, чтобы поток не успел переключиться в промежутке. Как следствие, reader() всегда видит либо x=0 y=0, либо x=1 y=1.if do_sleep: pass
оставить, то выполнение тела цикла замедляется достаточно, чтобы поток успел переключиться - и, как следствие, reader() начинает видеть структуру данных Data в неконсистентном состоянии, когда x=0 y=1 или когда x=1 y=0. import threading
class MyWorkerThread(threading.Thread):
def __init__(self, arg1: float, arg2: float): # передаём потоку входные данные
# поток не должен их менять!
super().__init__()
self.arg1 = arg1
self.arg2 = arg2
self.result: t.Optional[float] = None
def run(self):
time.sleep(10) # имитируем длительную работу
self.result = self.arg1 + self.arg2
worker = MyWorkerThread(42, 69)
worker.start()
while True:
if worker.is_alive(): # проверяем, жив ли поток
# делаешь ещё что-то, пока поток работает
print('Still working...')
time.sleep(0.5)
else:
# поток завершился, даём знать пользователю.
print(f'Done! Result is {worker.result}!')
break # выходим из цикла
worker.join()
import threading, queue
class MyWorkerThread(threading.Thread):
def __init__(self, arg1: float, arg2: float): # передаём потоку входные данные
# поток не должен их менять!
super().__init__()
self.arg1 = arg1
self.arg2 = arg2
self.result: t.Optional[float] = None
self.progress = queue.Queue()
def run(self):
for i in range(10):
time.sleep(1) # имитируем длительную работу
self.progress.put(i/10) # сообщаем о прогрессе
self.result = self.arg1 + self.arg2
self.progress.put(1.00)
worker = MyWorkerThread(42, 69)
worker.start()
while True:
if worker.is_alive(): # проверяем, жив ли поток
# делаешь ещё что-то, пока поток работает
try:
progress = worker.progress.get(block=True, timeout=0.5)
except queue.Empty: # поток ничего не сообщил
print('Still working...')
else:
print(f'Still working... {progress:.0%}')
worker.progress.task_done() # один вызов task_done() на один успешный вызов get()!
else:
# поток завершился, даём знать пользователю.
print(f'Done! Result is {worker.result}!')
break # выходим из цикла
_pickle.PicklingError: Can't pickle <function <lambda> at 0x00000122F23CEB00>: attribute lookup <lambda> on __main__ failed