Сообщество IT-специалистов
Ответы на любые вопросы об IT
Профессиональное развитие в IT
import json import time from datetime import datetime, timedelta from threading import Thread CONFIG_FILE = 'config.json' def load_config(): """Загрузить актуальный конфиг""" try: with open(CONFIG_FILE) as f: data = json.load(f) if 'next_run' in data: data['next_run'] = datetime.fromisoformat(data['next_run']) return data except (FileNotFoundError, ValueError): return {'next_run': datetime.now()} def save_config(next_run): """Сохранить конфигурацию в файл""" with open(CONFIG_FILE, 'w') as f: json.dump({'next_run': next_run.isoformat()}, f) def vote(): """Процесс голосования""" print(f'Голосование начато: {datetime.now()}') time.sleep(5) # Имитация бурной деятельности print(f'Голосование завершено: {datetime.now()}') def run_voting(): """Функция запуска голосования""" # Устанавливаем время следующего голосования new_next_run = datetime.now() + timedelta(seconds=10) # Измени на необходимое количество времени между запуском голосований save_config(new_next_run) # Запускаем процесс голосования vote() def check_schedule(): """Проверить рассылку""" # Загружаем актуальную конфигурацию config = load_config() next_run = config.get('next_run', datetime.now()) # Проверяем настало ли время запуска if datetime.now() >= next_run: # Запускам поток для голосования Thread(target=run_voting).start() return True return False def main(): """Основная функция запуска""" last_check = 0 while True: try: # Проверяем расписание каждую секунду if time.time() - last_check > 1: if check_schedule(): print('Задача запущена') last_check = time.time() time.sleep(0.1) except KeyboardInterrupt: print('Работа программы завершена') break if __name__ == '__main__': main()
Если есть вопросы, пиши.