import logging
import asyncio
# Shared work queue: main() puts job arguments on it and worker() consumes them.
# NOTE(review): the queue is created at import time, before asyncio.run() starts
# its event loop; on Python < 3.10 asyncio.Queue binds to a loop at construction,
# which can differ from the loop asyncio.run() creates -- confirm the target version.
q = asyncio.Queue()
async def a_difficult_task(some_arg, duration=120):
    """Simulate a long-running job by sleeping between two progress messages.

    Args:
        some_arg: Label identifying the job; only used in the printed output.
        duration: Seconds to sleep between the "started" and "ended"
            messages. Defaults to 120, the previously hard-coded delay.
    """
    print(f"task {some_arg} started")
    await asyncio.sleep(duration)
    print(f"task {some_arg} ended")
async def worker():
    """Consume job arguments from the shared queue ``q`` forever.

    Each item is passed to ``a_difficult_task``; ordinary exceptions raised by
    the task are logged and the loop continues. ``KeyboardInterrupt``,
    ``SystemExit`` and ``SystemError`` stop the worker.
    """
    while True:
        # Fetch the item *outside* the try block: task_done() must be called
        # exactly once per successful get(). If get() itself fails or is
        # cancelled there is nothing to mark as done, and calling task_done()
        # anyway raises ValueError and hides the original exception.
        some_arg = await q.get()
        try:
            await a_difficult_task(some_arg=some_arg)
        except (KeyboardInterrupt, SystemExit, SystemError):
            break
        except Exception:
            logging.exception("...")
        finally:
            # Runs on success, on a logged failure and on break alike, so that
            # q.join() callers are never left waiting on this item.
            q.task_done()
async def main():
    """Start the worker, enqueue the integers 1..6, then wait on the worker."""
    # Keep a reference to the task: the event loop only holds weak references
    # to tasks, so an unreferenced task may be garbage-collected mid-execution.
    worker_task = asyncio.create_task(worker())
    print("Adding tasks to queue")
    for x in range(1, 7):
        await q.put(x)
    print("Tasks added to queue")
    # Block for as long as the worker runs instead of waking up every second;
    # this also surfaces an unexpected worker crash instead of hiding it.
    await worker_task


if __name__ == "__main__":
    asyncio.run(main())
import telebot
from telebot import types
# Initial counter value embedded in the keyboard sent by the /start handler.
a = 100
# Bot client. The string below is a placeholder for the real API token
# (NOTE(review): consider loading it from the environment instead of source).
bot = telebot.TeleBot('токен ')
def make_buttons(a):
    """Return an inline keyboard holding a single "click" button.

    The keyboard's ``row_width`` is set to 1 and the button's callback data
    is ``"cb_minus_"`` followed by ``str(a)``.
    """
    payload = "cb_minus_" + str(a)
    keyboard = types.InlineKeyboardMarkup()
    keyboard.row_width = 1
    button = types.InlineKeyboardButton(text="click", callback_data=payload)
    keyboard.add(button)
    return keyboard
@bot.message_handler(commands=['start'])
def start(message):
    """Answer the /start command with a greeting and the initial keyboard."""
    chat_id = message.chat.id
    # The greeting is personalised with the sender's first name.
    greeting = "Привет, {0.first_name}! жми кнопки)))".format(message.from_user)
    keyboard = make_buttons(a)
    bot.send_message(chat_id, text=greeting, reply_markup=keyboard)
# The former second decorator used ``print(call.data)`` as its filter; print()
# returns None, so that registration could never match and only re-registered
# this handler as a debugging side effect. It has been removed.
@bot.callback_query_handler(func=lambda call: call.data.startswith('cb_'))
def callback_query(call):
    """Handle a button press whose callback data looks like ``cb_<op>_<number>``.

    For the ``minus`` operation the number is decremented by one. The message
    keyboard is then rebuilt with the new number and the number is shown to
    the user in an alert.
    """
    print(call.data)
    _, op, digit = call.data.split("_")
    digit = int(digit)
    if op == "minus":
        digit -= 1
    bot.edit_message_reply_markup(
        call.message.chat.id,
        call.message.id,
        # Must be passed by keyword: the third positional parameter of
        # edit_message_reply_markup is inline_message_id, not reply_markup.
        reply_markup=make_buttons(digit),
    )
    bot.answer_callback_query(call.id, str(digit), show_alert=True)
# Start polling Telegram for updates so the handlers above get called.
# NOTE(review): `none_stop` appears to be a deprecated alias of `non_stop` in
# newer pyTelegramBotAPI releases -- confirm against the installed version.
bot.polling(none_stop=True)
# Print the sum of the negative values only: every non-negative value is
# clamped to 0 before the three numbers are added up.
a, b, c = 4, -22, 1
a = min(a, 0)
b = min(b, 0)
c = min(c, 0)
print(sum((a, b, c)))
# pip install selenium
import os
from pathlib import Path
from platform import system
from urllib.parse import urlparse
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
# Chrome options: run headless and switch off the automation-related switches
# and features listed below.
options = Options()
options.add_argument("--headless")
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)

# The chromedriver binary is expected in ./driver relative to the current
# working directory; only the file name differs on Windows.
if system() == "Windows":
    exec_path = os.path.join(os.getcwd(), 'driver', 'chromedriver.exe')
else:
    exec_path = os.path.join(os.getcwd(), 'driver', 'chromedriver')

# Driver log output is sent to the null device.
service = Service(log_path=os.devnull, executable_path=exec_path)
driver = webdriver.Chrome(options=options, service=service)

# Register a script that runs on every new document and deletes the three
# window.cdc_* properties (presumably markers left by chromedriver -- confirm).
driver.execute_cdp_cmd(
    "Page.addScriptToEvaluateOnNewDocument",
    {
        'source': '''
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Array;
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Promise;
delete window.cdc_adoQpoasnfa76pfcZLmcfl_Symbol;
''',
    },
)
def screen_sait(url) -> None:
    """Save a full-page screenshot of *url* into the current directory.

    The file name is the URL's host with its last suffix replaced by
    ``.png`` (``https://codeby.net/`` -> ``codeby.png``). The module-level
    ``driver`` is shut down afterwards, even when loading the page or taking
    the screenshot fails, so the function can be used once per driver.

    Args:
        url: Address of the page to capture.
    """
    host = str(urlparse(url).hostname)
    # with_suffix() swaps only the final suffix and appends one when the host
    # has none. The previous str.replace() approach rewrote every occurrence
    # of the suffix ("10.1.0.1" -> "10.png.0.png") and, for a suffix-less host
    # such as "localhost", inserted ".png" between all characters.
    path = str(Path(host).with_suffix(".png"))

    def scroll_size(dimension):
        # Reads scrollWidth / scrollHeight of the document's root element.
        return driver.execute_script(
            'return document.body.parentNode.scroll' + dimension
        )

    try:
        driver.get(url)
        # Resize the window to the full scrollable area so that the
        # screenshot of <body> covers the whole page.
        driver.set_window_size(scroll_size('Width'), scroll_size('Height'))
        driver.find_element(By.TAG_NAME, 'body').screenshot(path)
    finally:
        # quit() closes every window and ends the session, so a separate
        # close() call is not needed.
        driver.quit()
# Example run: capture https://codeby.net/ (the PNG file name is derived from the host).
screen_sait('https://codeby.net/')
class Person(BaseModel):
    # Model with a single constrained field. Documented with comments rather
    # than a docstring so that the generated schema description is unchanged.

    # Integer constrained with ge=30, i.e. values below 30 are rejected.
    age: conint(ge=30)

    class Config:
        # pydantic v1-style switches: validate_all is meant to validate
        # default values as well, validate_assignment to re-validate on
        # attribute assignment (confirm against the installed pydantic version).
        validate_all = True
        validate_assignment = True
class FormModel(BaseModel):
    # Form payload with three optional flags and one string field. Documented
    # with comments rather than a docstring so that the generated schema
    # description is unchanged.

    # Optional flags; an explicit None is normalised to False by set_bool.
    first: Optional[bool] = False
    second: Optional[bool] = False
    third: Optional[bool] = False
    # Must contain the character "x" (enforced by set_str).
    something_there: str = "qwexdfg"

    class Config:
        # Re-run validation whenever an attribute is assigned.
        validate_assignment = True

    # pre=True: in pydantic v1 ordinary (post) validators are skipped when an
    # Optional field receives None, so without it the None -> False branch
    # below would never be reached.
    @validator("first", "second", "third", pre=True)
    def set_bool(cls, val):
        """Replace an explicit None with False; pass other values through."""
        return False if val is None else val

    @validator("something_there")
    def set_str(cls, val):
        """Require the value to contain "x" and return it unchanged.

        Previously this returned the bool ``"x" in val``, which replaced the
        string field's value with True/False.

        Raises:
            ValueError: If "x" does not occur in the value.
        """
        if "x" not in val:
            raise ValueError('something_there must contain "x"')
        return val
>>> import requests
>>> views:int = requests.get("https://api.telegra.ph/getViews/testfhiasehilf-07-21?year=2023&month=7").json()["result"]["views"]
>>> print(views)
7
>>>
>>> import requests
>>> views:int = requests.get("https://api.telegra.ph/getViews/testfhiasehilf-07-21").json()["result"]["views"]
>>> print(views)
7
>>>
@bot.on.chat_message()
async def message_handler(message: Message):
    """Reply to the given chat message with a fixed test answer."""
    reply_text = "Тест!"
    await message.answer(reply_text)
@bot.on.chat_message()
async def message(message: Message):
    """Print a debug line, then delegate to message_handler for the text "тест".

    NOTE(review): message_handler is itself registered with the same
    unfiltered decorator earlier in the file; check the framework's handler
    ordering to confirm that this handler is ever reached.
    """
    print("Тестовый PRINT")
    # Guard clause: any other text needs no reply.
    if message.text != "тест":
        return
    await message_handler(message)