import asyncio
from concurrent.futures import ThreadPoolExecutor
from flask import Flask, jsonify
import discord
from discord.ext import commands
from settings import settings
# Web application that exposes the HTTP API.
app = Flask(__name__)

# Start from the default gateway intents, then opt in to the two extra ones
# this file enables: message content and guild members.
intents = discord.Intents.default()
intents.message_content = intents.members = True

# Discord client; text commands use the ">" prefix.
bot = commands.Bot(intents=intents, command_prefix=">")
def start_flask():
    """Run the Flask development server on localhost:8000.

    Blocks until the server stops.

    The auto-reloader is disabled explicitly. ``debug=True`` would otherwise
    enable it, and the reloader installs a signal handler, which Python only
    permits in the main thread. This function is executed from a worker
    thread, so with the reloader on the server would die with ``ValueError``
    right after printing its start-up banner.
    """
    app.run(host='localhost', port=8000, debug=True, use_reloader=False)
async def start_bot():
    """Start the Discord bot with the token stored under ``settings['discordToken']``.

    Awaits ``bot.start`` and therefore only returns once that call finishes.
    """
    token = settings['discordToken']
    await bot.start(token)
def run_flask():
    """Thread entry point for the Flask server.

    ``start_flask`` is an ordinary blocking function, not a coroutine, so it
    is called directly. Handing its ``None`` return value to
    ``loop.run_until_complete`` would raise ``TypeError`` as soon as the
    server returned, and the event loop created for that purpose was never
    used or closed.
    """
    start_flask()
def run_bot():
    """Thread entry point for the Discord bot.

    Creates a dedicated event loop for the calling thread, runs
    ``start_bot()`` on it until that coroutine finishes, and always closes
    the loop afterwards.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(start_bot())
    finally:
        # Close the loop even when start_bot() raises (for example on a failed
        # login) so its resources are released instead of leaking.
        loop.close()
@app.route('/api/getUser/<user_id>')
def get_user_route(user_id):
    """Flask view returning the Discord profile of ``user_id`` as JSON.

    The lookup coroutine is scheduled on the bot's own event loop with
    ``asyncio.run_coroutine_threadsafe``. It is not run on a fresh loop in
    the request thread, because the client's network session belongs to the
    loop the bot was started on, and creating a new loop per request also
    leaked that loop.

    Returns:
        The response produced by ``get_user``; a 404 JSON error when Discord
        reports the user/member as not found or ``get_user`` yields nothing;
        a 503 JSON error while the bot has not finished starting.
    """
    if not bot.is_ready():
        # Requests can arrive before the bot thread has logged in; at that
        # point there is no running bot loop to schedule work on.
        return jsonify({"error": "Bot is not ready"}), 503

    async def _get_user_in_app_context():
        # get_user() builds its result with jsonify(), which requires a Flask
        # application context. The bot's thread has none, so push one for the
        # duration of the call.
        with app.app_context():
            return await get_user(user_id)

    try:
        pending = asyncio.run_coroutine_threadsafe(
            _get_user_in_app_context(), bot.loop
        )
        # Block this request thread until the bot's loop has produced the
        # result; exceptions raised by the coroutine are re-raised here.
        response = pending.result()
    except discord.NotFound:
        return jsonify({"error": "User not found"}), 404

    if response is None:
        # get_user() returns None when its guild/member checks fail; a view
        # must not return None, so report it as "not found".
        return jsonify({"error": "User not found"}), 404
    return response
async def get_user(user_id):
    """Look up a user and their membership in the configured guild.

    Fetches the user, the guild whose id is stored under
    ``settings['hakuServerId']``, and the user's member object in that guild,
    then packs avatar, nickname, role names, id and banner into a JSON
    response.

    Args:
        user_id: Identifier passed to ``bot.fetch_user`` and echoed back
            unchanged as ``discord_id``.

    Returns:
        The ``jsonify`` response, or ``None`` when the fetched guild or
        member is falsy.
    """
    user = await bot.fetch_user(user_id)
    guild = await bot.fetch_guild(settings['hakuServerId'])
    if not guild:
        return None

    member = await guild.fetch_member(user.id)
    if not member:
        return None

    avatar = user.avatar
    nickname = member.display_name
    # Prefer the banner image URL; fall back to the accent colour.
    banner = user.banner.url if user.banner else user.accent_color
    roles = [role.name for role in member.roles]
    if banner is None:
        banner = user.accent_color

    payload = {
        "avatar": str(avatar.url) if avatar else None,
        "nickname": str(nickname),
        "roles": roles,
        "discord_id": user_id,
        "banner": str(banner) if banner else None,
    }
    return jsonify(payload)
@bot.event
async def on_ready():
    """Print a confirmation line containing ``bot.user`` when the ready event fires."""
    message = f'[{bot.user}] Успешная авторизация'
    print(message)
if __name__ == "__main__":
    import traceback
    from concurrent.futures import as_completed

    # The Flask server and the bot each block, so each gets its own worker
    # thread.
    with ThreadPoolExecutor(max_workers=2) as executor:
        workers = {
            executor.submit(run_flask): "flask",
            executor.submit(run_bot): "bot",
        }
        # An exception raised inside a submitted callable is stored on its
        # Future and stays invisible unless it is read. Report each failure as
        # soon as the worker ends instead of letting it vanish silently.
        for finished in as_completed(workers):
            error = finished.exception()
            if error is not None:
                print(f"[{workers[finished]}] worker crashed:")
                traceback.print_exception(
                    type(error), error, error.__traceback__
                )
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on http://localhost:8000
Press CTRL+C to quit
[Бот] Успешная авторизация
# main.py
bot = NoirBot(debug=True)
# The bot's blocking run() gets its own thread; the web app is served from
# this thread via asyncio.run().
thread_bot = threading.Thread(name="bot", target=bot.run)
try:
    thread_bot.start()
    asyncio.run(bot.serve_app())
except Exception as exp:
    lprint(f"Connection failed: {exp}", Color.red)
finally:
    # Runs on both the success and the failure path.
    lprint(f"Stopped", Color.yellow)
    exit()
# Bot.py
# Run & Stop funcs
def run(self) -> None:
    """Drive ``self.start()`` to completion on ``self.loop``.

    On ``KeyboardInterrupt`` the node is disconnected and ``self.close()`` is
    run on the same loop. The loop is closed in every case.
    """
    try:
        self.loop.run_until_complete(self.start())
    except KeyboardInterrupt:
        self.node.disconnect()
        shutdown = self.close()
        self.loop.run_until_complete(shutdown)
    finally:
        self.loop.close()
def start(self):
    """Delegate to the parent ``start`` with the token chosen by the debug flag.

    The token is read from the ``altsecrets`` config section when
    ``self._debug`` is truthy, otherwise from ``secrets``.
    """
    section = "altsecrets" if self._debug else "secrets"
    return super().start(self._config.get(section, "token"))
# Bot.py
# App
async def serve_app(self):
    """Load the routers, then serve ``self._app`` with Hypercorn.

    The server is bound to ``0.0.0.0:5001`` with the reloader switched on.
    Progress is logged through ``lprint`` at each stage.
    """
    lprint("Checking routers", Color.blue, worker="APP")
    routersLoad(self)

    lprint("Loading Quart", Color.blue, worker="APP")
    server_config = hypercorn.Config()
    server_config.bind = ["0.0.0.0:5001"]
    server_config.use_reloader = True
    lprint("Done", Color.green, worker="APP")

    await serve(self._app, server_config)