Использовать докер в продакшене - нормальное решение?
Не упустил ли я какие-то важные команды, которые не используются при разработке, но нужны перед сборкой продакшена?
после чего память на сервере закончилась
Автоматический бекап базы данных - ответственность самого сервиса (в коде делать дамп), докера (использовать какой-то image для бекапов) или сервера (непосредственно на сервере настроить крон)?
Как не терять данные из бд при перезапусках контейнера?
Но тк приложение работает в докере, я не могу выполнить nano app.logs для просмотра логов.
import typing
import asyncio
import functools
def throttled(delay: float, measure: typing.Literal['end_to_start', 'start_to_start'] = 'start_to_start'):
def decorator(actual_func: typing.Coroutine) -> typing.Coroutine:
queue = None
task = None
async def _single_query(future, args, kwargs):
try:
result = await actual_func(*args, **kwargs) # тут делаем асинхронное обращение к сервису
except BaseException as err:
future.set_exception(err) # была ошибка - теперь await future выкинет исключение
else:
future.set_result(result) # полуен результат - await future вернёт его
async def _work_loop():
nonlocal queue
nonlocal task
while True:
try:
# ждем, пока не придёт запрос, или пока не закончится таймаут
future, args, kwargs = await asyncio.wait_for(queue.get(), delay)
except asyncio.TimeoutError: # новые запросы долго не приходят, сворачиваем работу, чтобы не тратить ресурсы
queue = None
task = None
return
single_task = _single_query(future, args, kwargs)
if measure == 'start_to_start':
asyncio.create_task(single_task)
else:
await single_task
queue.task_done() # каждому успешному get() соответствует task_done()
await asyncio.sleep(delay)
@functools.wraps(actual_func)
async def query(*args, **kwargs):
nonlocal queue # обращение к переменной выше уровнем, но не глобальной
nonlocal task
future = asyncio.Future() # Future просигналит, когда наш запрос будет обслужен
if task is None: # либо это первый запрос, либо запросы долго не приходили, и мы свернули работу
queue = asyncio.Queue()
task = asyncio.create_task(_work_loop())
await queue.put((future, args, kwargs))
return await future
return query
return decorator
# delay - минимальный интервал между запросами в секундах
# measure - как мерять интервалы между запросами: начало-начало или конец-начало
@throttled(delay=5.0, measure='start_to_start')
async def my_coroutine(*args, **kwargs) -> ReturnValue:
...
def cool_staff(form_class, inits, defaults, user, other_param):
# много строчек кода
...
res = cool_staff(form_class=MainForm, inits={a:1, b:3}, defaults=[1,2,3], ...)
...
res = cool_staff(form_class=MainForm, inits={a:100500, b:42}, defaults=[3,2,1], ...)
...
main_cool_staff = lambda **kwargs: cool_staff(form_class=MainForm, **kwargs)
...
res = main_cool_staff(inits={a:1, b:3}, defaults=[1,2,3], ...)
...
res = main_cool_staff(inits={a:100500, b:42}, defaults=[3,2,1], ...)
...
main_cool_staff = lambda *args, **kwargs: cool_staff(form_class=MainForm, *args, **kwargs)
import functools
main_cool_staff = functools.partial(cool_staff, MainForm)
example.com/webhook/notification
и передаёт в теле POST запроса JSON (зачастую) и там будут свежие данные.bot.register_next_step_handler(message, process_name_step)
def process_name_step(message)
import asyncio
from aiogram import Bot, types
from aiogram.dispatcher import Dispatcher
from aiogram.utils import executor
DELAY = 7200
bot = Bot(token='BOT TOKEN HERE')
dp = Dispatcher(bot)
@dp.message_handler(commands=['start', 'help'])
async def send_welcome(message: types.Message):
await message.reply("Hi!\nI'm EchoBot!\nPowered by aiogram.")
async def update_price():
...
def repeat(coro, loop):
asyncio.ensure_future(coro(), loop=loop)
loop.call_later(DELAY, repeat, coro, loop)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.call_later(DELAY, repeat, update_price, loop)
executor.start_polling(dp, loop=loop)