Есть код на python, один его блок лучше выполнять каждый на разных ядрах, для решения этой задачи используется библиотека joblib. Но процессор нагружен всего на 20% и я не знаю как решить эту проблему.
def calc_w(cl , xs: np.ndarray, ys: np.ndarray, ws: np.ndarray, parallel: Optional[Parallel] = None):
if parallel is None:
parallel = Parallel(n_jobs=-1, backend='threading')
zs = np.array(parallel(delayed(cl)(x) for x in xs))
#zs = []
#for x in xs:
# zs.append(1.0 * cl(x)) # cl(x) - применение признака
#zs = np.array(zs)
border, polarity = get_idx(ws, ys, zs) # нахождение границы и полярности
cl_error = 0.0
for x, y, w in zip(xs, ys, ws):
req = req_weak(x, cl, polarity, border) # нахождение ошибки классификации
cl_error += w * np.abs(req - y)
return clf(cl, border, polarity, 0), cl_error
def build_weak_classifier(numf: int, xs: np.ndarray, ys: np.ndarray, features, ws: Optional[np.ndarray] = None): # есть или нет, хз короче (optional)
if ws is None: # инициализация
m = 0 # negative
l = 0 # positive
for x in ys:
if x == 0:
m += 1
else:
l += 1
ws = []
for x in ys:
if x == 0:
ws.append(1.0 / (2.0 * m))
else:
ws.append(1.0 / (2.0 * l))
ws = np.array(ws)
total_start_time = datetime.now()
with Parallel(n_jobs=-1, backend='threading') as parallel:
weak_classifiers = [] # тут ответ
for t in range(0, numf):
print(f'Building weak classifier {0}/{1} ...'.format(t + 1, numf))
start_time = datetime.now()
ws = normalize_w(ws) # нормализация весов
status_counter = STATUS_EVERY
berror = float('inf')
bfeature = Feature(0, 0, 0, 0, 0, 0)
num = 0
for i, cl in enumerate(features): # i, list[i]
status_counter -= 1
f = False
if KEEP_PROBABILITY < 1.0: # пропускаем KEEP_PROBABILITY классификаторов
skip_probability = np.random.random()
if skip_probability > KEEP_PROBABILITY:
continue
req, error_r = calc_w(cl, xs, ys, ws,parallel)
if error_r < berror:
f = True
bfeature = req
berror = error_r
if (f) or (status_counter <= 0):
current_time = datetime.now()
duration = current_time - start_time
total_duration = current_time - total_start_time
status_counter = STATUS_EVERY
if f:
print(
"t={0}/{1} {2}s ({3}s in this stage) {4}/{5} {6}% evaluated. Classification error improved to {7} using {8} ..."
.format(t + 1, numf, round(total_duration.total_seconds(), 2),
round(duration.total_seconds(), 2), i + 1, len(features),
round(100 * i / len(features), 2), round(berror, 5), str(bfeature)))
else:
print(
"t={0}/{1} {2}s ({3}s in this stage) {4}/{5} {6}% evaluated."
.format(t + 1, numf, round(total_duration.total_seconds(), 2),
round(duration.total_seconds(), 2), i + 1, len(features),
round(100 * i / len(features), 2)))
beta = berror / (1 - berror)
alpha = np.log(1.0 / beta)
classifier = clf(bfeature.cl, bfeature.cl.theta, bfeature.cl.polarity, alpha)
weak_classifiers.append(classifier)
del classifier
for i, (x, y) in enumerate(zip(xs, ys)):
h = run_weak_classifier(x, weak_classifiers[t])
e = np.abs(h - y)
ws[i] = ws[i] * np.power(beta, 1 - e)
return weak_classifiers
Буду благодарен за любую помощь.