Сайт написан на flask. Я пытаюсь подключиться к брокеру MQTT с такими параметрами, записанными в файле config.py:
class DevelopConfig(Config):
DEBUG = True
TESTING = True
ASSETS_DEBUG = True
class MqttConfig(object):
MQTT_BROKER_URL = 'temper.chost.com.ua'
MQTT_BROKER_PORT = 1883
MQTT_CLIENT_ID = 'flask_mqtt'
MQTT_CLEAN_SESSION = True
MQTT_USERNAME = 'test'
MQTT_PASSWORD = 'tester'
MQTT_KEEPALIVE = 2
MQTT_TLS_ENABLED = False
MQTT_LAST_WILL_TOPIC = 'home/lastwill'
MQTT_LAST_WILL_MESSAGE = 'bye'
MQTT_LAST_WILL_QOS = 2
Запуск приложения Flask выполняется через файл app.py:
# обработчик страницы профиля пользователя
# только для авторизованных пользователей
@app.route('/profile/<int:iduser>')
@login_required
def profile(iduser):
user = Users.query.get(iduser)
devices = user.devices.all()
print(devices)
if len(devices) != 0:
mqttToaa = MqttTOAA(devices[1].device_code, devices[1].typedev.reverse)
return render_template('profile.html', iduser=current_user.get_id())
if __name__ == '__main__':
try:
socketio.run(app, host='127.0.0.1', port=5000, use_reloader=False, debug=True)
except socket.error as socketerror:
print("Error socketio: " + socketerror)
Файл для работы с сервером MQTT:
# -*- coding: utf-8 -*-
from app import socketio, mqtt
from flask import request
import eventlet
import eventlet.greenthread as greenthread
import json
class MqttTOAA(object):
# топик контроля воротами забора, топик данных воротами забора, топик контроля гаражными, топик
данных гаражными
type_topic = ["/Control", "/Data"]
m_request_state = {"comm": "3"} # запрос на получение статуса ворот
m_start = {"Gate": "Start"} # сообщение для открытия/закрытия ворот
m_stop = {"Gate": "Stop"} # сообщение для остановки ворот
qos_request = 2
qos_sub = 0
# состояние ворот: действие
dict_state_button_fence = {"con_Clos": "Открыть",
"con_Open": "Закрыть",
"fl_OpenClos": ("Остановить", "Продолжить")}
dict_state_button_garage = {"con_Clos": "Открыть",
"con_Open": "Закрыть",
"fl_OpenClos": ("Продолжить", "Остановить", "Прервать")}
# действие: статус, отображаемый на странице
dict_state_text_fence = {"Открыть": "закрыты",
"Закрыть": "открыты",
"Остановить": ("открываются", "закрываются", "в движении"),
"Продолжить": "остановлены"}
dict_state_text_garage = {"Открыть": "закрыты",
"Закрыть": "открыты",
"Продолжить": "остановлены",
"Прервать": "закрываются",
"Остановить": ("открываются", "закрываются", "в движении")}
# поля: итоговые статусы
dict_type_element_fence = {"button": "", "text": ""} # текст кнопки и отображаемый статус
dict_type_element_garage = {"button": "", "text": ""}
state_gate_fence = {} # статус ворот забора
state_gate_garage = {} # статус ворот гаража
initial_position_fence = "" # первоначальное положение
position_garage = {"state": "", "stop": False} # предыдущая позиция ворот и отметка о том, были ли отсановленны
POOL_TIME = 3 # Интервал отправки запроса брокеру
def __init__(self, device_code, reverse):
self.mqtt_connect = mqtt.on_connect()(self._handle_connect)
self.mqtt_onmessage = mqtt.on_message()(self._handle_mqtt_message)
self.mqtt_onlog = mqtt.on_log()(self._handle_logging)
self.socketio_error = socketio.on_error()(self._handle_error)
self.handle_change_state = socketio.on('change_state')(self._handle_change_state)
self.device_code = "BK" + device_code
self.reverse = reverse
# обработчик ошибок
def _handle_error(self):
print(request.event["message"]) # "my error event"
print(request.event["args"]) # (data,)
# функция изменения состояния ворот по нажатию
def _handle_change_state(self):
message = None
try:
# для каких из ворот необходимо сменить состояние
if self.reverse is not True:
type_g = self.state_gate_fence
else:
type_g = self.state_gate_garage
if type_g["fl_OpenClos"] == 1: # ворота в движении -> остановка
message = self.m_stop
# остановились и двигаются в обратном направлении -> остановка
elif (type_g["fl_OpenClos"] == 0) and (self.position_garage["state"] == "закрываются"):
message = self.m_stop
self.position_garage["state"] = "открываются"
else: # ворота остановленны -> продолжение движения
message = self.m_start
print(self.position_garage["state"])
print(message)
except Exception as ex:
print(ex)
mqtt.publish(self.device_code + self.type_topic[0], json.dumps(message), self.qos_request)
# передача запроса на получение данных
@staticmethod
def handle_publish(topic_req_res, m_req_state, qos_req, timer):
while True:
print("Send")
eventlet.sleep(timer)
msg = json.dumps(m_req_state)
mqtt.publish(topic_req_res, msg, qos_req)
# ожидание подключения к брокеру,
# затем подписка на топик и запуск потока для постоянной отсылки сообщений в топик Control
def _handle_connect(self, client, userdata, flags, rc):
mqtt.subscribe(self.device_code + self.type_topic[1], self.qos_sub)
print("Subscribe!")
publish_thread = greenthread.spawn(self.handle_publish, self.device_code + self.type_topic[0],
self.m_request_state, self.qos_request, self.POOL_TIME)
# обработка принятых сообщений от топика, на который подписан
def _handle_mqtt_message(self, client, userdata, message):
print("Get message")
data = dict(
topic=message.topic,
payload=message.payload.decode(),
qos=message.qos,
)
try:
data = json.loads(data['payload'])
if self.reverse is not True:
self.fence_msg(data)
else:
self.garage_msg(data)
except Exception as ex:
print("Exception: " + str(ex))
Подключение выполняется очень долго или вообще не выполняется. В логах постоянно выводятся 2 сообщения:
16 Sending PINGREQ
16 Received PINGRESP
Код зависает в ожидании того когда вызовется декоратор
mqtt.on_connect ()
Проблема скорее всего в неправильной организации потоков. Для создания потоков я использую модуль eventlet.greenthread, а для работы с сервером MQTT - Flask-MQTT. Файл инициализации и создания Flask экземпляра init.py:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_mail import Mail
from flask_script import Manager
from flask_socketio import SocketIO
from flask_mqtt import Mqtt
import eventlet
from config import DevelopConfig, MqttConfig, MailConfig
eventlet.monkey_patch()
app = Flask(__name__)
app.config.from_object(DevelopConfig) # применение конфигурация разработчика
app.config.from_object(MqttConfig) # конфигурация для работы с сервером MQTT
app.config.from_object(MailConfig) # конфигурация для работы с email
db = SQLAlchemy(app)
migrate = Migrate(app, db)
mail = Mail(app)
manager = Manager(app, db)
socketio = SocketIO(app)
mqtt = Mqtt(app)
from app import models
if __name__ == "__main__":
manager.run()