th_bot = Thread(target=pyrobot(), args=())
th_userbot = Thread(target=aiobot(), args=())
asyncio.new_event_loop()
, потом задаёт его как текущий для своего потока через asyncio.set_event_loop(loop)
. Если ботам не требуется взаимодействовать, то это может быть проще. Если требуется... будут проблемы. Два реактора в одной программе - это не хорошо.app.run()
, и вызов executor.start_polling(dp, skip_updates=True)
скорее всего под капотом создают асинхронную функцию (корутину), и запускают её в реакторе. Тогда ты можешь обойтись без потоков, заставив обоих ботов работать на одном реакторе. Нужно будет зарыться в доки, или даже глянуть исходники.When calling this method (app.run()) without any argument it acts as a convenience method that calls start(), idle() and stop() in sequence. It makes running a single client less verbose.
async def pyrobot(): # обрати внимание, теперь функция асинхронная!
print("pyro started")
@app.on_message(filters.chat("some_chat"))
async def print_pyrogram():
print("Pyrogram")
# это вместо вызова app.run(), как написано в доках.
await app.start()
try:
await app.idle()
finally:
await app.end()
asyncio.run(asyncio.gather(pyrobot(), aiobot()))
def compose_sum(numbers: list[int], total: int) -> list[int] | None:
# ищем индексы потенциальных слагаемых
indices = [i for i in range(len(numbers)) if numbers[i] <= total]
# сортируем по убыванию слагаемых, потом по порядку в списке
indices.sort(key = lambda i: (numbers[i], i), reverse=True)
# если нулевой элемент совпадает - мы нашли точную сумму. Прерываем рекурсию.
if numbers[indices[0]] == total:
return [indices[0]]
for index in indices: # иначе перебираем слагаемые
numcopy = numbers.copy()
# копия списка без рассматриваемого слагаемого
current = numpcopy.pop(index)
next_indices = compose_sum(numcopy, total - current)
if next_indices: # нашли решение, корректируем индексы (так как мы удалили один элемент)
for i in range(len(next_indices)):
if next_indices[i] >= index:
next_indices[i] += 1
return [index] + next_indices # отдаём наше решение "наверх"
# next_indices пуст/None - решения не нашли, пробуем другой index
return None # не нашли решения ни для одного index
линия и пунктир с треугольной стрелкой самые понятные, они просто наследуют классы
линии с ромбиками понятны отчасти, в свойстве класса создают экземпляр другого класса
CREATE TABLE Chats (id INTEGER PRIMARY KEY, name TEXT, topic TEXT);
CREATE TABLE Users (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE UserInChat(
chat_id INTEGER,
user_id INTEGER,
FOREIGN KEY (chat_id) REFERENCES Chats(id),
FOREIGN KEY (user_id) REFERENCES Users(id),
PRIMARY KEY (chat_id, user_id)
);
SELECT Users.id as userid, Users.name as username
FROM Users INNER JOIN UserInChat ON Users.id = UserInChat.user_id
WHERE UserInChat.chat_id = 123456;
visit = random.choice(urls)
urls = [] # в urls пусто
# тут ты только определяешь функцию, но не вызываешь её
def createlist(ids):
global urls
for id in ids:
urls.append("https://scrap.tf/raffles/" + id)
# так что тут urls всё ещё пуст
# внутри login() вызывается rufflejoin(). А urls всё ещё пуст.
login()
def sort_list(original: list):
some_list = [ ... ]
sort_list(some_list)
def sort_list(original: list) -> list:
some_list = [ ... ]
some_list = sort_list(some_list)
def sort_list(original: list, inverse: bool = False):
sort_list(some_list)
по прежнему будет работать. Мы расширили старый интерфейс, а не заменили его.if x == 1:
@bot.message_handler(content_types="text")
def send(message):
...
@bot.message_handler(content_types="text")
. Если их несколько, отработате только одна.@bot.message_handler
ты должен получить ID пользователя, отправившего сообщение, взять из описанного выше хранилища номер последнего заданного вопроса (если есть), и уже на основании этого номера судить о том, правильный ли ответ. import threading
class MyWorkerThread(threading.Thread):
def __init__(self, arg1: float, arg2: float): # передаём потоку входные данные
# поток не должен их менять!
super().__init__()
self.arg1 = arg1
self.arg2 = arg2
self.result: t.Optional[float] = None
def run(self):
time.sleep(10) # имитируем длительную работу
self.result = self.arg1 + self.arg2
worker = MyWorkerThread(42, 69)
worker.start()
while True:
if worker.is_alive(): # проверяем, жив ли поток
# делаешь ещё что-то, пока поток работает
print('Still working...')
time.sleep(0.5)
else:
# поток завершился, даём знать пользователю.
print(f'Done! Result is {worker.result}!')
break # выходим из цикла
worker.join()
import threading, queue
class MyWorkerThread(threading.Thread):
def __init__(self, arg1: float, arg2: float): # передаём потоку входные данные
# поток не должен их менять!
super().__init__()
self.arg1 = arg1
self.arg2 = arg2
self.result: t.Optional[float] = None
self.progress = queue.Queue()
def run(self):
for i in range(10):
time.sleep(1) # имитируем длительную работу
self.progress.put(i/10) # сообщаем о прогрессе
self.result = self.arg1 + self.arg2
self.progress.put(1.00)
worker = MyWorkerThread(42, 69)
worker.start()
while True:
if worker.is_alive(): # проверяем, жив ли поток
# делаешь ещё что-то, пока поток работает
try:
progress = worker.progress.get(block=True, timeout=0.5)
except queue.Empty: # поток ничего не сообщил
print('Still working...')
else:
print(f'Still working... {progress:.0%}')
worker.progress.task_done() # один вызов task_done() на один успешный вызов get()!
else:
# поток завершился, даём знать пользователю.
print(f'Done! Result is {worker.result}!')
break # выходим из цикла
_pickle.PicklingError: Can't pickle <function <lambda> at 0x00000122F23CEB00>: attribute lookup <lambda> on __main__ failed