Как правильно принимать данные в потоках?

Привёл пример кода, в котором не получается в разных потоках использовать данные из main:

код

use std::sync::Arc;

fn main() {

let data = Arc::new(vec![1, 2, 3]);

let thread_1 = std::thread::spawn(move || {
    // let d1 = data.clone();
    let d1 = Arc::clone(&data);
    println!("Thread 1 data: {:?}", d1);
});


let thread_2 = std::thread::spawn(move || {
    // let d2 = data.clone();
    let d2 = Arc::clone(&data);
    println!("Thread 2 data: {:?}", d2);
});


thread_1.join().unwrap();
thread_2.join().unwrap();    
}



1. Как правильно использовать Arc в данном примере?

2. Если данные будут иметь вид:

тип данных

#[derive(Debug)]
pub struct Data {
  pub name: String,
	pub dtime: chrono::DateTime<chrono::Local>, 
	pub id: i32,
  pub n: u32,
}
//
let data = Data { ... };


правильно ли использовать Arc в таком случае,
если важна скорость выполнения кода, и как?

3. Если потоки будут формироваться в цикле, например, так:

потоки в цикле

use std::{thread, time};
use chrono::prelude::*;

fn main() {
   let mut children = Vec::new();
   let p = 3;         // задаю количество потоков; 
   let n = 5;         // задаю имитацию работы кода в потоке 
   //                   (выполнение задач) через цикл в 5 итераций; 
   for id in 1..p+1 { // создаю три потока;
      //
      let child = thread::spawn(move || {
           let mut date_times: Vec<DateTime<Local>> = vec![]; // let mut date_times = Vec::with_capacity(5);        
           for i in 1..n+1 {

               let t: DateTime<Local> = Local::now();
               date_times.push(t);
               // 
               println!("{:?}_ поток, задача _{:?}, время: {:?}", id, i, t);
               if id == 1 {
                  thread::sleep(time::Duration::from_millis(3));
               }
               if id == 2 {
                  thread::sleep(time::Duration::from_millis(5));
               }
               if id == 3 {
                  thread::sleep(time::Duration::from_millis(4));
               }               
           }
      (id, date_times)
      }); 
      children.push(child);
   }
   
   //    
   
   for child in children {
       println!("\nchild = {:?}", child.join().unwrap()); 
   }
   
}


как вызывать выполнение потока и передать ему данные,
например, передать задержку в 3, 4, 5 мс., соответственно,
а в идеале, - передать, приведенную структуру: let data = Data { ... }; .
  • Вопрос задан
  • 200 просмотров
Решения вопроса 1
bingo347
@bingo347
Crazy on performance...
Arc нужно клонировать до move в замыкание, которое запускается на потоке. Если данные используются только на чтение, то этого будет достаточно, если данные будут меняться из нескольких потоков, то следует дополнительно обернуть их в Mutex/RwLock (или из std::sync или из библиотеки parking_lot).
use std::sync::Arc;

fn main() {
    let data = Arc::new(vec![1, 2, 3]);

    let thread_1 = std::thread::spawn({
        let data = Arc::clone(&data);
        move || {
            println!("Thread 1 data: {:?}", data);
        }
    });

    let thread_2 = std::thread::spawn({
        let data = Arc::clone(&data);
        move || {
            println!("Thread 2 data: {:?}", data);
        }
    });

    thread_1.join().unwrap();
    thread_2.join().unwrap();
}

Передавать так можно хоть вектор, хоть свою структуру, главное чтоб у типа был трейт Send и лайфтайм 'static (все владеющие типы имеют такой лайфтайм).
Так как передаем мы по сути Arc, то Send должен быть у него, а он будет для любого содержимого реализующего Sync.

Я обернул создание отдельного потока в функцию и так передавал в поток данные. Удобно, что такую функцию могу вынести в отдельный файл-модуль. Но не смог такое сделать динамически в цикле для группы потоков. Хочу подойти к варианту, когда поток, который закончил выполнение своего кода (раньше других), можно опять запустить из main и передать ему новую задачу (новые данные), - структуру данных, которую привёл в основном вопросе.
Если правильно понял, то Вам нужен thread pool. Можно использовать из библиотеки rayon: https://docs.rs/rayon/1.7.0/rayon/struct.ThreadPoo...
Но если хочется повелосипедить, можно нечто такое сделать:
use std::{
    sync::{
        mpsc::{self, Sender},
        Arc, Mutex,
    },
    thread::{self, JoinHandle},
};

type Task = Box<dyn FnOnce() + Send + 'static>;

pub struct ThreadPool {
    handles: Vec<JoinHandle<()>>,
    task_sender: Sender<Task>,
}

impl ThreadPool {
    pub fn new() -> Self {
        let threads_count = thread::available_parallelism()
            .map(|n| n.get())
            .unwrap_or(2);
        let (task_sender, task_receiver) = mpsc::channel::<Task>();
        let task_receiver = Arc::new(Mutex::new(task_receiver));
        let mut handles = Vec::with_capacity(threads_count);
        for _ in 0..threads_count {
            let task_receiver = Arc::clone(&task_receiver);
            handles.push(thread::spawn(move || loop {
                let task_receiver = task_receiver.lock().unwrap_or_else(|e| e.into_inner());
                let Ok(task) = task_receiver.recv() else {
                    return;
                };
                drop(task_receiver);
                task();
            }));
        }
        Self {
            handles,
            task_sender,
        }
    }

    pub fn spawn_task<F: FnOnce() + Send + 'static>(&self, f: F) {
        self.task_sender.send(Box::new(f)).expect("All threads ended");
    }

    pub fn join(self) -> thread::Result<()> {
        drop(self.task_sender);
        for handle in self.handles {
            handle.join()?;
        }
        Ok(())
    }
}

#[test]
fn thread_pool() {
    let pool = ThreadPool::new();
    for i in 0..500 {
        pool.spawn_task(move || {
            println!("Task {} working on thread {:?}", i, thread::current().id());
        });
    }
    pool.join().unwrap();
}
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы