Всем привет, поставили такую задачу, написать приложение на питоне. Работаю с этим вообще первый раз. Суть такова, что если в словаре у ключа RESULT, сразу после подключения к контроллеру, есть значение 1, то нужно выводить ошибку что-то вроде "Перезагрузите контроллер". Пробую, понимаю, что функция подключения к контроллеру в другом потоке, пробую через сигналы, тоже не то. Основная проблема в том, что я не могу избавиться от многопоточности, ибо несколько процессов должны выполняться долго и одновременно.
Так же стоит уточнить, что я уже знаю, что все объекты QT должны вызываться из основного потока, как мне их туда перетащить?
Помогите разобраться в проблеме и посоветуйте нормальную песочницу питоновскую, а то я ничего вменяемого не нашёл
Код в файле main.py
def Connect_to_controller(tcp_win,data_queue):
host = '10.55.6.170'
port = 23
try:
client_socket.connect((host, port))
print("Успешно подключено")
response = client_socket.recv(1440)
data = extract_data(response)
if 'RESULT' in data and data['RESULT'] == '1\r':
from interface import TCP_Checker_Window
tcp_win = TCP_Checker_Window()
error_message = 'Не удалось начать тестирование! Перезагрузите контроллер!'
tcp_win.error_signal.emit(error_message)
client_socket.close()
return
else:
while True:
response = client_socket.recv(1440)
data_to_send = "Ваши данные для контроллера"
send_data_to_controller(data_to_send)
data = extract_data(response)
print(data)
data_queue.put(data)
if 'RESULT' in data and data['RESULT'] == '1\r':
break
try:
client_socket.close()
except Exception as e:
print(f"Произошла ошибка при закрытии сокета: {e}")
except Exception as e:
print(f"Произошла ошибка при подключении или обмене данными: {e}")
код в файле interface.py
def reboot_controller_message(self, error_message):
QMessageBox.warning(self, 'Не удалось начать тестирование!', error_message)
Код из файла app.py
def tcp_check(window):
tcp_check = TCP_Checker_Window()
window.hide()
tcp_check.show()
tcp_check.resize(900, 700)
factory = window.factory_input.text()
operator = window.operator_input.text()
serial_start = window.serial_start_input.text()
tcp_check.labels['Factory'].setText(f'Завод: {factory}')
tcp_check.labels['Operator'].setText(f'Оператор: {operator}')
tcp_check.labels['SerialStart'].setText(f'Начало серийного номера: {serial_start}')
labels = tcp_check.labels
data_queue = queue.Queue()
# Поток подключения к контроллеру
controller_thread = threading.Thread(target=Connect_to_controller, args=(tcp_check, data_queue,))
controller_thread.start()
def update_interface(*args):
while True:
data = data_queue.get()
tcp_check.update_signal.emit(data)
# Поток обновления окна
update_thread = threading.Thread(target=update_interface, args=(labels, ))
update_thread.start()
update_timer = QTimer()
update_timer.timeout.connect(app.processEvents)
update_timer.start(100)
# Подключение сигнала к слоту
tcp_check.update_signal.connect(tcp_check.update_text_fields_from_signal)
if __name__ == '__main__':
main()