Подскажите, пожалуйста, как исправить код и есть ли разные решения с точки зрения скорости работы для случаев:
- такой вектор только читают
- вектор читают и изменяют.
use axum::extract::State;
use axum::http::StatusCode;
use axum::routing::{get, post};
use axum::{Json, Router};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
#[derive(Debug, Clone)]
struct AppState {
d: Arc<Mutex<Vec<String>>>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let state = AppState {
d: Arc::new(Mutex::new(vec![])),
};
let app = Router::new()
.route("/info", post(create_user))
.route("/list_users", get(list_users))
.with_state(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
axum::serve(listener, app).await.unwrap();
Ok(())
}
#[derive(Deserialize)]
pub struct FromBrowser {
username: String,
}
#[derive(Serialize)]
pub struct User {
username: String,
}
async fn create_user(
State(state): State<AppState>,
Json(payload): Json<FromBrowser>,
) -> (StatusCode, Json<User>) {
// ...
let user = User {
username: payload.username,
};
{
let mut d = state.d.lock().expect("mutex was poisoned");
d.push(user.username.to_owned());
}
println!("state = {:?}", state);
(StatusCode::CREATED, Json(user))
}
async fn list_users(State(state): State<AppState>) -> (StatusCode, Json<Vec<String>>) {
let users = state.d.lock().expect("mutex was poisoned").clone();
(StatusCode::OK, Json(users))
}
Я знаю, что следует избегать всяких "продвинутых" штук из ряда связных списков, самореферентных структур и т.п.Односвязные списки никаких проблем не доставляют (ну кроме того, что они плохо ложатся на процессорный кэш). Для двусвязных списков и самореферентных структур придётся использовать сырые указатели и unsafe.
Ещё я обнаружил, что создание больших структур, с методами, с кучей полей, обычно приводит к проблемам с borrow checker.Borrow checker абсолютно плевать на размер структур. Это никак не связано.
А если в структуре будет ссылка или иное заимствование, то это гарантированные проблемы.Нет ни каких проблем.
Насколько я понимаю, самым рабочим выглядит чисто функциональный подход, а не структур с методами.Одно другому никак не противоречит.
И правильно ли я понимаю, что следует избегать структур хранящих ссылки и имеющими лайфтайм?Не правильно.
Так, наличие в умеренных размерах программе, которая по сути была одной функцией, лишь одной структуры хранящей ссылку, поставило крест на попытке структуризации программы в более человеческий вид.Что-то делаете не так. Без конкретных примеров кода сказать сложно.
И очень часто в Rust программах, мне приходится идти на более уродливую архитектуру, дабы избежать проблем с (почти ненужным в однопоточном коде) borrow checker.Что-то делаете не так. Скорее всего просто не понимаете borrow checker и пытаетесь писать на новом языке так, как привыкли в каком-то другом.
И в вопросе о borrow checker, разве не является тот факт, что большинство библиотек избегает &mut self в изменяющих что-то методах, звоночком к наличию большим проблем в языке?О каком большинстве речь? Библиотеки используют мутабельные ссылки там где это нужно. Если метод действительно что-то меняет, то будет мутабельная ссылка ну и иногда будет использоваться interior mutability там где это необходимо. В языке нет проблем с мутабельными ссылками.
В общем, посоветуйте что-то что-бы помогало меньше бороться с borrow checker, потому что сейчас я очень много времени трачу именно на это.Для начала понять его. Понять какую проблему он решает. Почитайте, что такое undefined behavior. Почитайте, что такое алиасинг.
fn example<'a>(r: &'a u32) -> &'a u32 { r }
fn f_a() {
let a = 1;
let r = example(&a);
}
fn f_b() {
let b = 1;
let r = example(&b);
}
В этом примере очевидно что переменная a в f_a будет иметь время жизни отличное от b в f_b, но example спокойно работает и с тем и с другим, то есть она является обобщённой по времени жизни, в первом случае она вернёт ссылку с временем жизни как у переменной a, во втором - как у b.fn make_array<const SIZE: usize>(el: u32) -> [u32; SIZE] {
[el; SIZE]
}
let arr = make_array::<3>(1); // [1, 1, 1]
use std::sync::atomic::{AtomicU32, Ordering};
async fn create_user(
Json(payload): Json<FromBrowser>,
) -> (StatusCode, Json<User>) {
static COUNTER: AtomicU32 = AtomicU32::new(0);
// подготавливаю данные на отправку в браузер:
let user = User {
id: payload.id,
username: payload.username,
tm: payload.tm,
cnt: COUNTER.fetch_add(1, Ordering::SeqCst), // считаю отправки;
};
(StatusCode::CREATED, Json(user))
}
#[tokio::main(flavor = "multi_thread", worker_threads = 1024)]
1024 - потеряли весь профит от небольшого числа потоков, теперь ОС будет распределять 1024 потока на небольшое количество ядер CPU.async fn set(&mut self, name: String, value: String) {
self.data.insert(name, value);
}
у этого метода нет ни одной причины быть асинхронным, операции с HashMap - чистый CPU-bound.fn main() {
let start = Instant::now();
let handles: Vec<_> = (0..4)
.map(|table_index| {
std::thread::spawn(move || {
let mut table = Table::new();
for i in (0..3000000).filter(|i| (i % 4 + 1) == table_index) {
table.set(format!("{}", i), format!("value{}", i));
}
table
})
})
.collect();
for handle in handles {
let _table = handle.join().unwrap();
// тут добавляем таблицы в менеджер
}
let elapsed = start.elapsed();
println!("Time taken to set 3,000,000 keys: {:?}", elapsed);
}
и даже это можно заморочиться и улучшить, например запускать потоков не больше std::thread::available_parallelism()
или оптимизировать счетчик для каждой таблицы ((0..3000000).filter(|i| (i % 4 + 1) == table_index)
), но это я оставлю Вам в качестве д/з.fn main() {
let a = "x";
println!("1. {:?}", a); // "x"
let b = format!("{}y", a);
println!("2. {:?}", b); // "xy"
}
fn main() {
let mut s = "x".to_string();
println!("1. {}", s); // "x"
s += "y";
println!("2. {}", s); // "xy"
}
use std::slice;
fn main() {
let range = (0..u8::MAX).collect::<Vec<_>>();
let v = range
.iter()
.map(|x| std::str::from_utf8(slice::from_ref(x)).unwrap_or("Err"))
.collect::<Vec<_>>();
println!("{v:?}")
}
fn main() {
let src: Vec<[u8; 1]> = (0..u8::MAX).map(|i| [i]).collect();
let mut info = Vec::<&str>::with_capacity(u8::MAX.into());
for u in &src {
let t = std::str::from_utf8(&*u).unwrap_or("Err");
info.push(t);
}
println!("\n{:#?}", info);
}
info
будет связан лайфтаймом с src
, чтоб избавится от этого нужно хранить в нём не &str
а String
или Box<str>
fn main() {
let info: Vec<Box<str>> = (0..u8::MAX).map(|i| {
let u = [i];
let t = std::str::from_utf8(&u).unwrap_or("Err");
t.into()
}).collect();
println!("\n{:#?}", info);
}
Vec<&str>
элементы которого ссылаются на исходный вектор байт и в каждом элементе строка из 1 символа:let u01 = vec![59, 13, 10, 32, 47, 42];
let u01_str = std::str::from_utf8(&u01).expect("invalid utf8");
let mut u02 = Vec::with_capacity(u01.len());
let mut i0 = 0;
for (i, _) in u01_str.char_indices().skip(1) {
u02.push(&u01_str[i0..i]);
i0 = i;
}
u02.push(&u01_str[i0..]);
println!("u02 = {:?}", u02);
fn test() {
let mut channels = Arc::new(Mutex::new(Vec::with_capacity(PAR)));
let mut joins = Vec::with_capacity(PAR);
for _ in 0..N / PAR {
for _ in 0..PAR {
let mut channels = Arc::clone(&channels);
joins.push(thread::spawn(move || {
get(channels.lock().unwrap());
}));
}
}
for j in joins {
j.join().unwrap();
}
}
#[inline(always)]
fn get(mut channels: MutexGuard<Vec<mpsc::Sender<i32>>>) -> i32 {
let (tx, rx) = mpsc::channel();
channels.push(tx);
if channels.len() == PAR {
exec(channels);
} else {
drop(channels); // drop гварда отпускает мьютекс
}
rx.recv().unwrap()
}
#[inline(always)]
fn exec(mut channels: MutexGuard<Vec<mpsc::Sender<i32>>>) {
let mut i = 0;
for c in channels.iter() {
i += 1;
c.send(1).unwrap();
}
println!("{}", i);
channels.clear();
// а здесь гвард дропнется сам
}
fn test() {
let (tx, rx) = mpsc::channel::<mpsc::Sender<i32>>();
let mut handles = Vec::with_capacity(N + 1);
handles.push(thread::spawn(move || exec(rx)));
for _ in 0..N {
let tx = tx.clone();
handles.push(thread::spawn(move || {
get(tx);
}))
}
drop(tx);
for handle in handles {
handle.join().unwrap();
}
}
fn get(sender: mpsc::Sender<mpsc::Sender<i32>>) -> i32 {
let (tx, rx) = mpsc::channel();
sender.send(tx).unwrap();
rx.recv().unwrap()
}
fn exec(receiver: mpsc::Receiver<mpsc::Sender<i32>>) {
let mut channels = Vec::with_capacity(PAR);
loop {
for _ in 0..PAR {
let Ok(tx) = receiver.recv() else {
return;
};
channels.push(tx);
}
let mut i = 0;
for c in channels.iter() {
i += 1;
c.send(1).unwrap();
}
println!("{}", i);
channels.clear();
}
}
Но особо это профита не дает, так как основной пожиратель перфоманса - switch context в ОС. Тысячи потоков делают только хуже. Запускать потоков сильно больше чем есть ядер - это вообще плохая идея. Просто ради интереса переписал еще раз на асинхронных каналах tokio:async fn test() {
let (tx, rx) = mpsc::unbounded_channel::<mpsc::UnboundedSender<i32>>();
let mut handles = Vec::with_capacity(N + 1);
handles.push(tokio::spawn(async move { exec(rx).await }));
for _ in 0..N {
let tx = tx.clone();
handles.push(tokio::spawn(async move {
get(tx).await;
}))
}
drop(tx);
for handle in handles {
handle.await.unwrap();
}
}
async fn get(sender: mpsc::UnboundedSender<mpsc::UnboundedSender<i32>>) -> i32 {
let (tx, mut rx) = mpsc::unbounded_channel();
sender.send(tx).unwrap();
rx.recv().await.unwrap()
}
async fn exec(mut receiver: mpsc::UnboundedReceiver<mpsc::UnboundedSender<i32>>) {
let mut channels = Vec::with_capacity(PAR);
loop {
for _ in 0..PAR {
let Some(tx) = receiver.recv().await else {
return;
};
channels.push(tx);
}
let mut i = 0;
for c in channels.iter() {
i += 1;
c.send(1).unwrap();
}
println!("{}", i);
channels.clear();
}
}
и запустил в многопоточном рантайме в дефолтной конфигурации (количество воркеров == количеству ядер), работает быстрее в 19 раз.let cwd = std::env::current_dir().unwrap().parent().unwrap();
меня очень смущает постоянно выделять буффер
use std::mem::MaybeUninit;
let mut buffer = unsafe {
#[allow(invalid_value)]
MaybeUninit::<[_; 1024]>::uninit().assume_init()
};
Но я бы так не делал. Во-первых чистота кода не стоит этих копеек производительности, а во-вторых немного накосячите с чтением и будет UB. use std::sync::Arc;
fn main() {
let data = Arc::new(vec![1, 2, 3]);
let thread_1 = std::thread::spawn({
let data = Arc::clone(&data);
move || {
println!("Thread 1 data: {:?}", data);
}
});
let thread_2 = std::thread::spawn({
let data = Arc::clone(&data);
move || {
println!("Thread 2 data: {:?}", data);
}
});
thread_1.join().unwrap();
thread_2.join().unwrap();
}
Я обернул создание отдельного потока в функцию и так передавал в поток данные. Удобно, что такую функцию могу вынести в отдельный файл-модуль. Но не смог такое сделать динамически в цикле для группы потоков. Хочу подойти к варианту, когда поток, который закончил выполнение своего кода (раньше других), можно опять запустить из main и передать ему новую задачу (новые данные), - структуру данных, которую привёл в основном вопросе.Если правильно понял, то Вам нужен thread pool. Можно использовать из библиотеки rayon: https://docs.rs/rayon/1.7.0/rayon/struct.ThreadPoo...
use std::{
sync::{
mpsc::{self, Sender},
Arc, Mutex,
},
thread::{self, JoinHandle},
};
type Task = Box<dyn FnOnce() + Send + 'static>;
pub struct ThreadPool {
handles: Vec<JoinHandle<()>>,
task_sender: Sender<Task>,
}
impl ThreadPool {
pub fn new() -> Self {
let threads_count = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(2);
let (task_sender, task_receiver) = mpsc::channel::<Task>();
let task_receiver = Arc::new(Mutex::new(task_receiver));
let mut handles = Vec::with_capacity(threads_count);
for _ in 0..threads_count {
let task_receiver = Arc::clone(&task_receiver);
handles.push(thread::spawn(move || loop {
let task_receiver = task_receiver.lock().unwrap_or_else(|e| e.into_inner());
let Ok(task) = task_receiver.recv() else {
return;
};
drop(task_receiver);
task();
}));
}
Self {
handles,
task_sender,
}
}
pub fn spawn_task<F: FnOnce() + Send + 'static>(&self, f: F) {
self.task_sender.send(Box::new(f)).expect("All threads ended");
}
pub fn join(self) -> thread::Result<()> {
drop(self.task_sender);
for handle in self.handles {
handle.join()?;
}
Ok(())
}
}
#[test]
fn thread_pool() {
let pool = ThreadPool::new();
for i in 0..500 {
pool.spawn_task(move || {
println!("Task {} working on thread {:?}", i, thread::current().id());
});
}
pool.join().unwrap();
}
use chrono::prelude::*;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::{thread, time};
fn main() {
println!("- - - - -");
let mut children = Vec::with_capacity(3);
for id in 0..children.capacity() {
let child = thread::spawn(move || {
let mut date_times = Vec::with_capacity(5);
for i in 0..date_times.capacity() {
let t: DateTime<Local> = Local::now();
date_times.push(t);
println!("{:?}_ поток, задача _{:?}, время: {:?}", id, i, t);
thread::sleep(time::Duration::from_millis(3));
}
(id, date_times)
});
children.push(child);
}
for child in children {
let (id, date_times) = child.join().expect("Дочерний поток паникует");
println!("thd_{} = {:?}", id, date_times);
}
println!("- - - - -");
}
mod m01;
mod m02;
mod m03;
fn main() {
m01::f1();
m02::f2();
let (i, p) = m03::f3();
println!("i = {:?}", i);
println!("p = {:?}", p);
}
pub fn f1() {
let num: u8 = 12;
println!("num = {:?}", num);
}
pub fn f2() {
let s: String = "abc".to_string();
println!("s = {:?}", s);
}
pub fn f3() -> (u8, String) {
let i: u8 = 88;
let p: String = "xyz".to_string();
(i, p)
}
macro_rules! f4 {
() => {
let i: u8 = 88;
let p: String = "xyz".to_string();
}
}
macro_rules! f4 {
($a: ident, $b: ident) => {
let $a: u8 = 88;
let $b: String = "xyz".to_string();
}
}
fn main() {
f4!(i, p);
println!("{} {}", i, p);
}