async def call_process(command: str, timeout: int = 5):
    """Run a shell command in a worker thread and forward a text report.

    The blocking ``Popen.communicate`` call is executed in a thread pool so
    that the running event loop is not stalled while the command runs. The
    resulting report string is then handed to ``some_async_func``.

    Args:
        command: Shell command line to execute (run with ``shell=True``).
        timeout: Seconds to wait for the command before giving up.

    Returns:
        None. The report is passed to ``some_async_func``.
    """

    def caller(cmd: str, t_out):
        """Run *cmd* synchronously and return a report string."""
        # SECURITY: shell=True passes `cmd` to the shell verbatim; callers
        # must not feed untrusted input here without validation/quoting.
        # The context manager closes both pipes and reaps the child on
        # every exit path, including the timeout branch below.
        with Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True) as proc:
            try:
                stdout, stderr = proc.communicate(timeout=t_out)
            except TimeoutExpired:
                # communicate() does not kill the child when the timeout
                # expires; without this the process would keep running.
                proc.kill()
                return f'Command "{cmd}" timed out ({t_out}s)'

            report = f'Exitcode: {proc.returncode}'
            # errors="replace" keeps non-UTF-8 output from raising
            # UnicodeDecodeError and losing the whole report.
            if stderr:
                return report + f'STDERR: {stderr.decode(errors="replace")}'
            if stdout:
                return report + f'STDOUT: {stdout.decode(errors="replace")}'
            return report

    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, caller, command, timeout)
    await some_async_func(result)
class SomeState(StatesGroup):
    """State group holding the single ``name`` state."""

    # State used by handlers registered with ``state=SomeState.name``.
    name = State()
@dp.message_handler(commands='timer')
async def register_executor(msg):
    """Handle the ``/timer`` command: ask for a name and enter ``SomeState.name``.

    Args:
        msg: Incoming message that triggered the command.
    """
    # Prompt text (Russian): "Send me your name".
    await msg.answer('Отправь мне свое имя')
    # State.set() is a coroutine; without `await` it never runs and the
    # conversation never enters SomeState.name.
    await SomeState.name.set()
@dp.message_handler(state=SomeState.name)
async def re_get_about(msg):
    """Store the received name, echo it back, and leave the FSM state.

    Runs for messages received while the conversation is in
    ``SomeState.name``.

    Args:
        msg: Incoming message whose text is taken as the user's name.
    """
    # The original referenced an undefined `state`; obtain the FSM context
    # for this chat/user from the dispatcher so the signature is unchanged.
    state = dp.current_state(chat=msg.chat.id, user=msg.from_user.id)

    # update_data() is a coroutine and must be awaited to persist the value.
    await state.update_data(name=msg.text)
    user_data = await state.get_data()

    # Reply text (Russian): "Your name is <name>".
    await msg.answer(f'Твое имя {user_data["name"]}')

    # Finishing is done on the FSM context; the states group itself has no
    # finish() method.
    await state.finish()
# Mapping passed as `data=` to the POST request below; requests sends a dict
# given as `data` form-encoded in the request body.
# NOTE(review): the keys look like HTTP header names (Content-Type,
# Content-Length, X-*). If they are meant to be request headers they belong in
# `headers=` instead, with the real payload in `data=` — confirm intent.
# NOTE(review): the bare `...` entry is an elision left in the snippet and is
# not valid inside a dict literal; the missing entries must be filled in.
data = {
"X-APP-TYPE": "knd_mo",
"X-APP-BUILD-NUMBER": "70440",
"X-PLATFORM": "android",
...
"Content-Transfer-Encoding": "binary",
"Content-Type": "text/plain; charset=utf-8",
"Content-Length": "8"
}
# `url` is not defined in the visible code; presumably set elsewhere.
# The response object is discarded, so failures go unnoticed here.
requests.post(url, data = data)
from aiogram import Bot, Dispatcher, executor, types
from .config import BOT_TOKEN
# Bot client created with the token imported from the local config module.
bot = Bot(token=BOT_TOKEN)
# Dispatcher bound to the bot; the `@dp.message_handler` decorators register on it.
dp = Dispatcher(bot)
@dp.message_handler()
async def somehandler(msg: types.Message):
    """Reply with the chosen option, or say the option is unknown.

    Args:
        msg: Incoming message; its text is compared against the known options.
    """
    # The two literals are placeholders for the real option texts.
    known_options = ("...", '...')

    if msg.text not in known_options:
        # Reply text (Russian): "I don't know that option".
        await msg.answer('Я не знаю такого варианта')
        return

    # Reply text (Russian): "@<id>, you chose <text>".
    # NOTE(review): this prefixes the numeric user id with '@'; presumably a
    # username mention was intended — confirm.
    await msg.answer(f'@{msg.from_user.id}, вы выбрали {msg.text}')
# Start polling only when this file is run as a script, not when imported.
if __name__ == "__main__":
    executor.start_polling(dp, skip_updates=True)