private static volatile LinkedHashSet<String> sDownloadingNow = new LinkedHashSet<String>();
...
Object getObject(String url) {
Object object = cache.get(url);
if( object == null ) {
while( !sDownloadingNow.add(url) ) {
SystemClock.sleep(100);
}
object = cache.get(url);
if(object == null) {
object = fetchObject(url);
cache.put(url, object);
}
sDownloadingNow.remove(url);
}
return object;
}
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class MyCache<K, V> implements Computable<K, V> {
private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<K, Future<V>>();
private final Computable<K, V> c;
public MyCache(Computable<K, V> c) {
this.c = c;
}
@Override
public V compute(final K key) throws InterruptedException {
while (true) {
Future<V> f = cache.get(key);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(key);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(key, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(key, f);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
}
public static interface Computable<K, V> {
public V compute(K k) throws InterruptedException;
}
}
// key: url, value: list of waiting latches for threads
private static final Map<String, List<CountDownLatch>> waiters;
synchronized(waiters) {
// check if record for url exists and either continue work or wait
// ...
}
// do work
synchronized(waiters) {
// check if record for url and call count() for all waiters, then remove record
// ...
}