"""
Принимаем видео с вебкамеры, и пытаемся сегментировать его с помощью YOLOv8.
Выводим в окне маски объектов.
Потребуется установить пакеты следующей командой:
pip install ultralytics
"""
from sys import argv
from pathlib import Path
import numpy
import cv2
import torch
COLORS = [
(64, 128, 128),
(128, 64, 64),
(64, 128, 64),
(64, 64, 128),
(128, 64, 128),
]
VIDEO_SOURCE = 0 # 0 - вебкамера. Строка - имя файла или URL видео потока.
script_dir = Path(argv[0]).parent.resolve()
# определяем, на каком устройстве будет выполняться модель
device = (
'cuda' if torch.cuda.is_available() else
'mps' if torch.backends.mps.is_available() else
'cpu'
)
import ultralytics
def text_at( # рисует текст по центру
img: numpy.ndarray, # на чём
text: str, # что
pos: tuple[int, int], # где центр
font=cv2.FONT_HERSHEY_COMPLEX, # каким шрифтом
font_scale: float = 1.0, # в каком масштабе (не кегль!)
font_thickness: int = 2, # насколько толстые линии
text_color=(255, 0, 0), # каким цветом
bg_color=None # на каком фоне
):
x, y = pos
(text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, font_thickness)
if bg_color is not None:
cv2.rectangle(img,
(x - text_w // 2, y - text_h // 2),
(x + text_w // 2, y + text_h // 2),
bg_color, -1)
cv2.putText(img,
text,
(x - text_w // 2, y + text_h // 2 - 1),
font, font_scale, text_color, font_thickness)
model = ultralytics.YOLO(
script_dir / 'yolov8m-seg.pt' # имя файла модели указывает на её тип
)
# цикл работы с видео
video = cv2.VideoCapture(VIDEO_SOURCE)
if not video.isOpened():
raise SystemExit('Не удалось открыть источник видео')
overlay_image: numpy.ndarray = numpy.zeros( # изображение-оверлей для отображения разметки кадра
(
int(video.get(cv2.CAP_PROP_FRAME_HEIGHT)),
int(video.get(cv2.CAP_PROP_FRAME_WIDTH)),
3
),
numpy.uint8
)
try:
while True:
success, frame = video.read() # читаем кадр
if not success:
raise SystemExit('Видео закончилось')
# для YOLO предобработка не требуется
outputs = model.predict( # получаем отклик сети
source=frame,
conf=0.15, # минимальная степень уверенности в объекте
device=device, # устройство
verbose=False # чтобы не спамило логом в консоль
)
output = outputs[0] # модель всегда возвращает список результатов, даже для одного изображения
if output.masks is not None: # если были получены маски
masks = output.masks.cpu().xy # список масок как координат вершин многоугольников (контуров)
boxes = output.boxes.cpu() # ограничивающие прямоугольники
image_output = [] # список аннотаций, которые нужно будет вывести
for box, mask in zip(boxes, masks): # формируем список аннотаций
class_id = int(box.cls.item()) # номер класса, соответствующего очередной области
class_name = model.names[class_id] # имя класса, соответствующее области
pts = numpy.array(mask).astype(numpy.int32) # массив координат точек контура Nx2
image_output.append((class_id, class_name, pts))
overlay_image.fill(0) # очищаем изображение-оверлей
for (class_id, class_name, pts) in image_output: # закрашиваем каждую маску цветом
cv2.fillPoly(
img=overlay_image,
pts=pts[numpy.newaxis, ...], # для работы fillPoly() массив должен быть вида 1xNx2
color=COLORS[class_id % len(COLORS)], # цвет массива определяем по номеру класса для стабильности
)
for (class_id, class_name, pts) in image_output: # выводим надписи отдельным циклом, чтобы их не закрасило
M = cv2.moments(pts) # вычисляем моменты контура
# используем их для расчёта координат центроида контура
cX = int(M["m10"] / M["m00"])
cY = int(M["m01"] / M["m00"])
# выводим имя класса
text_at(overlay_image, class_name,
(cX, cY),
text_color=(255, 255, 255),
bg_color=(1, 1, 1))
# накладываем оверлей с разметкой на кадр
alpha = 0.8 # вес оверлея, чем меньше - тем он прозрачнее.
apply = overlay_image > 0 # изменяем только те части кадра, где есть оверлей
frame[apply] = alpha * overlay_image[apply] + (1 - alpha) * frame[apply] # обожаю numpy
cv2.imshow('Press Esc to exit', frame) # показываем результат
if cv2.waitKey(10) == 27: # если пользователь нажал Esc, выходим
break
finally:
video.release() # не забываем отключиться от источника видео в итоге
# ------------------- main.py -------------------
async def main():
import urllib3 # как бы тяжелый модуль, нужный только главному скрипту
import importlib
... # получаем исходные данные
# динамически подтаскиваем нужный модуль
mod = importlib.import_module('modules.submodule')
m = mod.Module(x=-1) # используем класс из него
# этот метод под капотом использует мультипроцессинг
results = await m.run_tasks(some_data)
... # что-то делаем с результатами
if __name__ == '__main__':
import asyncio
asyncio.run(main())
# ------------------- modules/submodule.py -------------------
import asyncio
import concurrent.futures
import time
import sys
import os
class Module:
def __init__(self, x):
self.x = x
def __repr__(self) -> str:
return f'{self.__class__.__name__}(x={self.x!r}) {hex(id(self)).upper()} @ {os.getpid()}'
async def run_tasks(self, data):
self.x = len(items)
print(f'Using instance {self!r}')
loop = asyncio.get_running_loop()
pool = concurrent.futures.ProcessPoolExecutor(4) # где будем исполнять таск
# готовим таски к исполнению
futures = [loop.run_in_executor(pool, self.worker_func, item) for item in data]
# ждём их завершения
return await asyncio.gather(*futures, return_exceptions=True)
def worker_func(self, item):
# инстансы разные, разумеется, но их состояние, похоже, клонируется в новые процессы...
print(f'Using instance {self!r}. urllib3', 'is' if 'urllib3' in sys.modules else 'is not', 'present', flush=True)
time.sleep(0.1) # имитируем напряжённую работу
return repr(item)
async def show_cpanel(
state: FSMContext,
message_source: typing.Union[Message, CallbackQuery] # значение одного из двух типов, но обязательное!
) -> None:
'''
Send control panel to current user
'''
this_user = message_source.from_user.id # и Message, и CallbackQuery имеют from_user
message = message if isinstance(message_source, Message) else message_source.message
profile_is_visible = await req.check_profile_visible(this_user)
message_id = (await message.answer(
'<b>' + _('Панель управления') + '</b>',
parse_mode='HTML',
reply_markup=kb.control_panel(profile_visible=profile_is_visible)
)).message_id
await state.update_data(cpanel_message_id=message_id)
logger.info(f'Admin #{this_user} opened control panel')
callback_query: Optional[CallbackQuery] = None,
message: Optional[Message] = None,
import tracemalloc
tracemalloc.start()
for i in range(4):
snapshotA = tracemalloc.take_snapshot()
s = f'Строка №{i+1}'
snapshotB = tracemalloc.take_snapshot()
s = s.replace('Строка', 'Другая строка')
snapshotC = tracemalloc.take_snapshot()
print('>', s)
statab = snapshotB.compare_to(snapshotA, 'lineno')
print("[A->B]")
for stat in statab[:10]:
if stat.traceback[0].filename == __file__:
print(stat)
statbc = snapshotC.compare_to(snapshotB, 'lineno')
print("[B->C]")
for stat in statbc[:10]:
if stat.traceback[0].filename == __file__:
print(stat)
где я его должен ловить учитывая что саму urllib3 я через import не подключаю
from urllib3.exceptions import LocationParseError
class MyMeta(type):
def __new__(metacls, name, bases, namespace, **kwargs):
print(f"__new__(): {name}({kwargs})")
return super().__new__(metacls, name, bases, namespace)
class MyBase(metaclass=MyMeta): # __new__(): MyBase({})
pass
class MyClass(MyBase, foo='bar'): # __new__(): MyClass({'foo': 'bar'})
pass
// ==UserScript==
// @name Force Learn.MS to English
// @version 1
// @grant none
// @run-at document-start
// @include https://learn.microsoft.com/ru-ru/*
// ==/UserScript==
(function() {
'use strict';
const new_url = window.location.toString().replace('://learn.microsoft.com/ru-ru/', '://learn.microsoft.com/en-us/');
window.location.replace(new_url);
})();
from collections import Counter
main_word = 'АКСИЛИРОВАНИЕ' + '\n' # основное слово + перенос строки, чтобы не вызывать str.rstrip()
main_set = frozenset(main_word) # множество букв слова без учёта повторов
main_len = len(main_word)
min_length = 8
with open('D:\\Program Files\\Text\\слова\\1.txt', 'r') as f:
candidates = [ # слова-кандидаты, состоящие из тех же букв и подходящие по длине
(word, Counter(word)) # само слово и его состав по буквам
for word in f # для всех слов в файле
# проверяем длину слова и соответствие набора букв без учёта их количества
if min_length<=len(word)<=main_len and main_set.issuperset(word)
]
main_counter = Counter(main_word) # подсчёт числа букв в основном слове
results = [ # итоговый результат
word # те слова
for word, counter in candidates # из числа слов-кандидатов
if all(counter[key] <= main_counter[key] for key in counter) # у которых нет превышения ни по одной букве
]
import discord
from discord.ext import commands
intents = discord.Intents.default()
intents.members = True
intents.message_content = True
bot = commands.Bot(command_prefix='!', intents=intents)
# ...
bot.run('token')
import discord
from discord.ext import commands
async def main():
# асинхронная функция может быть выполнена ТОЛЬКО внутри рабочего цикла
# значит, рабочий цикл уже точно существует и выполняется
intents = discord.Intents.default()
intents.members = True
intents.message_content = True
# конструктор сам по себе не асинхронный, но он выполняется в асинхронном контексте
bot = commands.Bot(command_prefix='!', intents=intents)
# ...
# мы уже в асинхронной функции, поэтому используем await start() вместо run()
await bot.start('token') # main() не завершит работу, пока бот не завершит работу
if __name__ == '__main__':
asyncio.run(main()) # создаём рабочий цикл. он будет работать, пока main() не завершит работу