@makarowdmitry

Не корректно срабатывает threading.RLOCK при записи в файл. Использую concurrent.futures. Ка решить?

Всем привет!
Не корректно срабатывает threading.RLOCK при записи в файл. В файл записывают несколько потоков в одно и тоже время, а должен один. Использую concurrent.futures. Делал на основе статьи https://habrahabr.ru/post/229767/

Где-то что-то не учел.

Проект из трех файлов:

Первый запуск многопотока
def multithreading(action,queryset,concurrency):
	result = task_queue(action, queryset, concurrency=concurrency)

	try:
	    while not result.done():
	        try:
	            result.result(.2)
	        except TimeoutError:
	            pass
	        # print('\rdone {done}, in work: {delayed}  '.format(**result.stats)),
	        sys.stdout.flush()
	except KeyboardInterrupt:
	    result.cancel()
	    raise


def task_queue(task, iterator, concurrency=1, on_fail=lambda _: None):
    def submit():
        try:
            obj = next(iterator)
        except StopIteration:
            return
        if result.cancelled():
            return
        stats['delayed'] += 1
        future = executor.submit(task, obj)
        future.obj = obj
        future.add_done_callback(upload_done)

    def upload_done(future):
        with io_lock:
            submit()
            stats['delayed'] -= 1
            stats['done'] += 1
        if future.exception():
            on_fail(future.exception(), future.obj)
        if stats['delayed'] == 0:
            result.set_result(stats)

    def cleanup(_):
        with io_lock:
            executor.shutdown(wait=False)

    io_lock = threading.RLock()
    executor = ThreadPoolExecutor(concurrency)
    result = Future()
    result.stats = stats = {'done': 0, 'delayed': 0}
    result.add_done_callback(cleanup)

    with io_lock:
        for _ in range(concurrency):
            submit()

    return result
if __name__ == '__main__':
    multithreading(myfunc,queryset,countthread)


Второй файл. Функция в которой основные действия

def myfunc(queryset):
	io_lock2 = threading.RLock()
	...
	
	with io_lock2:
		action_file()
	
	...


Третий файл. Функция читает файл - берет первую строку - перезаписывает файл без полученной строки.
def action_file(filename):
	links = open('source/links/'+filename,'r').read().split('\n')
	links = [value for value in links if value]
	if len(links)==0:
		firebaselib.source_count_update(filename,'empty')
		return False
	else:
		links_update = map(lambda x:x+'\n',links[1:])

		links_save = open('source/links/'+filename,'w')
		links_save.writelines(links_update)
		links_save.close()

		return links[0]


Как решить? И в какую сторону копать. Спасибо!
Готов заплатить за решение - добавляйте в скайп maktoeq
  • Вопрос задан
  • 331 просмотр
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы