Код у тебя странный.
asyncio.get_event_loop().run_forever()
не будет вызвано, так как app.run() блокирует программу. run_ssh_command() использует asyncio только для ожидания. А смысл?
Имей ввиду, Flask запускает каждый роут в отдельном event loop, так что фоновую задачу через create_task() запустить
не выйдет.
Тебе нужно будет запустить отдельный воркер в отдельном потоке. Задачей этого воркера будет выполнять ssh команды, а также сообщать текущий статус команды и результат её выполнения. Так что API у воркера должно быть несложное.
Очередь (
queue.Queue) команд принимает на вход описание команды, которую нужно выполнить, и какой-то ID запроса (чтобы понять, где чья команда).
Воркер ждёт значение из очереди, выполняет команду, и помещает отклик команды в хранилище ответов вместе с ID. Соответственно должен быть метод, который по ID проверит наличие ответа в хранилище, и вернёт его, если он есть. Хранилище стоит защить мьютексом (threading.Lock), и время от времени чистить от старых ответов.
Соответственно во Flask ты делаешь отдельный роут, который принимает ID запроса, и дергаёт этот метод воркера на предмет наличия ответа. А на стороне клиента дёргаешь этот роут периодически, пока не получишь ответ в удобном для тебя формате (например, JSON).
Вот пример реализации воркера
from typing import Union, Dict
import threading
import time
import queue
import uuid
import paramiko
class SSHWorker(threading.Thread):
def __init__(self, hostname: str, username: str, password: str):
super().__init__(daemon=True)
self.__client = paramiko.SSHClient()
self.__client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.__hostinfo = (hostname, username, password) # если сохраним эти данные, можно будет сделать реконнект
self.__commands = queue.Queue(maxsize=10) # очередь на выполнение не более 10 команд
self.__responses = {} # ответы команд
self.__response_lock = threading.Lock()
self.__run = True
def add_command(self, command: str) -> Union[str, None]:
"""Добавляет в очередь команду и возвращает её уникальный id, если это прошло успешно. Иначе вернёт None."""
cmdid = str(uuid.uuid4())
try:
self.__commands.put_nowait((cmdid, command))
except queue.Full:
return None
else:
return cmdid
def check_command(self, cmdid: str) -> Union[dict, None]:
"""Проверяет, выполнилась ли команда. Вернёт None если нет, или ответ если да."""
with self.__response_lock:
return self.__responses.get(cmdid, default=None)
def stop(self) -> None:
"""Говорит потоку остановиться. Фактическая остановка произойдёт позднее, смотря по таймауту ожидания команды."""
self.__run = False
def run(self) -> None:
self.__client.connect(self.__hostinfo[0], username=self.__hostinfo[1], password=self.__hostinfo[2])
while self.__run:
try:
cmdid, command = self.__commands.get(timeout=5.0) # есть ли команда в очереди?
except queue.Empty: # нет
# это позволяет остановить поток, если self.__run станет False, а также удалить старые незапрошенные ответы
too_old = time.monotonic() - 300.0 # удаляем все ответы старее 300 секунд, чтобы память не утекала
with self.__response_lock:
for cmdid, response in list(self.__responses.items()):
if response['timestamp'] < too_old:
del self.__responses[cmdid]
else: # что-то есть - выполняем
try:
stdin, stdout, stderr = self.__client.exec_command(f'/sbin/{command}', get_pty=True)
time.sleep(4.0)
response = stdout.read().decode('utf-8')
except Exception as err:
with self.__response_lock:
self.__responses[cmdid] = {'success': False, 'error': f'[{err.__class__.__name__}]: {err!s}', 'timestamp': time.monotonic()}
else:
with self.__response_lock:
self.__responses[cmdid] = {'success': True, 'output': response, 'timestamp': time.monotonic()}
self.__commands.task_done()
Ну и да, ты спалил данные SSH. =)