public class RssLoader extends RecursiveTask<String>{
private ConcurrentLinkedQueue<String> channels;
public RssLoader(ConcurrentLinkedQueue<String> channels)
{
this.channels = channels;
}
@Override
protected String compute()
{
System.out.println(Thread.currentThread().getName() + "- compute1");
System.out.println("CHANNELS:"+channels.size());
if(channels.size()<1) return "";
if(channels.size()==1)
{
System.out.println(Thread.currentThread().getName() + "- compute2");
String url = channels.poll(); // ПОЛУЧАЮ ЭЛЕМЕНТ И УДАЛЯЮ ЕГО ИЗ ОЧЕРЕДИ
System.out.println("CHANNELS 2:"+channels.size());
System.out.println(url);
HttpRequest request = HttpRequest.newBuilder().GET().uri(URI.create(url)).build();
HttpClient client = HttpClient.newHttpClient();
String res="";
try
{
System.out.println(Thread.currentThread().getName() + "- compute2_1");
HttpResponse<String> resp = client.send(request, BodyHandlers.ofString());
res = resp.body();
System.out.println("RES length:"+res.length());
}
catch(InterruptedException|IOException ex)
{
System.out.println(Thread.currentThread().getName() + "- compute2_2");
System.out.println(ex);
Thread.currentThread().interrupt();
}
finally
{
System.out.println(Thread.currentThread().getName() + "- compute2_3");
}
return res;
}
else
{
System.out.println(Thread.currentThread().getName() + "- compute3");
int size = channels.size();
int start1=0;
int end1 = (int)Math.ceil(size/2);
int start2 = end1;
List<String> part1 = channels.stream().collect(Collectors.toList()).subList(start1, end1);
List<String> part2 = channels.stream().collect(Collectors.toList()).subList(start2, size);
ConcurrentLinkedQueue<String> p1 = new ConcurrentLinkedQueue<>();
p1.addAll(part1);
ConcurrentLinkedQueue<String> p2 = new ConcurrentLinkedQueue<>();
p1.addAll(part2);
RssLoader loader1 = new RssLoader(p1);
RssLoader loader2 = new RssLoader(p2);
invokeAll(loader1, loader2);
return loader1.join() + loader2.join();
}
}
}
Вызываю данный загрузчик:
ConcurrentLinkedQueue<String> channels = new ConcurrentLinkedQueue<>();
channels.add("https://ria.ru/export/rss2/archive/index.xml");
RssLoader loader = new RssLoader(channels);
var pool = new ForkJoinPool();
pool.execute(loader);
String news=loader.join();
pool.shutdown();
в консоле вижу следующее
ForkJoinPool-1-worker-1- compute1
CHANNELS:1
ForkJoinPool-1-worker-1- compute2
CHANNELS 2:0
https://ria.ru/export/rss2/archive/index.xml
ForkJoinPool-1-worker-1- compute2_1
RES length:56978
ForkJoinPool-1-worker-1- compute2_3
ForkJoinPool-2-worker-1- compute1
CHANNELS:1
ForkJoinPool-2-worker-1- compute2
CHANNELS 2:0
https://ria.ru/export/rss2/archive/index.xml
ForkJoinPool-2-worker-1- compute2_1
RES length:57119
ForkJoinPool-2-worker-1- compute2_3
Т.е. ConcurrentLinkedQueue не синхронизирует данные! Почему? Что интересно, метод возвращает результат, т.е. нет бесконечного цикла. Но если ввести в channels хотя бы 2 элемента, то происходит переполнение буфера.