import asyncio
from typing import Dict
import aiohttp
import re
from tgbot.models import Projects, Clusters
from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand
class Command(BaseCommand):
    """Management command that runs the asyncio main loop until interrupted."""

    help = 'Запуск обработчика'

    def handle(self, *args, **options):
        """Announce start-up, run ``mainloop()`` and report a Ctrl+C shutdown.

        A ``KeyboardInterrupt`` is treated as a normal stop request: it is
        caught here and a final message is written instead of a traceback.
        """
        out = self.stdout
        success = self.style.SUCCESS
        try:
            out.write(success('Запуск обработчика'))
            asyncio.run(mainloop())
        except KeyboardInterrupt:
            out.write(success('Обработчик остановлен'))
async def sendData(data, token):
    """POST the first update of ``data["result"]`` to the local webhook.

    Args:
        data: Parsed getUpdates response; ``data["result"][0]`` is sent as
            the JSON body, so the caller must ensure ``result`` is non-empty.
        token: Bot token, passed to the webhook as the ``token`` query
            parameter.

    Returns:
        The webhook response body as text, or ``None`` when the request
        timed out or the connection failed.
    """
    # NOTE(review): HOST is not defined in the visible part of this file;
    # confirm it is provided at module level, otherwise this raises NameError.
    url = f"http://{HOST}/webhook/?token={token}"
    print(url)
    try:
        # ``async with`` closes the session on exit. The previous explicit
        # ``session.close()`` was an un-awaited coroutine and did nothing
        # except emit a RuntimeWarning.
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=data["result"][0], timeout=10) as response:
                return await response.text()
    except asyncio.TimeoutError:
        return None
    # ClientConnectorError is a subclass of ClientOSError, so it must be
    # matched first to keep its own log line.
    except aiohttp.ClientConnectorError:
        failure = "ClientConnectorError"
    except aiohttp.ClientOSError:
        failure = "ClientOSError"
    except aiohttp.ServerDisconnectedError:
        failure = "ServerDisconnectedError"

    # Shared handling for connection-level failures: log, back off briefly.
    print(failure)
    await asyncio.sleep(1)
    return None
# Bot token as embedded in a Bot API URL: ".../bot<id>:<secret>/<method>".
# The id length is not fixed, so neither the digit count nor the secret
# length is hard-coded.
_BOT_TOKEN_IN_URL = re.compile(r"bot(\d+:[A-Za-z0-9_-]+)")


async def process_response_queue(queue: asyncio.Queue):
    """Consume ``(url, data)`` pairs from ``queue`` and forward updates.

    For every item the bot token is extracted from the URL; when ``data``
    carries a non-empty ``"result"`` it is handed to ``sendData``. Runs until
    the task is cancelled.

    Args:
        queue: Queue of ``(url_from, data)`` tuples, where ``url_from`` is a
            Bot API URL containing the token and ``data`` is the parsed
            response dict.
    """
    print("Processor started")
    while True:
        url_from, data = await queue.get()

        found = _BOT_TOKEN_IN_URL.search(url_from)
        if found is None:
            # Skip instead of raising: an exception here would end the
            # processor task and silently stop all forwarding.
            print(f"Skipped {data} from {url_from}: no bot token in URL")
            continue
        token = found.group(1)

        r = None
        if data.get("result"):
            r = await sendData(data, token)
        print(f"Received {data} from {url_from} r::{r}")
async def getUrl(token, _json=None):
    """Build the getUpdates long-poll URL for ``token``.

    Args:
        token: Bot token inserted into the URL path.
        _json: Optional parsed response of the previous poll. When it has a
            non-empty ``"result"`` list, the offset becomes the first
            update's ``update_id`` plus one; otherwise the offset is 0.

    Returns:
        The URL string with ``timeout=20`` and the computed ``offset``.
    """
    updates = _json.get("result") if _json else None
    next_offset = updates[0]["update_id"] + 1 if updates else 0
    return f"https://api.telegram.org/bot{token}/getUpdates?timeout=20&offset={next_offset}"
class TaskManager:
    """
    Manage the data-fetching tasks.

    Keeps one polling task per bot token in ``tasks`` (plus the queue
    processor under the ``"_processor"`` key) and a shared ``queue`` into
    which polling tasks put their responses.
    """

    def __init__(self):
        self.queue = asyncio.Queue()
        self.tasks: Dict[str, asyncio.Task] = {}

    async def get_repeat(self, token, timeout=22):
        """Poll getUpdates for ``token`` in a loop and enqueue each response.

        Each response is put on ``self.queue`` as ``(next_url, payload)``.
        Any exception (including cancellation) ends the loop; the task's
        entry is then removed from ``self.tasks``.

        Args:
            token: Bot token to poll.
            timeout: Timeout passed to each ``session.get`` call.
        """
        url = await getUrl(token)
        print(f"Task for {token} started")
        try:
            async with aiohttp.ClientSession() as session:
                while True:
                    async with session.get(url, timeout=timeout) as resp:
                        # Parse once and reuse for both the next URL and the
                        # queued item.
                        payload = await resp.json()
                    url = await getUrl(token, payload)
                    await self.queue.put((url, payload))
        finally:
            # pop() instead of del: a missing key must not raise here and
            # mask the exception that ended the loop.
            self.tasks.pop(token, None)
            print(f"Task for {token} canceled")

    def start_processor(self):
        """Start the queue consumer and register it as ``"_processor"``."""
        self.tasks["_processor"] = asyncio.create_task(process_response_queue(self.queue))

    def start_new_task(self, token, offset=0):
        """Start a polling task for ``token`` and register it in ``tasks``.

        ``offset`` is accepted for backward compatibility but is not used;
        polling always starts from the URL produced by ``getUrl(token)``.
        """
        self.tasks[token] = asyncio.create_task(self.get_repeat(token))

    def stop_task(self, token):
        """Cancel the task registered for ``token``, if there is one.

        A polling task removes itself from ``tasks`` when it ends, so the
        entry may already be gone; that case is a no-op.
        """
        task = self.tasks.get(token)
        if task is not None:
            task.cancel()

    def close(self):
        """Request cancellation of every registered task."""
        # Iterate over a snapshot so the loop does not depend on the dict
        # staying unchanged while tasks are being cancelled.
        for task in list(self.tasks.values()):
            task.cancel()
@sync_to_async
def getTokens():
    """Collect every configured bot token from projects and clusters.

    Returns:
        A list with all non-``None`` ``p_bot_token`` values, followed by all
        non-``None`` ``da_bot_token`` values of the projects, followed by all
        non-``None`` ``bot_token`` values of the clusters.
    """
    # Load the projects once and reuse the rows for both token kinds,
    # instead of running the same query twice.
    projects = list(Projects.objects.all())
    p_bot = [project.p_bot_token for project in projects if project.p_bot_token is not None]
    da_bot = [project.da_bot_token for project in projects if project.da_bot_token is not None]
    # NOTE: project ``c_bot_token`` values are intentionally not collected.
    clusters_bot = [
        cluster.bot_token
        for cluster in Clusters.objects.all()
        if cluster.bot_token is not None
    ]
    return p_bot + da_bot + clusters_bot
async def mainloop():
    """Keep one polling task running for every token currently configured.

    Every two seconds the configured tokens are re-read: tasks are started
    for new tokens, stopped for tokens that disappeared, and restarted for
    tokens whose task has ended. Runs until cancelled; all tasks are then
    cancelled via ``TaskManager.close()``.
    """
    active_tokens = set()
    task_manager = TaskManager()
    task_manager.start_processor()
    try:
        while True:
            tokens = await getTokens()
            if tokens is not None:
                wanted = set(tokens)

                # The set differences are new sets, so ``active_tokens`` can
                # be modified safely inside the loops.
                for token in wanted - active_tokens:
                    task_manager.start_new_task(token)
                    active_tokens.add(token)
                    print(f"add {token}")

                for token in active_tokens - wanted:
                    # The task may already have ended and unregistered
                    # itself; only cancel what is still registered.
                    if token in task_manager.tasks:
                        task_manager.stop_task(token)
                    active_tokens.discard(token)
                    print(f"remove {token}")

                # Restart polling for tokens whose task ended on its own.
                for task_token in active_tokens:
                    if task_token not in task_manager.tasks:
                        task_manager.start_new_task(task_token)

            # Sleep on every pass, including when no tokens were returned,
            # so the loop never spins without yielding.
            await asyncio.sleep(2)
    finally:
        # Previously placed after the infinite loop and therefore unreachable.
        task_manager.close()