Задать вопрос
  • Как сделать доступ по IP и PORT между контейнерами в Docker Swarm?

    @Ninzalo Автор вопроса
    Алан Гибизов, спасибо, но хотелось бы все-таки увидеть ссылочки на похожие темы, так как ищу информацию уже не первый день с:
    а тег PYTHON включил, так как новичок в этом вопросе и не совсем понимаю в чем заключается проблема, возможно, что python код тут не лишний
  • Есть ли способ аннотировать аргументы метода переменной класса?

    @Ninzalo Автор вопроса
    нет, у меня конфиг (теоретически) может различаться, но я его нигде не изменяю
  • Как запустить выполнение асинхронной функции с определенной частотой выполнения?

    @Ninzalo Автор вопроса
    Vindicar, протестировал ваш код. После небольшого рефакторинга все заработало.
    Отличное решение! Теперь не приходится создавать этот класс и тащить его через весь код
    Спасибо!
    Код

    import asyncio
    from datetime import datetime
    import pickle
    
    from lib.bot.converters import dataclass_from_dict
    from lib.bot.struct import Message_struct
    from config import LOCAL_IP, LOCAL_PORT, MESSAGE_REPLY_RATE
    
    import typing
    import functools
    
    
    def throttled(
        delay: float,
        measure: typing.Literal[
            "end_to_start", "start_to_start"
        ] = "start_to_start",
    ):
        def decorator(actual_func: typing.Coroutine) -> typing.Coroutine:
            queue = None
    
            async def _single_query(future, args, kwargs):
                try:
                    result = await actual_func(
                        *args, **kwargs
                    )  # тут делаем асинхронное обращение к сервису
                except Exception as err:
                    future.set_exception(
                        err
                    )  # была ошибка - теперь await future выкинет исключение
                else:
                    future.set_result(
                        result
                    )  # полуен результат - await future вернёт его
    
            async def _work_loop():
                nonlocal queue
                while True:
                    try:
                        # ждем, пока не придёт запрос, или пока не закончится таймаут
                        future, args, kwargs = await asyncio.wait_for(
                            queue.get(), delay
                        )
                    except (
                        asyncio.TimeoutError
                    ):  # новые запросы долго не приходят, сворачиваем работу, чтобы не тратить ресурсы
                        queue = None
                        return
                    task = _single_query(future, args, kwargs)
                    if measure == "start_to_start":
                        asyncio.create_task(task)
                    else:
                        await task
                    queue.task_done()  # каждому успешному get() соответствует task_done()
                    await asyncio.sleep(
                        delay
                    )  # можно учесть, сколько времени делался запрос. Но стоит ли?
    
            @functools.wraps(actual_func)
            async def query(*args, **kwargs):
                nonlocal queue  # обращение к переменной выше уровнем, но не глобальной
                future = (
                    asyncio.Future()
                )  # Future просигналит, когда наш запрос будет обслужен
                if (
                    queue is None
                ):  # либо это первый запрос, либо запросы долго не приходили, и мы свернули работу
                    queue = asyncio.Queue()
                    asyncio.create_task(_work_loop())
                await queue.put((future, args, kwargs))
                return await future
            return query
        return decorator
    
    async def handle_echo(reader, writer) -> None:
        tasks = []
        while True:
            data = await reader.read()
            if not data:
                break
            message = pickle.loads(data)
            message_cls = dataclass_from_dict(
                struct=Message_struct, dictionary=message
            )
            task = asyncio.create_task(replier(message_cls))
            tasks.append(task)
        addr = writer.get_extra_info("peername")
        print(f"Received {message_cls!r} from {addr!r}")
        await asyncio.gather(*tasks)
        writer.close()
    
    
    @throttled(delay=1.0/MESSAGE_REPLY_RATE, measure='start_to_start')
    async def replier(message_cls: Message_struct):
        # send answer to user
        print(
            f"Answered to {message_cls.user_id}, with text "
            f"'{message_cls.text}' at {datetime.now()}"
        )
    
    
    async def main() -> None:
        server = await asyncio.start_server(
            lambda reader, writer: handle_echo(reader, writer),
            LOCAL_IP,
            LOCAL_PORT,
        )
        addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
        print(f"Serving on {addrs}")
        async with server:
            await server.serve_forever()
    asyncio.run(main())

  • Как запустить выполнение асинхронной функции с определенной частотой выполнения?

    @Ninzalo Автор вопроса
    Vindicar, спасибо огромное, очень сильно помогли, все прекрасно работает
  • Как запустить выполнение асинхронной функции с определенной частотой выполнения?

    @Ninzalo Автор вопроса
    Vindicar Но что делать, если, допустим, получение ответа от api занимает 5 секунд?
    В вашем примере в итоге получается, что запросы идут последовательно (ответ получил -> новый запрос отправил и тд).
    Есть ли способ все равно делать эти запросы асинхронно, но с определенной частотой?
    Или же я неправильно использовал ваш пример?
  • Как запустить выполнение асинхронной функции с определенной частотой выполнения?

    @Ninzalo Автор вопроса
    При отправке, допустим, 10 сообщений из мессенжера, они все равно обработаются практически одновременно (учитывая мой пример, где обработка = вывод message.text и datetime.now())
  • Как запустить выполнение асинхронной функции с определенной частотой выполнения?

    @Ninzalo Автор вопроса
    Вот финальный код, включая мой пример:
    Код

    import asyncio
    from datetime import datetime
    import pickle
    
    from lib.bot.converters import dataclass_from_dict
    from lib.bot.struct import Message_struct
    from config import LOCAL_IP, LOCAL_PORT, MESSAGE_REPLY_RATE
    
    class ThrottledResource:
        def __init__(self, delay: float):
            self._delay = delay
            self._queue = asyncio.Queue()
            self._task = None
    
        def start(self):
            self._task = asyncio.create_task(self._work_loop())
        
        def stop(self):
            self._task.cancel()
            self._task = None
    
        # этот метод вызывается клиентским кодом, получает параметры и возвращает отклик спустя время.
        async def query(self, params):
            future = asyncio.Future()  # Future просигналит, когда наш запрос будет обслужен
            await self._queue.put((future, params))
            result = await future  # корутина спит, пока запрос не обслужат
            return result
    
        async def _work_loop(self):
            while True:
                future, params = await self._queue.get()  # ждем, пока не придёт запрос
                try:
                    result = await replier(params)  # тут делаем асинхронное обращение к сервису
                except Exception as err:
                    future.set_exception(err)  # была ошибка - теперь await future выкинет исключение
                else:
                    future.set_result(result)  # полуен результат - await future вернёт его
                self._queue.task_done()  # каждому успешному get() соответствует task_done()
                await asyncio.sleep(self._delay)
    
    
    async def handle_echo(reader, writer, message_throttler) -> None:
        tasks = []
        while True:
            data = await reader.read()
            if not data:
                break
            message = pickle.loads(data)
            message_cls = dataclass_from_dict(
                struct=Message_struct, dictionary=message
            )
            
            task = asyncio.create_task(message_throttler.query(message_cls))
            tasks.append(task)
    
        await asyncio.gather(*tasks)
        addr = writer.get_extra_info("peername")
        # print(f"Received {message_cls!r} from {addr!r}")
        writer.close()
    
    
    async def replier(message_cls: Message_struct):
        # process the received data here
        print(message_cls.text, datetime.now())
    
    
    async def main() -> None:
        message_throttler = ThrottledResource(delay=1.0/MESSAGE_REPLY_RATE)
        message_throttler.start()
        server = await asyncio.start_server(
            lambda reader, writer: handle_echo(reader, writer, message_throttler),
            LOCAL_IP,
            LOCAL_PORT
        )
    
        addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
        print(f"Serving on {addrs}")
    
        async with server:
            await server.serve_forever()
    
    
    asyncio.run(main())

  • Как правильно структурировать функции датакласса?

    @Ninzalo Автор вопроса
    Dr. Bacon, Для чего я это сделал? -Все просто! Потому что мне это было нужно.
    А от тебя внятного ответа с примерами я так и не увидел

    По понятиям ООП, ты как раз должен прятать подобную реализацию c.keys_funcs.add_key в c.add_key или т.п

    А если таких функций сотня, а может и больше? Код становится нечитабельным. Мне необходима была структуризация и классификация, я ее и сделал
  • Как правильно создать payload?

    @Ninzalo Автор вопроса
    Спасибо за подробный ответ!
  • Как правильно создать payload?

    @Ninzalo Автор вопроса
    Модератор, попытки у меня были, есть и будут, но мне не нужно конкретное решение, поэтому я и не указал код попыток. Вопрос не в готовом решении, а в примерах реализации от людей, которые в этом понимают больше меня, либо в статьях, которые помогут мне в этом разобраться. Задача не является частной.
    С нарушением п.5.12 я не согласен
  • Как получить новую запись на стене вк?

    @Ninzalo Автор вопроса
    iBird Rose, гениальный и простой ответ, я просто не создал новый ключ api после изменения настроек, спасибо!
  • Как получить список дат на неделе?

    @Ninzalo Автор вопроса
    Александр, мой метод меня хоть и устраивает по функционалу и скорости, хотел что-то более лаконичное
    метод seven5674 мне нравится намного больше моего, да и скорость выполнения больше (хоть и на 1мс, но тк мне надо много таких вычислений, это тоже сэкономленное время)
  • Как получить список дат на неделе?

    @Ninzalo Автор вопроса
    Александр, вот решение с помощью list comprehension
    today_date = '2022-4-1'
    today_date = datetime.datetime.strptime(today_date, '%Y-%m-%d')
    today_num = today_date.weekday()
    days = [(today_date - datetime.timedelta(days=delta)).strftime('%Y-%m-%d') for delta in reversed(range(0, today_num + 1))] 
    days += ([(today_date + datetime.timedelta(days=delta)).strftime('%Y-%m-%d') for delta in range(1, 7 - today_num)])


    вывод:
    ['2022-03-28', '2022-03-29', '2022-03-30', '2022-03-31', '2022-04-01', '2022-04-02', '2022-04-03']
  • Как получить список дат на неделе?

    @Ninzalo Автор вопроса
    Vindicar,
    today_date = '2022-4-4'
    today_date = datetime.datetime.strptime(today_date, '%Y-%m-%d')
    today_num = today_date.weekday()
    days = []
    for delta in reversed(range(0, today_num + 1)):
        day = (today_date - datetime.timedelta(days=delta)).strftime('%Y-%m-%d')
        days.append(day)
    for delta in range(1, 7 - today_num):
        day = (today_date + datetime.timedelta(days=delta)).strftime('%Y-%m-%d')
        days.append(day) 
    for day in days:
        print(day)


    но есть ли более эффективные методы решения данной задачи?
  • Как получить список дат на неделе?

    @Ninzalo Автор вопроса
    Александр, я пришел сюда за самым (по вашему мнению) быстрым и эффективным способом решения задачи, свое громоздкое решение есть
  • Как получить список дат на неделе?

    @Ninzalo Автор вопроса
    Vindicar, Александр, мой пример:
    today_date = '2022-4-1'
    today_num = datetime.datetime.strptime(today_date, '%Y-%m-%d').weekday()
    days = []
    for item in range(0, today_num + 1):
        day = (datetime.datetime.strptime(today_date, '%Y-%m-%d') - datetime.timedelta(days=item)).strftime('%Y-%m-%d')
        days.append(day)
    for item in range(1, 7 - today_num):
        day = (datetime.datetime.strptime(today_date, '%Y-%m-%d') + datetime.timedelta(days=item)).strftime('%Y-%m-%d')
        days.append(day)
    for item in days:
        print(item)


    вывод:
    2022-04-03
    2022-04-02
    2022-04-01
    2022-03-31
    2022-03-30
    2022-03-29
    2022-03-28
  • Как объединить элементы json файла по параметру?

    @Ninzalo Автор вопроса
    TypeError: unsupported operand type(s) for +=: 'dict' and 'dict'

    Понял что не уточнил: под ключем "other" у меня не список, а словарь с другими ключами. Вопрос поправил.
    Пример входных данных:
    [ { "date": "2022-2-7", "other": {"num": 1, "num2": 2} }, { "date": "2022-2-7", "other": {"num": 3, "num2": 4} }, { "date": "2022-2-8", "other": {"num": 5, "num2": 6} } ]
    Пример результата:
    [ { "date": "2022-2-7", "data": [ {"num": 1, "num2": 2}, {"num": 3, "num2": 4} ] }, { "date": "2022-2-8", "data": [ {"num": 5, "num2": 6} ] } ]
  • Какая разница между двумя списками?

    @Ninzalo Автор вопроса
    Это вроде как мне было понятно, но вот мне все равно не ясно почему выходные данные изменились, если я не изменял список