@Yasha261998

Долгое время не удается подключиться к брокеру MQTT и неправильно работают потоки greenthread?

Сайт написан на flask. Я пытаюсь подключиться к брокеру MQTT с такими параметрами, записанными в файле config.py:
class DevelopConfig(Config):
    DEBUG = True
    TESTING = True
    ASSETS_DEBUG = True


class MqttConfig(object):
    MQTT_BROKER_URL = 'temper.chost.com.ua'
    MQTT_BROKER_PORT = 1883
    MQTT_CLIENT_ID = 'flask_mqtt'
    MQTT_CLEAN_SESSION = True
    MQTT_USERNAME = 'test'
    MQTT_PASSWORD = 'tester'
    MQTT_KEEPALIVE = 2
    MQTT_TLS_ENABLED = False
    MQTT_LAST_WILL_TOPIC = 'home/lastwill'
    MQTT_LAST_WILL_MESSAGE = 'bye'
    MQTT_LAST_WILL_QOS = 2

Запуск приложения Flask выполняется через файл app.py:
# обработчик страницы профиля пользователя
# только для авторизованных пользователей
@app.route('/profile/<int:iduser>')
@login_required
def profile(iduser):
    user = Users.query.get(iduser)
    devices = user.devices.all()
    print(devices)
    if len(devices) != 0:
        mqttToaa = MqttTOAA(devices[1].device_code, devices[1].typedev.reverse)
    return render_template('profile.html', iduser=current_user.get_id())

if __name__ == '__main__':
    try:
        socketio.run(app, host='127.0.0.1', port=5000, use_reloader=False, debug=True)
    except socket.error as socketerror:
        print("Error socketio: " + socketerror)


Файл для работы с сервером MQTT:

# -*- coding: utf-8 -*-

from app import socketio, mqtt
from flask import request
import eventlet
import eventlet.greenthread as greenthread
import json


class MqttTOAA(object):
    # топик контроля воротами забора, топик данных воротами забора, топик контроля гаражными, топик 
данных гаражными
    type_topic = ["/Control", "/Data"]
    m_request_state = {"comm": "3"}  # запрос на получение статуса ворот
    m_start = {"Gate": "Start"}  # сообщение для открытия/закрытия ворот
    m_stop = {"Gate": "Stop"}  # сообщение для остановки ворот
    qos_request = 2
    qos_sub = 0
    # состояние ворот: действие
    dict_state_button_fence = {"con_Clos": "Открыть",
                           "con_Open": "Закрыть",
                           "fl_OpenClos": ("Остановить", "Продолжить")}
    dict_state_button_garage = {"con_Clos": "Открыть",
                            "con_Open": "Закрыть",
                            "fl_OpenClos": ("Продолжить", "Остановить", "Прервать")}
    # действие: статус, отображаемый на странице
    dict_state_text_fence = {"Открыть": "закрыты",
                         "Закрыть": "открыты",
                         "Остановить": ("открываются", "закрываются", "в движении"),
                         "Продолжить": "остановлены"}
    dict_state_text_garage = {"Открыть": "закрыты",
                          "Закрыть": "открыты",
                          "Продолжить": "остановлены",
                          "Прервать": "закрываются",
                          "Остановить": ("открываются", "закрываются", "в движении")}
    # поля: итоговые статусы
    dict_type_element_fence = {"button": "", "text": ""}  # текст кнопки и отображаемый статус
dict_type_element_garage = {"button": "", "text": ""}
state_gate_fence = {}  # статус ворот забора
state_gate_garage = {}  # статус ворот гаража
initial_position_fence = ""  # первоначальное положение
position_garage = {"state": "", "stop": False}  # предыдущая позиция ворот и отметка о том, были ли отсановленны
POOL_TIME = 3     # Интервал отправки запроса брокеру

def __init__(self, device_code, reverse):
    self.mqtt_connect = mqtt.on_connect()(self._handle_connect)
    self.mqtt_onmessage = mqtt.on_message()(self._handle_mqtt_message)
    self.mqtt_onlog = mqtt.on_log()(self._handle_logging)
    self.socketio_error = socketio.on_error()(self._handle_error)
    self.handle_change_state = socketio.on('change_state')(self._handle_change_state)
    self.device_code = "BK" + device_code
    self.reverse = reverse

# обработчик ошибок
def _handle_error(self):
    print(request.event["message"])  # "my error event"
    print(request.event["args"])  # (data,)


# функция изменения состояния ворот по нажатию
def _handle_change_state(self):
    message = None
    try:
        # для каких из ворот необходимо сменить состояние
        if self.reverse is not True:
            type_g = self.state_gate_fence
        else:
            type_g = self.state_gate_garage

        if type_g["fl_OpenClos"] == 1:  # ворота в движении -> остановка
            message = self.m_stop
        # остановились и двигаются в обратном направлении -> остановка
        elif (type_g["fl_OpenClos"] == 0) and (self.position_garage["state"] == "закрываются"):
            message = self.m_stop
            self.position_garage["state"] = "открываются"
        else:  # ворота остановленны -> продолжение движения
            message = self.m_start
        print(self.position_garage["state"])
        print(message)
    except Exception as ex:
        print(ex)
    mqtt.publish(self.device_code + self.type_topic[0], json.dumps(message), self.qos_request)

# передача запроса на получение данных
@staticmethod
def handle_publish(topic_req_res, m_req_state, qos_req, timer):
    while True:
        print("Send")
        eventlet.sleep(timer)
        msg = json.dumps(m_req_state)
        mqtt.publish(topic_req_res, msg, qos_req)

# ожидание подключения к брокеру,
# затем подписка на топик и запуск потока для постоянной отсылки сообщений в топик Control
def _handle_connect(self, client, userdata, flags, rc):
    mqtt.subscribe(self.device_code + self.type_topic[1], self.qos_sub)
    print("Subscribe!")
    publish_thread = greenthread.spawn(self.handle_publish, self.device_code + self.type_topic[0],
                                       self.m_request_state, self.qos_request, self.POOL_TIME)

# обработка принятых сообщений от топика, на который подписан
def _handle_mqtt_message(self, client, userdata, message):
    print("Get message")
    data = dict(
        topic=message.topic,
        payload=message.payload.decode(),
        qos=message.qos,
        )
    try:
        data = json.loads(data['payload'])
        if self.reverse is not True:
            self.fence_msg(data)
        else:
            self.garage_msg(data)
    except Exception as ex:
        print("Exception: " + str(ex))

Подключение выполняется очень долго или вообще не выполняется. В логах постоянно выводятся 2 сообщения:
16 Sending PINGREQ 
16 Received PINGRESP

Код зависает в ожидании того когда вызовется декоратор
mqtt.on_connect ()

Проблема скорее всего в неправильной организации потоков. Для создания потоков я использую модуль eventlet.greenthread, а для работы с сервером MQTT - Flask-MQTT. Файл инициализации и создания Flask экземпляра init.py:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
from flask_mqtt import Mqtt
import eventlet
from config import DevelopConfig, MqttConfig, MailConfig

eventlet.monkey_patch()

app = Flask(__name__)
app.config.from_object(DevelopConfig)       # применение конфигурация разработчика
app.config.from_object(MqttConfig)          # конфигурация для работы с сервером MQTT
app.config.from_object(MailConfig)          # конфигурация для работы с email
db = SQLAlchemy(app)
migrate = Migrate(app, db)
mail = Mail(app)
manager = Manager(app, db)
socketio = SocketIO(app)
mqtt = Mqtt(app)

from app import models

if __name__ == "__main__":
    manager.run()
  • Вопрос задан
  • 139 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы