class Mybot(telebot.TeleBot):
def __init__(self,arg, *args, **kwargs):
super().__init__(arg, *args, **kwargs)
def loop_poop(self):
while True:
time.sleep(15)
print(time.ctime())
def polling(self, *args, **kwargs):
thread = threading.Thread(target=self.loop_poop)
thread.start()
super().polling(*args, **kwargs)
Может быть быть сделать 2 файла?Отличная идея, но и многопоточность прикрутить не сложно. Всё зависит от масштаба. Если это мини проект то проще всё в один файл запхнуть. А если крупный проект, то сам бот стоит разложить на несколько файлов. Так же не каждый сервер даст много вычислительных секунд, поэтому к техничесой стороне тоже стоит присмотреться. Что за тариф не переплачивать. Ну и т.д.
part1 = 'tata'
part2 = 'titi'
def cat_twice(part1, part2) :
cat = part1 + part2
return cat
print(cat_twice(part1,part2))
class emp:
name='Harsh'
salary='25000'
def show(self):
print (self.name)
print (self.salary)
e1 = emp()
# Use getattr instead of e1.name
print (getattr(e1,'name'))
# returns true if object has attribute
print (hasattr(e1,'name'))
# sets an attribute
setattr(e1,'height',152)
# returns the value of attribute name height
print (getattr(e1,'height'))
# delete the attribute
delattr(emp,'salary')
# -*- coding: utf-8 -*-
import telebot
bot = telebot.TeleBot('11111111111')
en = ['one','two','three']
ru = ['один','два','три']
@bot.message_handler(commands=[ 'start'])
def send_welcome(message):
msg = bot.send_message(message.chat.id,f'Переведи слово {ru[0]}')
bot.register_next_step_handler(msg, process_name_step, 0 )
def process_name_step(message, count=0):
print(message.text, count, len(en))
if message.text == en[count]:
if count<len(en)-1:
msg = bot.send_message(message.chat.id,f'Молодец,\nПереведи слово {ru[count+1]}')
bot.register_next_step_handler(msg, process_name_step, count+1)
else:
bot.send_message(message.chat.id,'Вопросы закончились')
count=0
else:
if count<len(en)-1:
msg = bot.send_message(message.chat.id,f'Неугадал,\nПереведи слово {ru[count+1]}')
bot.register_next_step_handler(msg, process_name_step, count+1)
else:
bot.send_message(message.chat.id,'Вопросы закончились')
count = 0
bot.enable_save_next_step_handlers(delay=2)
bot.load_next_step_handlers()
bot.polling()
def download_file(url):
local_filename = url.split('/')[-1]
# NOTE the stream=True parameter below
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
# If you have chunk encoded response uncomment if
# and set chunk_size parameter to None.
#if chunk:
f.write(chunk)
return local_filename
@bot.message_handler(func=lambda message: message.text == "start")
@bot.message_handler(commands=['start'])
def start_message(message):
bot.send_message(message.chat.id, 'Добро пожаловать в бот Агрегатор новостей! Здесь вы можете подписаться на рассылку '
'интересующих вас новостей, которые бот будет вам отправлять. '
'Для вывода новостных источников напишите команду /news', reply_markup = start_buttons())
Base().add_user(message.from_user.id, message.chat.id)
class Mybot(telebot.TeleBot):
def __init__(self,arg, *args, **kwargs):
super().__init__(arg, *args, **kwargs)
def loop_poop(self):
while True:
parse_all_news(self)
time.sleep(15)
print(time.ctime())
def polling(self, *args, **kwargs):
thread = threading.Thread(target=self.loop_poop)
thread.start()
super().polling(*args, **kwargs)
@bot.callback_query_handler(func=lambda call: call.data in news_sources.values())
def callback_worker(call):
news_source = ''
for k,v in news_sources.items():
if call.data == v:
news_source = k
try:
Base().add_subscribe(call.from_user.id, call.data)
bot.edit_message_text(f'Вы подписались на новости {news_source}', call.message.chat.id, call.message.message_id, reply_markup=subscribe_news_buttons())
except Exception as e:
bot.send_message(call.message.chat.id, f'Подписка на новости {news_source} не удалась')
def subscribe_news_buttons():
keyboard = types.InlineKeyboardMarkup()
for btn_text, callback in news_sources.items():
keyboard.add(types.InlineKeyboardButton(text=btn_text, callback_data=callback))
return keyboard
def get_user_subscribes(user_id):
keyboard = types.InlineKeyboardMarkup()
sc = Base().get_subscribes(user_id)
for chanel,sbs in sc:
if sbs == '1':
news_source = ''
for k,v in news_sources.items():
if chanel == v:
news_source = k
keyboard.add(types.InlineKeyboardButton(text='Отписаться от '+news_source, callback_data=f'del_{chanel}'))
return keyboard
def get_subscribes(self,user_id):
r1,r2=[],[]
sql = f"SELECT * FROM {self.table} WHERE {user_id} = user_id"
try:
res = self.cursor.execute(sql).fetchall()
rows = self.cursor.execute(sql).description
x=0
for row in rows[2:]:
r1.append(row[0])
for _ in res[0][2:]:
r2.append(_)
result = zip(r1,r2)
return list(result)
except Exception as e:
return False, e
while x>0:
res +=1
x//=base
return res