@capiev
Программист C#, WPF, python.

Многопоточность ON DEMAND, или однопоточная многопоточность?

Стоит задача следующего вида:
  • Есть главный поток который генерит данные для обработчиков.
  • Обработчики - это небольшие программные коды, которые могут отличаться как самим кодом, так и контекстом (при одинаковом коде)
  • Обработчиков может быть много (до 500 тыщ штук).
  • Количество обработчиков заранее не известно, и они выявляются в процессе генерации данных для них.
  • Данные, на один из обработчиков, могут поступать несколько раз разрозненно во времени (к примеру очередь данных на обработчики: A, B, C, A, A, C, B, A, C и т.п)
  • Таким образом единовременно может (и должен) работать только один обработчик, параллельная работа обработчиков недопустима, так же как и одновременная работа главного потока и обработчика, так как обработчики могут иметь обратную связь и могут повлиять в дальнейшем на работу, как главного потока, так и обработчиков.
  • Скорость работы комплекса из главного потока и обработчиков, вместе с инициализацией обработчиков должна быть высокой.
  • Используемая платформа NetStandart2.x (NetCore2.x).


С одной стороны для этого как раз можно задействовать потоки Threads, и это очень удобно, но инициализация большого количества требует времени. Если использовать Task, то инициализация быстрая, но конкуренция все портит.

Вопрос следующий. Как реализовать так, чтобы обработчик в ожидании очередного пакета данных, был в спящем режиме и не участвовал в конкуренции.

И вроде бы удобно использовать один поток, но подсовывать ему разные контексты-обработчики, но как?

Следующий код, теоретически должен выполнять поставленную задачу, но учитывая особенности Task, происходит блокировка и все останавливается:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using System.Diagnostics;

namespace ThreadsTests
{
    public class WorkData
    {
        public int Id;
    }

    public abstract class ProtoBase<T> : IDisposable
    {
        public volatile bool IsFinish = false;

        ManualResetEventSlim WaitData = new ManualResetEventSlim(false);
        ManualResetEventSlim WaitRequest = new ManualResetEventSlim(false);
        static int NextId = 0;
        T NextData;

        public int give_count = 1;
        public ProtoBase()
        {
            ID = ++NextId;
        }
        public int ID;

        public void Finish()
        {
            IsFinish = true;
            WaitData.Dispose();
            WaitRequest.Dispose();
        }


        internal void GiveData(T bext_data)
        {
            WaitRequest.Wait();
            WaitRequest.Reset();
            NextData = bext_data;
            WaitData.Set();
            
        }

        public Task<T> GetNextData()
        {
            Func<T> DataWatingAction = () =>
            {
                WaitRequest.Set();
                WaitData.Wait();
                WaitData.Reset();
                T b = NextData;
                NextData = default(T);
                return b;
            };
            Task<T> DataWaitngTask = new Task<T>(DataWatingAction);
            DataWaitngTask.Start();

            return DataWaitngTask;


        }

        public abstract void Parse();

        public virtual void Dispose()
        {

        }
    }

    public class TestWorker : ProtoBase<WorkData>
    {
        public override async void Parse()
        {

            WorkData data = await GetNextData();
            Trace.TraceInformation("{0:N0} taked 1 [id={1:N0}]", ID, data.Id);
            data = await GetNextData();
            Trace.TraceInformation("{0:N0} taked 2 [id={1:N0}]", ID, data.Id);
            data = await GetNextData();
            Trace.TraceInformation("{0:N0} taked 3 [id={1:N0}]", ID, data.Id);
            Finish();
            Trace.TraceInformation("{0:N0} Finish ххх", ID);
        }
    }


    public class Main
    {
        //workers count
        int test_count = 2; // 100, 1000, 10000, 100000
        List<TestWorker> Workers = new List<TestWorker>();
        public void AsyncWorkersTest()
        {
            LinkedList<TestWorker> wrks = new LinkedList<TestWorker>();
            for (int i = 0; i < test_count; i++)
            {
                TestWorker tp = new TestWorker();
                wrks.AddLast(tp);
                tp.Parse();
            }
            Workers = wrks.ToList();
            Random rnd = new Random();

            int getDataCount = test_count * 3;
            for (int i = 0; i < getDataCount; i++)
            {
                int ind = rnd.Next(0, Workers.Count);
                WorkData wd = new WorkData() { Id = i };
                
                if (Workers[ind].IsFinish) continue;
                Trace.TraceInformation("{0:N0} give  {1} [id={2:N0}]", ind, Workers[ind].give_count++, wd.Id);
                Workers[ind].GiveData(wd);
            }
        }

    }
}


Вот лог работы данного кода.
testhost Information: 0 : 0 give  1 [id=0]
testhost Information: 0 : 0 give  2 [id=1]
testhost Information: 0 : 1 taked 1 [id=0]
testhost Information: 0 : 1 give  1 [id=2]
testhost Information: 0 : 0 give  3 [id=3]
testhost Information: 0 : 1 taked 2 [id=1]
testhost Information: 0 : 0 give  4 [id=4]
testhost Information: 0 : 2 taked 1 [id=2]
testhost Information: 0 : 1 taked 3 [id=3]
testhost Information: 0 : 1 Finish ххх
  • Вопрос задан
  • 236 просмотров
Решения вопроса 1
lam0x86
@lam0x86
У меня получился вот такой велосипед:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Diagnostics;

namespace ThreadsTests
{
    public class WorkData
    {
        public int Id;
    }

    public abstract class ProtoBase<T>
    {
        private static int NextId;
        
        private TaskCompletionSource<T> _dataTaskCompletionSource = new TaskCompletionSource<T>();

        public int give_count = 1;

        protected ProtoBase()
        {
            Id = ++NextId;
            new Task(Parse).Start();
        }
        
        public int Id { get; }

        public bool IsFinish { get; private set; }

        protected void Finish()
        {
            IsFinish = true;
        }

        public async Task PushData(T data)
        {
            _dataTaskCompletionSource.SetResult(data);
            await Task.Yield();
        }

        protected async Task<T> GetNextData()
        {
            var taskResult = await _dataTaskCompletionSource.Task;
            _dataTaskCompletionSource = new TaskCompletionSource<T>();
            return taskResult;
        }

        protected abstract void Parse();
    }

    public class TestWorker : ProtoBase<WorkData>
    {
        protected override async void Parse()
        {

            WorkData data = await GetNextData();
            Trace.TraceInformation("{0:N0} take 1 [id={1:N0}]", Id, data.Id);
            data = await GetNextData();
            Trace.TraceInformation("{0:N0} take 2 [id={1:N0}]", Id, data.Id);
            data = await GetNextData();
            Trace.TraceInformation("{0:N0} take 3 [id={1:N0}]", Id, data.Id);
            Finish();
            Trace.TraceInformation("{0:N0} Finish ххх", Id);
        }
    }


    public class Program
    {
        public static async Task Main(string[] args)
        {
            var schedulerPair = new ConcurrentExclusiveSchedulerPair();
            await await Task.Factory.StartNew(
                AsyncWorkersTest,
                CancellationToken.None,
                TaskCreationOptions.None,
                schedulerPair.ExclusiveScheduler);
            Console.WriteLine("FINISHED");
            Console.ReadKey();
        }
        
        public static async Task AsyncWorkersTest()
        {
            //workers count
            const int testCount = 1000; // 100, 1000, 10000, 100000
            var Workers = new List<TestWorker>();

            for (int i = 0; i < testCount; i++)
            {
                Workers.Add(new TestWorker());
            }

            Random rnd = new Random();

            int getDataCount = testCount * 3;
            for (int i = 0; i < getDataCount; i++)
            {
                int ind = rnd.Next(0, Workers.Count);
                WorkData wd = new WorkData() { Id = i };
                
                if (Workers[ind].IsFinish) continue;
                Trace.TraceInformation("{0:N0} push {1} [id={2:N0}]", Workers[ind].Id, Workers[ind].give_count++, wd.Id);
                await Workers[ind].PushData(wd);
            }
        }
    }
}


Вывод:
ConsoleApp1 Information: 0 : 1 push 1 [id=0]
ConsoleApp1 Information: 0 : 1 take 1 [id=0]
ConsoleApp1 Information: 0 : 2 push 1 [id=1]
ConsoleApp1 Information: 0 : 2 take 1 [id=1]
ConsoleApp1 Information: 0 : 1 push 2 [id=2]
ConsoleApp1 Information: 0 : 1 take 2 [id=2]
ConsoleApp1 Information: 0 : 1 push 3 [id=3]
ConsoleApp1 Information: 0 : 1 take 3 [id=3]
ConsoleApp1 Information: 0 : 1 Finish ххх
ConsoleApp1 Information: 0 : 2 push 2 [id=4]
ConsoleApp1 Information: 0 : 2 take 2 [id=4]
ConsoleApp1 Information: 0 : 2 push 3 [id=5]
ConsoleApp1 Information: 0 : 2 take 3 [id=5]
ConsoleApp1 Information: 0 : 2 Finish ххх
Ответ написан
Пригласить эксперта
Ответы на вопрос 2
AlexanderYudakov
@AlexanderYudakov
C#, 1С, Android, TypeScript
Чтобы разгрести всю эту кашу в голове, предлагаю применить RTFM-технологию:

Thread Pooling in C#:
https://docs.microsoft.com/en-us/dotnet/csharp/pro...

Thread Synchronization in C#:
https://docs.microsoft.com/en-us/dotnet/csharp/pro...

Asynchronous Programming (C#):
https://docs.microsoft.com/en-us/dotnet/csharp/async
Ответ написан
arxont
@arxont
C# программист
"единовременно может (и должен) работать только один обработчик" - что-то мне подсказывает, что оператор lock спасёт вас

Посмотрите для примера на вот этот ответ на stackoverflow - https://stackoverflow.com/questions/37242257/why-i...
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы