custom_objects
при загрузке модели. # built-in modules
import datetime
import functools
import logging
import pathlib
import time
# third-party modules
import cx_Oracle as cxO
# project modules
import utilities_config as cutil
class DBHandler(logging.Handler):
"""
Класс-обработчик логирования в базу данных (оракл)
"""
_query_create = f"""
create table {cutil.LOGFIG_STORAGE_LOCATION['log_table']} (
logger_name varchar2(100 char),
logging_started date,
idx number,
record varchar2(1000 char))"""
_query_insert = f"""
insert into {cutil.LOGFIG_STORAGE_LOCATION['log_table']}
values (:1, :2, :3, :4)"""
def __init__(self, logger_name, logging_started, chunk_size=100):
super().__init__()
self.logger_name = logger_name
self.logging_started = logging_started
self.chunk_size = chunk_size
self.count = 0
self.chunk = list()
self._create()
def emit(self, record):
"""
Обработка каждого лог-сообщения
"""
self.count += 1
self.chunk.append((self.logger_name, self.logging_started, self.count, self.format(record)))
if len(self.chunk) > 0 and len(self.chunk) % self.chunk_size == 0:
self.flush()
def flush(self):
"""
Запись накопившихся лог-сообщений
"""
try:
self._insert()
except cxO.DatabaseError as exc:
logging.getLogger(self.logger_name).error(f"can't flush logs to oracle: {exc}")
self.chunk.clear()
def _create(self):
"""
Подключение к базе и создание таблицы
"""
with cxO.connect(**cutil.LOGFIG_ORACLE_CONNECTION) as connection:
cursor = connection.cursor()
try:
cursor.execute(self._query_create)
except cxO.DatabaseError as exc:
# пропускаем ошибку
# ORA-00955: name is already used by an existing object
# так как в оракле нет синтаксиса 'CREATE TABLE IF NOT EXISTS ...'
if exc.args[0].code != 955:
# все остальные ошибки просто пробрасываем выше
raise exc
def _insert(self):
"""
Подключение к базе и вставка строк
"""
with cxO.connect(**cutil.LOGFIG_ORACLE_CONNECTION) as connection:
cursor = connection.cursor()
cursor.executemany(self._query_insert, self.chunk)
connection.commit()
def get_logger(logger_name, *, log_to_file, log_to_oracle):
"""
Создание логгера с заданным именем
"""
# общий для всех шаблон формата лог-сообщений
fmt = ('[%(asctime)s] [%(levelname)s] %(message)s', '%Y.%m.%d %H:%M:%S')
# создаем логгер
logging_started = datetime.datetime.now()
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
# [1/3] - логирование в консоль - обработчик и формат лог-сообщений
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter(*fmt))
logger.addHandler(ch)
# [2/3] - логирование в файл - обработчик и формат лог-сообщений
if log_to_file:
# папка с логами
log_dir = pathlib.Path(cutil.LOGFIG_STORAGE_LOCATION['log_dir'])
log_dir.mkdir(exist_ok=True)
# логгер
fh = logging.FileHandler(
pathlib.Path(
log_dir,
f"{logger_name}_log_{logging_started.strftime('%Y.%m.%d_%H.%M.%S')}.log"),
# вот здесь кодировка имеет значение, иначе будет системная кодировка (windows-1251)
encoding='utf-8')
fh.setFormatter(logging.Formatter(*fmt))
logger.addHandler(fh)
# [3/3] - логирование в оракл - обработчик и формат лог-сообщений
# оборачиваем в try/except, так как логирование в оракл опционально
if log_to_oracle:
try:
oh = DBHandler(logger_name, logging_started)
except cxO.DatabaseError as exc:
logger.error(f"can't create logger to oracle: {exc}")
else:
oh.setFormatter(logging.Formatter(*fmt))
logger.addHandler(oh)
return logger
inspect
. Но, если до такого дошло в прикладном коде, значит дела идут не важно и не туда.import inspect
def get_default_args(func):
signature = inspect.signature(func)
return {
k: v.default
for k, v in signature.parameters.items()
if v.default is not inspect.Parameter.empty}
def f1(a=5, *b):
print(a)
print(b)
if __name__ == '__main__':
f1(1, 2, 3)
f1(*get_default_args(f1).values(), 1, 2, 3)
property
- как раз для этого случая они подходят на 100%. Но придется написать много кода.class Person:
has_map = {True: 'Есть', False: 'Нет'}
def __init__(self, name):
self.name = name
self._something_1 = False
self._something_2 = True
@property
def something_1(self):
return self.has_map[self._something_1]
@something_1.setter
def something_1(self, value):
self._something_1 = value
@property
def something_2(self):
return self.has_map[self._something_2]
@something_2.setter
def something_2(self, value):
self._something_2 = value
def attr_mapping(value):
has_map = {True: 'Есть', False: 'Нет'}
if isinstance(value, bool):
return has_map[value]
if __name__ == '__main__':
Anna = Person('Anna')
print(Anna.something_2)
Anna.something_2 = False
print(Anna.something_2)
Anna = Person('Anna')
print(attr_mapping(Anna._something_2))
Anna._something_2 = False
print(attr_mapping(Anna._something_2))
return True/False/1/0
. А потом заводишь нужные переменные, обрабатываешь общий случай с разными ответвлениями и вычислениями, в конце которого уже идет return answer
. GIL который блокирует доступ разных потоков к одному и тому же участку памятиНет, GIL гарантирует, что в каждый момент времени работает только один поток. При этом каждые несколько десятков/сотен тактов процессора работающие потоки сменяют друг друга (если их больше одного).
что собственно является одним из якорей в производетельностиСпорное и корявое утверждение.
зачем нужен класс threading.LockЧтобы дать работать одному потоку, а всем остальным запретить. Потому что иначе переключение между потоками произойдет в общем-то в случайный момент времени, а именно тогда, когда захочет интерпретатор. Он, конечно же, не парится по поводу зашитой в код логики и не будет ждать завершения каких-то конкретных вычислений/чтения/записи.
numpy.where
, которую можно вкладывать в себя, как обычные условные операторы:import numpy as np
import pandas as pd
if __name__ == '__main__':
df = pd.DataFrame({
'movie': [9999999, 2, 3, 1, 9999999],
'rating': [9999999, 2, 9999999, 9999999, 3],
'name': [1, 2, 4, 5, 10]})
df['result'] = np.where(
df['movie'] != 9999999,
df['movie'],
np.where(
df['rating'] != 9999999,
df['rating'],
df['name']))
print(df)
movie rating name result
0 9999999 9999999 1 1
1 2 2 2 2
2 3 9999999 4 3
3 1 9999999 5 1
4 9999999 3 10 3
self.player
равным. Тогда это были бы два различных объекта с абсолютно одинаковым поведением - это кстати один из подходов к созданию синглтонов в питоне.import pandas as pd
class URL_2:
def __init__(self, title=None, description=None):
self.title = title
self.description = description
def __iter__(self):
for attr_name in self.__dict__:
yield getattr(self, attr_name)
if __name__ == '__main__':
site_1 = URL_2(title='Купить телевизор', description='Телевизоры по низкой цене')
site_2 = URL_2(title='Услуги юриста', description='Адвокат спешит к вам')
df = pd.DataFrame([site_1, site_2])
df.columns = site_1.__dict__
print(df)
title description
0 Купить телевизор Телевизоры по низкой цене
1 Услуги юриста Адвокат спешит к вам
uuid[position] < '1'отберет 1/16 от всех значений, а условие
uuid[position] < '11'1/256 часть.
from dateutil import parser
spisok = ['25.05..2001', '25.06.2001', '25.43.2004', '05.02.2005', '27.02.2008']
for elem in spisok:
try:
d = parser.parse(elem, dayfirst=True)
print(f"{elem} -> {repr(d)}")
except parser.ParserError as err:
print(f"{elem} -> {err.__class__.__name__}: {err}")
import dateparser
spisok = ['25.05..2001', '25.06.2001', '25.43.2004', '05.02.2005', '27.02.2008']
for elem in spisok:
d = dateparser.parse(elem, languages=['ru'])
print(f"{elem} -> {repr(d)}")
foo = dict()
min_amount = 1
for type_, amount in (('food', 4), ('food', 3), ('car', 3), ('dog', 1)):
foo[type_] = foo.get(type_, 0) + (amount if amount >= min_amount else 0)
print(*sorted(foo.items(), key=lambda x: x[1]), sep='\n')
import pandas as pd
data = {
'1': ['L', 'M', 'L', 'O', 'M'],
'2': ['M', 'O', 'M', None, 'O'],
'3': ['O', None, 'O', None, None]}
df = pd.DataFrame.from_dict(data)
print('original:\n', df)
column_letter_map = {'1': 'L', '2': 'M', '3': 'O'} # соответствие между названием столбца и буквой
new_data = list()
# перебираем все содержимое словаря
for column, letter in column_letter_map.items():
new_data.append(
(df == letter).any(axis=1) # ищем буквы
.replace({True: letter, False: None}) # если нашли, то меняем на саму эту букву
)
new_data[-1].name = column
new_df = pd.concat(new_data, axis=1)
print('\nmodified:\n', new_df)
import random
import numba as nb
import numpy as np
def bubble1(array):
len_ = len(array)
for i in range(len_):
for j in range(len_ - i - 1):
if array[j] > array[j + 1]:
temp = array[j]
array[j] = array[j + 1]
array[j + 1] = temp
@nb.njit(parallel=True)
def bubble2(array):
len_ = len(array)
for i in nb.prange(len_):
for j in nb.prange(len_ - i - 1):
if array[j] > array[j+1]:
array[j], array[j + 1] = array[j + 1], array[j]
if __name__ == '__main__':
arr1 = random.choices(range(1000), k=1000)
arr2 = np.array(arr1)
bubble1(arr1)
bubble2(arr2)
"""
%timeit bubble1(arr1)
46.1 ms ± 71.5 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit bubble2(arr2)
82.4 µs ± 678 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
"""
Не получается разместить виджеты в фрейм
f1
- это результат вызова метода pack()
, который НЕ возвращает ссылку на фрэйм, а возвращает None
. Это базовый синтаксис питона, переменной присваивается результат вызова функции.Не могу задать высоту и ширину фрейма
.grid(...)
- больше строк кода, но более гибкий и контролируемый результат. Грид - это сетка, у нее есть параметр минимального размера строки/столбца.import *
- это плохо.