Задать вопрос
Eugene-Usachev
@Eugene-Usachev

Как передать из функции значения в разные потоки?

Я хочу сделать autopipeling на Rust. Суть примерно такая. Из разных потоков и функций вызываются функции, для упрощения Get(). После вызова 256 функций Get исполняется запрос на сервер (пусть функция Exec), получается ответ и только затем мы можем что-то вернуть из функции Get.

У меня фантазии хватило только для использования каналов. Вот только хотелось бы получить один отправитель, который отправит по одному сообщению в много получателей, и mpsc такого функционала не предоставляет. Я попробовал записывать в вектор отправителей, которых вместе с получателями я создаю в Get, и записать в каждый из них при помощи личного отправителя. Код то работает, вот только очень медленно (для передачи 3 000 сообщений пачками по 256 (размер вектора) потребовалось 185 миллисекунд).

Не могли бы подкинуть пару идей?
  • Вопрос задан
  • 176 просмотров
Подписаться 2 Простой 2 комментария
Решения вопроса 1
bingo347
@bingo347
Crazy on performance...
Начну с того, что код представленный автором в комментах к вопросу имеет deadlock между мьютексом и recv() из канала и завершается лишь по тому, что мы не ждем фоновые потоки. Вариант без deadlock будет выглядеть так:
fn test() {
    let mut channels = Arc::new(Mutex::new(Vec::with_capacity(PAR)));
    let mut joins = Vec::with_capacity(PAR);
    for _ in 0..N / PAR {
        for _ in 0..PAR {
            let mut channels = Arc::clone(&channels);
            joins.push(thread::spawn(move || {
                get(channels.lock().unwrap());
            }));
        }
    }
    for j in joins {
        j.join().unwrap();
    }
}

#[inline(always)]
fn get(mut channels: MutexGuard<Vec<mpsc::Sender<i32>>>) -> i32 {
    let (tx, rx) = mpsc::channel();
    channels.push(tx);
    if channels.len() == PAR {
        exec(channels);
    } else {
        drop(channels); // drop гварда отпускает мьютекс
    }
    rx.recv().unwrap()
}

#[inline(always)]
fn exec(mut channels: MutexGuard<Vec<mpsc::Sender<i32>>>) {
    let mut i = 0;
    for c in channels.iter() {
        i += 1;
        c.send(1).unwrap();
    }
    println!("{}", i);
    channels.clear();
    // а здесь гвард дропнется сам
}

Вторая проблема в том, что все потоки выполняются по сути по очереди, так как ждут разблокировки мьютекса от других потоков, из-за чего многопоточка тут не дает никаких преимуществ, а лишь создает накладные расходы. Ради эксперимента я попробовал заменить мьютекс на еще один канал:
fn test() {
    let (tx, rx) = mpsc::channel::<mpsc::Sender<i32>>();
    let mut handles = Vec::with_capacity(N + 1);
    handles.push(thread::spawn(move || exec(rx)));
    for _ in 0..N {
        let tx = tx.clone();
        handles.push(thread::spawn(move || {
            get(tx);
        }))
    }
    drop(tx);
    for handle in handles {
        handle.join().unwrap();
    }
}

fn get(sender: mpsc::Sender<mpsc::Sender<i32>>) -> i32 {
    let (tx, rx) = mpsc::channel();
    sender.send(tx).unwrap();
    rx.recv().unwrap()
}

fn exec(receiver: mpsc::Receiver<mpsc::Sender<i32>>) {
    let mut channels = Vec::with_capacity(PAR);
    loop {
        for _ in 0..PAR {
            let Ok(tx) = receiver.recv() else {
                return;
            };
            channels.push(tx);
        }
        let mut i = 0;
        for c in channels.iter() {
            i += 1;
            c.send(1).unwrap();
        }
        println!("{}", i);
        channels.clear();
    }
}
Но особо это профита не дает, так как основной пожиратель перфоманса - switch context в ОС. Тысячи потоков делают только хуже. Запускать потоков сильно больше чем есть ядер - это вообще плохая идея. Просто ради интереса переписал еще раз на асинхронных каналах tokio:
async fn test() {
    let (tx, rx) = mpsc::unbounded_channel::<mpsc::UnboundedSender<i32>>();
    let mut handles = Vec::with_capacity(N + 1);
    handles.push(tokio::spawn(async move { exec(rx).await }));
    for _ in 0..N {
        let tx = tx.clone();
        handles.push(tokio::spawn(async move {
            get(tx).await;
        }))
    }
    drop(tx);
    for handle in handles {
        handle.await.unwrap();
    }
}

async fn get(sender: mpsc::UnboundedSender<mpsc::UnboundedSender<i32>>) -> i32 {
    let (tx, mut rx) = mpsc::unbounded_channel();
    sender.send(tx).unwrap();
    rx.recv().await.unwrap()
}

async fn exec(mut receiver: mpsc::UnboundedReceiver<mpsc::UnboundedSender<i32>>) {
    let mut channels = Vec::with_capacity(PAR);
    loop {
        for _ in 0..PAR {
            let Some(tx) = receiver.recv().await else {
                return;
            };
            channels.push(tx);
        }
        let mut i = 0;
        for c in channels.iter() {
            i += 1;
            c.send(1).unwrap();
        }
        println!("{}", i);
        channels.clear();
    }
}
и запустил в многопоточном рантайме в дефолтной конфигурации (количество воркеров == количеству ядер), работает быстрее в 19 раз.

P.S. без I/O операций асинхронщина тоже создает ненужные накладные расходы, я ее здесь использовал только из-за простоты переписывания, лучше взять обычный thread pool с синхронными тасками.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы