Стоит задача следующего вида:
- Есть главный поток который генерит данные для обработчиков.
- Обработчики - это небольшие программные коды, которые могут отличаться как самим кодом, так и контекстом (при одинаковом коде)
- Обработчиков может быть много (до 500 тыщ штук).
- Количество обработчиков заранее не известно, и они выявляются в процессе генерации данных для них.
- Данные, на один из обработчиков, могут поступать несколько раз разрозненно во времени (к примеру очередь данных на обработчики: A, B, C, A, A, C, B, A, C и т.п)
- Таким образом единовременно может (и должен) работать только один обработчик, параллельная работа обработчиков недопустима, так же как и одновременная работа главного потока и обработчика, так как обработчики могут иметь обратную связь и могут повлиять в дальнейшем на работу, как главного потока, так и обработчиков.
- Скорость работы комплекса из главного потока и обработчиков, вместе с инициализацией обработчиков должна быть высокой.
- Используемая платформа NetStandart2.x (NetCore2.x).
С одной стороны для этого как раз можно задействовать потоки Threads, и это очень удобно, но инициализация большого количества требует времени. Если использовать Task, то инициализация быстрая, но конкуренция все портит.
Вопрос следующий. Как реализовать так, чтобы обработчик в ожидании очередного пакета данных, был в спящем режиме и не участвовал в конкуренции.
И вроде бы удобно использовать один поток, но подсовывать ему разные контексты-обработчики, но как?
Следующий код, теоретически должен выполнять поставленную задачу, но учитывая особенности Task, происходит блокировка и все останавливается:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Diagnostics;
namespace ThreadsTests
{
public class WorkData
{
public int Id;
}
public abstract class ProtoBase<T> : IDisposable
{
public volatile bool IsFinish = false;
ManualResetEventSlim WaitData = new ManualResetEventSlim(false);
ManualResetEventSlim WaitRequest = new ManualResetEventSlim(false);
static int NextId = 0;
T NextData;
public int give_count = 1;
public ProtoBase()
{
ID = ++NextId;
}
public int ID;
public void Finish()
{
IsFinish = true;
WaitData.Dispose();
WaitRequest.Dispose();
}
internal void GiveData(T bext_data)
{
WaitRequest.Wait();
WaitRequest.Reset();
NextData = bext_data;
WaitData.Set();
}
public Task<T> GetNextData()
{
Func<T> DataWatingAction = () =>
{
WaitRequest.Set();
WaitData.Wait();
WaitData.Reset();
T b = NextData;
NextData = default(T);
return b;
};
Task<T> DataWaitngTask = new Task<T>(DataWatingAction);
DataWaitngTask.Start();
return DataWaitngTask;
}
public abstract void Parse();
public virtual void Dispose()
{
}
}
public class TestWorker : ProtoBase<WorkData>
{
public override async void Parse()
{
WorkData data = await GetNextData();
Trace.TraceInformation("{0:N0} taked 1 [id={1:N0}]", ID, data.Id);
data = await GetNextData();
Trace.TraceInformation("{0:N0} taked 2 [id={1:N0}]", ID, data.Id);
data = await GetNextData();
Trace.TraceInformation("{0:N0} taked 3 [id={1:N0}]", ID, data.Id);
Finish();
Trace.TraceInformation("{0:N0} Finish ххх", ID);
}
}
public class Main
{
//workers count
int test_count = 2; // 100, 1000, 10000, 100000
List<TestWorker> Workers = new List<TestWorker>();
public void AsyncWorkersTest()
{
LinkedList<TestWorker> wrks = new LinkedList<TestWorker>();
for (int i = 0; i < test_count; i++)
{
TestWorker tp = new TestWorker();
wrks.AddLast(tp);
tp.Parse();
}
Workers = wrks.ToList();
Random rnd = new Random();
int getDataCount = test_count * 3;
for (int i = 0; i < getDataCount; i++)
{
int ind = rnd.Next(0, Workers.Count);
WorkData wd = new WorkData() { Id = i };
if (Workers[ind].IsFinish) continue;
Trace.TraceInformation("{0:N0} give {1} [id={2:N0}]", ind, Workers[ind].give_count++, wd.Id);
Workers[ind].GiveData(wd);
}
}
}
}
Вот лог работы данного кода.
testhost Information: 0 : 0 give 1 [id=0]
testhost Information: 0 : 0 give 2 [id=1]
testhost Information: 0 : 1 taked 1 [id=0]
testhost Information: 0 : 1 give 1 [id=2]
testhost Information: 0 : 0 give 3 [id=3]
testhost Information: 0 : 1 taked 2 [id=1]
testhost Information: 0 : 0 give 4 [id=4]
testhost Information: 0 : 2 taked 1 [id=2]
testhost Information: 0 : 1 taked 3 [id=3]
testhost Information: 0 : 1 Finish ххх