import typing
import asyncio
import functools
def throttled(delay: float, measure: typing.Literal['end_to_start', 'start_to_start'] = 'start_to_start'):
def decorator(actual_func: typing.Coroutine) -> typing.Coroutine:
queue = None
task = None
async def _single_query(future, args, kwargs):
try:
result = await actual_func(*args, **kwargs) # тут делаем асинхронное обращение к сервису
except BaseException as err:
future.set_exception(err) # была ошибка - теперь await future выкинет исключение
else:
future.set_result(result) # полуен результат - await future вернёт его
async def _work_loop():
nonlocal queue
nonlocal task
while True:
try:
# ждем, пока не придёт запрос, или пока не закончится таймаут
future, args, kwargs = await asyncio.wait_for(queue.get(), delay)
except asyncio.TimeoutError: # новые запросы долго не приходят, сворачиваем работу, чтобы не тратить ресурсы
queue = None
task = None
return
single_task = _single_query(future, args, kwargs)
if measure == 'start_to_start':
asyncio.create_task(single_task)
else:
await single_task
queue.task_done() # каждому успешному get() соответствует task_done()
await asyncio.sleep(delay)
@functools.wraps(actual_func)
async def query(*args, **kwargs):
nonlocal queue # обращение к переменной выше уровнем, но не глобальной
nonlocal task
future = asyncio.Future() # Future просигналит, когда наш запрос будет обслужен
if task is None: # либо это первый запрос, либо запросы долго не приходили, и мы свернули работу
queue = asyncio.Queue()
task = asyncio.create_task(_work_loop())
await queue.put((future, args, kwargs))
return await future
return query
return decorator
# delay - минимальный интервал между запросами в секундах
# measure - как мерять интервалы между запросами: начало-начало или конец-начало
@throttled(delay=5.0, measure='start_to_start')
async def my_coroutine(*args, **kwargs) -> ReturnValue:
...
зашитая в кнопку команда объектом энтри игнорируется
To create a line break or new line (<br>), end a line with two or more spaces, and then type return.
' \n'
start_stop(True)
не закончит выполнение, time.sleep() даже не начнёт выполняться, не говоря уже о последующем. А start_stop(True)
не закончит выполнение никогда, потому что ты сделал вечный цикл.await asyncio.sleep(сколько_нужно)
, а за это время бот упал и был перезапущен, бот просто не "вспомнит" о ранее запланированных, но не выполненных рассылках. This requires Intents.guilds to be enabled.
print(1, 2, 3) # 1 2 3
print(1, 2, 3, sep='') # 123
print(1, 2, 3, sep=', ') # 1, 2, 3
print(1, 2, 3) # 1 2 3 с переводом на новую строку
print(1, 2, 3, end='') # 1 2 3 без перевода на новую строку
print(1, 2, 3, end=':') # 1 2 3: без перевода на новую строку
sep.join(str(arg) for arg in args) + end
v в каком файле ошибка v v строка v v функция v
File "/home/maksim/.local/lib/python3.10/site-packages/telebot/__init__.py", line 1074, in __threaded_polling
self.worker_pool.raise_exceptions()
^ оператор, вызвавший ошибку ^
File "/home/maksim/Загрузки/Python/Квест/Квест.py", line 33, in questions
bot.send_message(_id, 'Комфортно ли вам находиться в обществе?', reply_markup=markup)
bot.send_message(_id, 'Комфортно ли вам находиться в обществе?', reply_markup=markup)
chat not found
. Т.е. не найден чат с таким идентификатором, который ту указал. А значит, в переменной _id содержится неправильное значение. Выясняй, как оно туда попало. # -*- coding: utf-8 -*-
import sys
import numpy # pip install numpy
import cv2 # pip install opencv-python
def loadImg(fname : str) -> numpy.ndarray: # грузит файл
data = numpy.fromfile(fname, dtype=numpy.uint8)
img = cv2.imdecode(data, cv2.IMREAD_COLOR)
if img is None:
raise IOError("Not an image file")
return img
class Clicker: # класс для выбора точек на экране
def __init__(self, name: str, image: numpy.ndarray):
self.wnd = name
self.image = image
self.clicks = []
self.markersize = 5
self.markercolor = (255,0,255)
cv2.namedWindow(self.wnd, cv2.WINDOW_AUTOSIZE)
cv2.setMouseCallback(self.wnd, self._click)
def draw(self): # рисует точки на изображении и выводит их на экран
copy = self.image.copy()
color = self.markercolor
radius = self.markersize
for x,y in self.clicks:
cv2.circle(copy, (x,y), radius, color, 1)
cv2.line(copy, (x-radius,y), (x+radius,y), color, 1)
cv2.line(copy, (x,y-radius), (x,y+radius), color, 1)
cv2.imshow(self.wnd, copy)
def _click(self, event, x, y, flags, param):
if event == cv2.EVENT_LBUTTONDOWN: # левый клик - поставить точку
self.clicks.append((x,y))
elif event == cv2.EVENT_RBUTTONDOWN: # правый клик - сбросить последнюю точку
if self.clicks:
del self.clicks[-1]
else:
return
self.draw()
def close(self):
cv2.destroyWindow(self.wnd)
def __enter__(self):
self.draw()
return self
def __exit__(self, exctype, excvalue, traceback):
self.close()
try:
image = loadImg('times-square.jpg') # изображение, внутрь которого вписываем другое
poster = loadImg('lena.png') # изображение, которое вписываем в первое
except IOError:
print('Ошибка загрузки файла.')
sys.exit(1)
# эта часть только для ручного ввода координат
# если они уже есть, то это не нужно.
with Clicker('Select area', image) as clicker:
# четыре точки ставятся строго по часовой, начиная слева-сверху
while len(clicker.clicks) < 4: # пока не получили четыре точки - угла
if cv2.waitKey(100) == 27:
print('Отменено')
sys.exit(0)
pts = numpy.array(clicker.clicks, dtype=numpy.float32) # координаты углов тут
# вписываем изображение
height, width = poster.shape[:2]
srcpoints = numpy.array([ # углы вставляемого изображения в том же порядке по часовой
(0,0),
(width-1, 0),
(width-1, height-1),
(0, height-1),
], dtype=numpy.float32)
# матрица преобразования сопоставляет четыре точки второго изображения с точками первого
# по сути, она позволяет перейти от второго изображения к первому
matrix = cv2.getPerspectiveTransform(srcpoints, pts) # порядок аргументов важен, иначе переход будет наоборот
# применяем матрицу ко второму изображению. Но теперь надо убрать чёрные поля.
warped = cv2.warpPerspective(poster, matrix, (image.shape[1], image.shape[0]))
# делаем маску для переноса пикселей с warped на image
# мы хотим перенести только пиксели, на которые пришлись пиксели второго изображения
mask = numpy.zeros(image.shape, dtype=numpy.uint8) # рисовать можно только на обычном изображении
# закрашиваем пиксели внутри выбранного ранее четырёхугольника
cv2.fillPoly(mask, pts.reshape(1, -1, 2).astype(numpy.int32), (1,1,1))
mask.dtype = bool # а для переноса нам нужна логическая маска
# маска готова, переносим. numpy рулит, правда ведь?
image[mask] = warped[mask]
# показываем результат
cv2.imshow('Result', image)
cv2.waitKey()
aiohttp.client_exceptions.ClientConnectorCertificateError: Cannot connect to host api.vk.com:443 ssl:True [SSLCertVerificationError: (1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1002)')]
for filename in os.listdir("./cogs"):
from pathlib import Path
import sys
# каталог скрипта
SCRIPT_DIR = Path(sys.argv[0]).parent.resolve()
# каталог с когами
COGS_DIR = SCRIPT_DIR / 'cogs'
# список имён когов
COGS = [f.stem for f in COGS_DIR.glob('*.py')]
x = eval(f"mine_{mines_kolv}[{now_state}]")
тебе нужна индексируемая коллекция! Список или кортеж. Тем более что списки ты уже используешь, значит, знаешь, что это такое. Неужели "список из списков" - это такая сложная концепция?AudioSegment.converter = f"{os.getcwd()}\\ffmpeg.exe"
from pathlib import Path
import sys
SCRIPT_DIR = Path(sys.argv[0]).parent.resolve()
FFMPEG = SCRIPT_DIR / 'ffmpeg.exe' # чтобы не париться с разделителем каталогов (/ или \)