import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
/**
 * Memoizing wrapper around a {@link Computable}: the result for each key is
 * computed at most once at a time and shared between all callers asking for
 * the same key.
 *
 * <p>The cache stores {@link Future}s rather than plain values, so a thread
 * that asks for a key whose computation is still running blocks on the
 * in-flight future instead of starting a duplicate computation.
 *
 * <p>Entries are never expired. A future that ends in cancellation or failure
 * is evicted so that a later call can retry the computation.
 *
 * <p>NOTE(review): the {@code implements} clause is resolved outside this
 * class body, so it cannot refer to the nested {@code Computable} declared
 * below; confirm that a top-level {@code Computable} with a matching
 * {@code compute} method is visible to this file.
 *
 * @param <K> type of the argument / cache key
 * @param <V> type of the computed value
 */
public class MyCache<K, V> implements Computable<K, V> {

    /** In-flight and completed computations, keyed by their argument. */
    private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<K, Future<V>>();

    /** The underlying computation whose results are cached. */
    private final Computable<K, V> c;

    /**
     * Creates a cache in front of the given computation.
     *
     * @param c the computation to delegate to on a cache miss
     */
    public MyCache(Computable<K, V> c) {
        this.c = c;
    }

    /**
     * Returns the value for {@code key}, computing it on the calling thread if
     * no other thread has already started (or finished) doing so.
     *
     * @param key the argument to compute a value for; must not be {@code null}
     *            because it is used as a {@link ConcurrentHashMap} key
     * @return the cached or freshly computed value
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the result
     * @throws RuntimeException wrapping the original cause if the underlying
     *         computation throws; the failed entry is evicted first so the
     *         next call retries instead of replaying the stale failure
     */
    @Override
    public V compute(final K key) throws InterruptedException {
        // Loop so that a cancelled future can be evicted and the lookup retried.
        while (true) {
            Future<V> f = cache.get(key);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    @Override
                    public V call() throws Exception {
                        return c.compute(key);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                // putIfAbsent closes the check-then-act race: only one thread's
                // task is installed, everybody else receives that winner.
                f = cache.putIfAbsent(key, ft);
                if (f == null) {
                    f = ft;
                    // We won the race, so run the computation on this thread.
                    ft.run();
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                // Conditional remove: only evict the exact future we observed.
                cache.remove(key, f);
            } catch (ExecutionException e) {
                // Do not keep a failed computation cached forever; without this
                // eviction every later call for the key would fail the same way.
                cache.remove(key, f);
                throw new RuntimeException(e.getCause());
            }
        }
    }

    /**
     * A computation that maps an argument of type {@code K} to a result of
     * type {@code V}.
     *
     * @param <K> argument type
     * @param <V> result type
     */
    public static interface Computable<K, V> {
        /**
         * Computes the value for the given argument.
         *
         * @param k the argument
         * @return the computed value
         * @throws InterruptedException if the computation is interrupted
         */
        public V compute(K k) throws InterruptedException;
    }
}
// Sketch of a latch-based alternative: threads requesting a URL that is already
// being processed wait on a latch until the worker for that URL finishes.
// NOTE(review): outline only — `waiters` has no initializer here and the
// statements below are not inside a method, so this does not compile as written.
// key: url, value: list of latches, one per thread waiting on that url
private static final Map<String, List<CountDownLatch>> waiters;
synchronized(waiters) {
// If a record for the url already exists, another thread is working on it:
// register a latch and wait. Otherwise create the record and continue to do
// the work ourselves.
// ...
}
// do work (outside the lock)
synchronized(waiters) {
// Look up the record for the url, release every registered waiter
// (presumably via CountDownLatch.countDown(); the original note said count(),
// which CountDownLatch does not have), then remove the record.
// ...
}