Задать вопрос

Синхронизация по списку значений?

Есть список URL с различными объектами. Есть несколько потоков, которые асинхронно пытаются выкачать объекты и закешировать. Потоки стартуют все вместе, и велика вероятность, что несколько потоков попытаются закачать один и тот же объект, выполняя лишнюю работу и тратя лишний трафик.


Как правильнее всего сделать взаимную потоко-безопасную проверку на предмет того, идет ли в данный момент закачка данного объекта, и усыпить поток до тех пор, пока другой работающий поток не закончит закачку и не закеширует объект, чтобы в текущем потоке мы могли сразу достать его из кэша и прекратить выполнение без старта закачки?


То есть нужна некая синхронизация, но в качестве мьютекса должен выступать строковый ключ. Пока что у меня работает примерно вот такой код, но он совсем некошерен:

private static volatile LinkedHashSet<String> sDownloadingNow = new LinkedHashSet<String>();

...

Object getObject(String url) {
   Object object = cache.get(url);
   if( object == null ) {
      while( !sDownloadingNow.add(url) ) {
         SystemClock.sleep(100);
      }				
      object = cache.get(url);
      if(object == null) {
         object = fetchObject(url);
         cache.put(url, object);
      }
      sDownloadingNow.remove(url);
   }
   return object;
}
  • Вопрос задан
  • 2766 просмотров
Подписаться 6 Оценить Комментировать
Решения вопроса 1
@Moxa
как-то так…

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class MyCache<K, V> implements Computable<K, V> {

    private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<K, Future<V>>();
    private final Computable<K, V> c;

    public MyCache(Computable<K, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final K key) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(key);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {

                    @Override
                    public V call() throws Exception {
                        return c.compute(key);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                f = cache.putIfAbsent(key, ft);
                if (f == null) {
                    f = ft;
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(key, f);
            } catch (ExecutionException e) {
                throw new RuntimeException(e.getCause());
            }
        }
    }

    public static interface Computable<K, V> {

        public V compute(K k) throws InterruptedException;
    }
}
Ответ написан
Комментировать
Пригласить эксперта
Ответы на вопрос 1
serso
@serso
// key: url, value: list of waiting latches for threads
private static final Map<String, List<CountDownLatch>> waiters;


По входу нового потока идёт проверка waiters на наличие списка по url:
если записи нет (waiters.get(url) == null), то создаётся новый пустой список и помещается в Map, сам поток начинает делать работу;
если запись есть, то данный поток создаёт объект типа CountDownLatch с параметром 1, добавляет его в список и вызывает метод await().

Тот поток, который получил право на выполнение работы по окончанию проверяет Map на наличие списка ждущих потоков, удаляет его из Map'a и вызывает методы count() для каждого latch'а.

Все операции с waiters — должны быть синхронизованы:
synchronized(waiters) {
    // check if record for url exists and either continue work or wait
    // ...
}

// do work

synchronized(waiters) {
    // check if record for url and call count() for all waiters, then remove record
    // ...
}


Вроде решает поставленную вами задачу.
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы