Eugene-Usachev
@Eugene-Usachev

Как сделать много вставок в HashMap за минимальное время?

Немного предыстории. Я решил померить скорость записи в map Rust. Создал импровизированную key-value субд. Мысль очень проста, создаём таблицу, дальше в таблицу кладём данные. Таблица - обычный hashmap. В синхронной версии всё работает, но медленно (во-первых, я вынужден вызывать get у map дважды, во-вторых, писать синхронно в целом долго). Появилась странная мысль: создать n map и при помощи id % n получать название map, куда писать данные. Таким образом можно распараллелить код и, в теории, линейно увеличить скорость записи. Однако я только изучаю Rust и у меня просто огромные проблемы с асинхронным кодом. Я слышал про tokio, который очень популярный и позиционирует себя быстрым (сразу вопрос знатокам, есть ли что-то быстрее?) и решил им воспользоваться. Код вышел таким:

Длинный код

use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Instant};
use tokio::task;

struct Table {
    data: HashMap<String, String>,
}

impl Table {
    fn new() -> Self {
        Self {
            data: HashMap::new(),
        }
    }

    async fn set(&mut self, name: String, value: String) {
        self.data.insert(name, value);
    }

    fn get(&self, name: String) -> Option<String> {
        return self.data.get(&name).cloned();
    }
}

struct TableManager {
    tables: HashMap<String, Table>,
}

impl TableManager {
    fn new() -> Self {
        return Self {
            tables: HashMap::new(),
        }
    }

    fn create_table(&mut self, name: String) {
        self.tables.insert(name, Table::new());
    }

    async fn set(&mut self, name: String, key: String, value: String) {
        self.tables.get_mut(&name).unwrap().set(key, value).await;
    }

    fn get(&self, name: String, key: String) -> Option<String> {
        return self.tables.get(&name).unwrap().get(key);
    }

    fn delete_table(&mut self, name: String) {
        self.tables.remove(&name);
    }
}

#[tokio::main(flavor = "multi_thread", worker_threads = 1024)]
async fn main() {
    let mut manager = Arc::new(TableManager::new());
    manager.create_table("table1".to_string());
    manager.create_table("table2".to_string());
    manager.create_table("table3".to_string());
    manager.create_table("table4".to_string());

    let start = Instant::now();

    let mut tasks = vec![];

    for i in 0..3000000 {
        let table_name = format!("table{}", i % 4 + 1);
        let mut manager = manager.borrow_mut();
        let task = task::spawn(async move {
            manager.set(table_name, format!("{}", i), format!("value{}", i)).await;
        });
        tasks.push(task);
    }

    for task in tasks {
        task.await.unwrap();
    }

    let elapsed = start.elapsed();

    println!("Time taken to set 3,000,000 keys: {:?}", elapsed);
}



Проблема в

error[E0596]: cannot borrow data in an `Arc` as mutable
--> src\main.rs:79:13
|
79 | manager.set(table_name, format!("{}", i), format!("value{}", i)).await;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ cannot borrow as mutable
|
= help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc`

Тут я упёрся в стену. Ошибка мне понятно, но, как её исправить, я не представляю.
Так же пару дополнительный вопросов (можете не тратить время, они мне не так важны):
1 - есть ли что-то быстрее, чем tokio
2 - правильно ли я использую tokio
3 - насколько хорошая с точки зрения производительности идея использовать Arc
4 - можно ли ускорить саму по себе структуру HashMap или только переписывать?
  • Вопрос задан
  • 306 просмотров
Решения вопроса 2
vabka
@vabka
Токсичный шарпист
Отвечая на твои вопросы:
1 - есть ли что-то быстрее, чем tokio
2 - правильно ли я использую tokio
3 - насколько хорошая с точки зрения производительности идея использовать Arc
4 - можно ли ускорить саму по себе структуру HashMap или только переписывать?


1. В твоём случае лучше взять rayon для параллельной обработки, тк tokio предназначен для асинхронного io.
2. см п1. Как именно ты tokio использовал я не смотрел. Дмитрий Беляев хорошо ответил по этому поводу
3. Плохая. Лучше взять другую структуру данных
4а. Можно процентов на 30 ускорить HashMap если заранее сделать with_capacity
4б. И в n раз ускорить если сделать несколько HashMap по одному для каждого из N потоков (и передать во владение каждому потоку, чтобы не тратиться на синхронизацию и подсчёт ссылок).
Для большого количества вставок неизвестного количества данных лучше подойдёт BTreeMap

1. Многопоток тебе тут не поможет (а нет, обманул. Многопоток поможет. А вот async-нет)
2. Ты бенчмаркаешь format!("{}", i)
3. Вообще тебе тут стоит посмотреть на какие-нибудь concurrency-safe lockfree структуры. Например есть достаточно популярный крейт dashmap который такое предлагает.
UPD: я обманул сам себя. dashmap не lockfree. Под капотом это как раз несколько HashMap, спрятанных за RwLock:
pub struct DashMap

pub struct DashMap<K, V, S = RandomState> {
    shift: usize,
    shards: Box<[RwLock<HashMap<K, V, S>>]>,
    hasher: S,
}


Мои бенчмарки с использованием criterion

Результат:
Обрати внимание, что тесты без format на порядок быстрее проходят.
Но я не уверен, что корректно написал бенчмарк для btree_known_key__3M
hashmap_no_capacity_format_key__3M
                        time:   [1.4810 s 1.5362 s 1.5952 s]

hashmap_set_capacity_format_key__3M
                        time:   [1.0688 s 1.0744 s 1.0804 s]

btree_format_key__3M    time:   [754.93 ms 843.10 ms 933.95 ms]


vec_set_apacity__3M     time:   [1.7122 ms 1.7309 ms 1.7655 ms]

dashmap_rayon_format_key__3M
                        time:   [294.76 ms 303.70 ms 316.85 ms]

btree_known_key__3M     time:   [554.56 ms 556.18 ms 558.41 ms]


Код

use std::{
    collections::{BTreeMap, HashMap},
    time::Instant,
};

use criterion::{black_box, criterion_group, criterion_main, Criterion};

fn hashmap_no_capacity_format_key(n: usize) -> HashMap<String, usize> {
    let mut map = HashMap::new();
    for i in 0..n {
        let key = format!("key_{i}");
        map.insert(key, i);
    }
    map
}

fn hashmap_set_capacity_format_key(n: usize) -> HashMap<String, usize> {
    let mut map = HashMap::with_capacity(n + 1);
    for i in 0..n {
        let key = format!("key_{i}");
        map.insert(key, i);
    }
    map
}

fn btreemap_format_key(n: usize) -> BTreeMap<String, usize> {
    let mut map = BTreeMap::new();
    for i in 0..n {
        let key = format!("key_{i}");
        map.insert(key, i);
    }
    map
}
fn vec_set_capacity(n: usize) -> Vec<usize> {
    let mut vector = Vec::with_capacity(n);
    for i in 0..n {
        vector.push(i);
    }
    vector
}

fn btreemap_known_key(keys: impl Iterator<Item = (String, usize)>) -> usize {
    let mut map = BTreeMap::new();
    for (k, v) in keys {
        map.insert(k, v);
    }
    map.len()
}

fn dashmap_rayon_format_key(n: usize) -> dashmap::DashMap<String, usize> {
    use rayon::prelude::*;
    let map = dashmap::DashMap::with_capacity(n);
    (0..n).into_par_iter().for_each(|i| {
        let key = format!("key_{i}");
        map.insert(key, i);
    });
    map
}
fn bench(c: &mut Criterion) {
    c.bench_function("hashmap_no_capacity_format_key__3M", |b| {
        b.iter(|| hashmap_no_capacity_format_key(black_box(3_000_000)))
    });
    c.bench_function("hashmap_set_capacity_format_key__3M", |b| {
        b.iter(|| hashmap_set_capacity_format_key(black_box(3_000_000)))
    });
    c.bench_function("btree_format_key__3M", |b| {
        b.iter(|| btreemap_format_key(black_box(3_000_000)))
    });
    c.bench_function("vec_set_apacity__3M", |b| {
        b.iter(|| vec_set_capacity(black_box(3_000_000)))
    });
    c.bench_function("dashmap_rayon_format_key__3M", |b| {
        b.iter(|| dashmap_rayon_format_key(black_box(3_000_000)))
    });
    c.bench_function("btree_known_key__3M", |b| {
        b.iter_custom(|times| {
            let mut total = vec![];

            for _ in 0..times {
                let mut keys = Vec::with_capacity(3_000_000);
                for i in 0..3_000_000 {
                    keys.push((format!("key_{i}"), i));
                }
                let start = Instant::now();
                black_box(btreemap_known_key(black_box(keys.drain(..))));
                total.push(start.elapsed());
            }
            total.iter().sum()
        });
    });
}
criterion_group! {
    name = benches;
    config = Criterion::default().sample_size(10);
    targets = bench
}
criterion_main!(benches);


Ответ написан
Eugene-Usachev
@Eugene-Usachev Автор вопроса
Должен отметить, что Василий Банников дал прекрасный ответ. Дальше я пошёл в увеличение количества map (срез содержит срез map, который содержат другие map) и добился скорости записи 3 000 000 элементов на моём компьютере за 180 миллисекунд (вместо 4 секунд).

И вот сейчас я наткнулся на замечательную статью от DragonflyDB https://github.com/dragonflydb/dragonfly/blob/main.... Там подход схожий с моим, советую к ознакомлению всем, кто хочет масштабировать map вертикально (за счёт числа ядер).
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 1
bingo347
@bingo347
Crazy on performance...
Начну с того, что Вы абсолютно не понимаете зачем нужен async. Есть 2 вида нагрузки - CPU-bound и I/O-bound (где I/O - это input/output). Особенность I/O-bound нагрузки в том, что процессор большую часть времени простаивает в ожидании I/O операций (диска, сети, базы данных и т.д.). Async как раз решает эту задачу за счет кооперативной многозадачности, пока одна задача ждет ответ от I/O мы можем нагрузить CPU другой задачей. Естественно все это не бесплатно, но выигрыш тут в отсутствии простоя CPU за счет того, что мы запускаем тысячи задач на небольшом количестве потоков (в некоторых средах, вроде node.js или asyncio в python такой поток вообще 1), например tokio по-умолчанию запускает пул потоков по количеству ядер.
Из этого уже можно выделить проблемы в Вашем коде:
#[tokio::main(flavor = "multi_thread", worker_threads = 1024)]
1024 - потеряли весь профит от небольшого числа потоков, теперь ОС будет распределять 1024 потока на небольшое количество ядер CPU.
async fn set(&mut self, name: String, value: String) {
    self.data.insert(name, value);
}
у этого метода нет ни одной причины быть асинхронным, операции с HashMap - чистый CPU-bound.

Вообще, данную задачу можно распараллелить, и для этого достаточно обычных потоков:
fn main() {
    let start = Instant::now();
    let handles: Vec<_> = (0..4)
        .map(|table_index| {
            std::thread::spawn(move || {
                let mut table = Table::new();
                for i in (0..3000000).filter(|i| (i % 4 + 1) == table_index) {
                    table.set(format!("{}", i), format!("value{}", i));
                }
                table
            })
        })
        .collect();
    for handle in handles {
        let _table = handle.join().unwrap();
        // тут добавляем таблицы в менеджер
    }
    let elapsed = start.elapsed();

    println!("Time taken to set 3,000,000 keys: {:?}", elapsed);
}
и даже это можно заморочиться и улучшить, например запускать потоков не больше std::thread::available_parallelism() или оптимизировать счетчик для каждой таблицы (
(0..3000000).filter(|i| (i % 4 + 1) == table_index)
), но это я оставлю Вам в качестве д/з.

В общем, Ваша проблема не в том, что какие-то структуры/библиотеки медленные, а в том, что Вы их используете не по назначению.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы