Начну с того, что код представленный автором в комментах к вопросу имеет deadlock между мьютексом и recv() из канала и завершается лишь по тому, что мы не ждем фоновые потоки. Вариант без deadlock будет выглядеть так:
fn test() {
let mut channels = Arc::new(Mutex::new(Vec::with_capacity(PAR)));
let mut joins = Vec::with_capacity(PAR);
for _ in 0..N / PAR {
for _ in 0..PAR {
let mut channels = Arc::clone(&channels);
joins.push(thread::spawn(move || {
get(channels.lock().unwrap());
}));
}
}
for j in joins {
j.join().unwrap();
}
}
#[inline(always)]
fn get(mut channels: MutexGuard<Vec<mpsc::Sender<i32>>>) -> i32 {
let (tx, rx) = mpsc::channel();
channels.push(tx);
if channels.len() == PAR {
exec(channels);
} else {
drop(channels); // drop гварда отпускает мьютекс
}
rx.recv().unwrap()
}
#[inline(always)]
fn exec(mut channels: MutexGuard<Vec<mpsc::Sender<i32>>>) {
let mut i = 0;
for c in channels.iter() {
i += 1;
c.send(1).unwrap();
}
println!("{}", i);
channels.clear();
// а здесь гвард дропнется сам
}
Вторая проблема в том, что все потоки выполняются по сути по очереди, так как ждут разблокировки мьютекса от других потоков, из-за чего многопоточка тут не дает никаких преимуществ, а лишь создает накладные расходы. Ради эксперимента я попробовал заменить мьютекс на еще один канал:
fn test() {
let (tx, rx) = mpsc::channel::<mpsc::Sender<i32>>();
let mut handles = Vec::with_capacity(N + 1);
handles.push(thread::spawn(move || exec(rx)));
for _ in 0..N {
let tx = tx.clone();
handles.push(thread::spawn(move || {
get(tx);
}))
}
drop(tx);
for handle in handles {
handle.join().unwrap();
}
}
fn get(sender: mpsc::Sender<mpsc::Sender<i32>>) -> i32 {
let (tx, rx) = mpsc::channel();
sender.send(tx).unwrap();
rx.recv().unwrap()
}
fn exec(receiver: mpsc::Receiver<mpsc::Sender<i32>>) {
let mut channels = Vec::with_capacity(PAR);
loop {
for _ in 0..PAR {
let Ok(tx) = receiver.recv() else {
return;
};
channels.push(tx);
}
let mut i = 0;
for c in channels.iter() {
i += 1;
c.send(1).unwrap();
}
println!("{}", i);
channels.clear();
}
}
Но особо это профита не дает, так как основной пожиратель перфоманса - switch context в ОС. Тысячи потоков делают только хуже. Запускать потоков сильно больше чем есть ядер - это вообще плохая идея. Просто ради интереса переписал еще раз на асинхронных каналах tokio:
async fn test() {
let (tx, rx) = mpsc::unbounded_channel::<mpsc::UnboundedSender<i32>>();
let mut handles = Vec::with_capacity(N + 1);
handles.push(tokio::spawn(async move { exec(rx).await }));
for _ in 0..N {
let tx = tx.clone();
handles.push(tokio::spawn(async move {
get(tx).await;
}))
}
drop(tx);
for handle in handles {
handle.await.unwrap();
}
}
async fn get(sender: mpsc::UnboundedSender<mpsc::UnboundedSender<i32>>) -> i32 {
let (tx, mut rx) = mpsc::unbounded_channel();
sender.send(tx).unwrap();
rx.recv().await.unwrap()
}
async fn exec(mut receiver: mpsc::UnboundedReceiver<mpsc::UnboundedSender<i32>>) {
let mut channels = Vec::with_capacity(PAR);
loop {
for _ in 0..PAR {
let Some(tx) = receiver.recv().await else {
return;
};
channels.push(tx);
}
let mut i = 0;
for c in channels.iter() {
i += 1;
c.send(1).unwrap();
}
println!("{}", i);
channels.clear();
}
}
и запустил в многопоточном рантайме в дефолтной конфигурации (количество воркеров == количеству ядер), работает быстрее в 19 раз.
P.S. без I/O операций асинхронщина тоже создает ненужные накладные расходы, я ее здесь использовал только из-за простоты переписывания, лучше взять обычный thread pool с синхронными тасками.