Какой структурой можно повесить lock на диапазон?

Я пишу udp client-server с быстрой асинхронной обработкой поступающих пакетов.
Буфер пакетов реализован в виде ring buffer.
Для синхронизации работы между потоками я использую: Interlocked.CompareExchange, ManualResetEvent и lock().
Но они позволяют заблокировать только один конкретный "объект".
Вопрос: Как повесить блок на диапазон?
Упрощённый пример который сейчас использую:
var buffer = new byte[100];
var startIdx = 5;
var len = 10;
var wholeBufferLock = 0;
while (Interlocked.CompareExchange(ref wholeBufferLock , 1, 0) != 0)
    continue;
Process(startIdx, len)
wholeBufferLock = 0;

Хотелось бы:
var buffer = new byte[100];
var startIdx = 5;
var len = 10;

//Блокируем только диапазон -> 
// если диапазоны не пересекаются, то получаем блок и работаем
// если диапазоны пересекаются, то ждём когда блок спадёт
using(var rangeLock = new RangeLock(buffer, startIdx, len))
{
  Process(startIdx, len)
}
  • Вопрос задан
  • 1929 просмотров
Решения вопроса 2
wataru
@wataru Куратор тега Алгоритмы
Разработчик на С++, экс-олимпиадник.
Если у вас диапазоны такие маленькие, то можно прямо в классе буфера завести еще и массив mutex-ов. И каждый поток будет их лочить слева на-право по всему диапазону (важно, что каждый поток делает это в одном и том же порядке, а то дедлоки получите).

Тут "выделение" куска будет не такой тривиальной операцией, как один лок, но зато, никах спинлоков. Система может потоки усипить и будить только когда очередной мютекс освободится.

Если же у вас ожидаются интервалы побольше 100, то тут возможно лучше будет вот такое решение:
У вас будет одна глобальная структура-индекс, где вы будете хранить статус буфера (о ней позже). Потоки будут к ней обращатся и просить у нее выделить потоку кусок.
Внутри этой структуры, похоже, придется сделать один глобальный мьютекс.
В качестве самой структуры хорошо подойдет дерево отрезков с отложенным добавлением. Пусть оно будет считать сумму на отрезке. При запросе на выделение вы смотрите, равна ли сумма на запрошенном отрезке 0. Если да, то прибавляете на отрезке 1. Если нет, то возвращаете неудачу и поток должен будет опять обратиться к структуре. При освобождении интервала прибавляйте -1.

Тут операция выделения сама по себе будет сильно быстрее. Но это фактически спинлок - каждый поток будет в цикле пытаться выделить себе интервал, пока не сможет. Если ожидается, что потокам придется долго ждать, то надо какие-то стратегии back-off добавлять (спать между вызовами). И вообще спинлоки - это плохо.

Как сделать это без спинлоков с деревом отрезков - я не придумал.
Ответ написан
Комментировать
@Degot Автор вопроса
Я реализовал RangeLocker, который делает требуемое на основе spinLock'ов при создании блока + ManualResetEvent для каскада разблокирований.

Для нахождения родительских блокировок я использую алгоритм художника на основе _painterBuffer.
При создании блокировки я прохожусь по требуемому диапазону и прописываю индекс новой блокировки, собирая родительские. Из родителей я собираю Unlock event'ы и жду их завершения WaitHandle.WaitAll(parentsUnlockEvents);

При Release'е блокировки, я прохожу по _painterBuffer и ставлю -1 там, где индекс равен индексу текущей блокировки.

using System.Runtime.CompilerServices;
namespace test_range_locking;

internal class Program
{
    private readonly Random _random = Random.Shared;
    private const int _bufferSize = 20;
    private byte[] _buffer = new byte[_bufferSize];

    private RangeLocker _locker;

    private readonly int _tasksQty;
    private readonly int _loopsQty;
    public Program(int tasksQty, int loopsQty)
    {
        _tasksQty = tasksQty;
        _loopsQty = loopsQty;
    }

    static void Main(string[] args)
    {
        new Program(3, 3).Run();
        Console.WriteLine("Done");
        Console.ReadLine();
    }

    private void Run()
    {
        _locker = new RangeLocker(_bufferSize, 4);
        var padding = _tasksQty.ToString().Length;

        var tasks = Enumerable.Range(0, _tasksQty)
            .Select(i => Task.Run(() => InternalTask($"{i}".PadLeft(padding, ' '), _loopsQty)))
            .ToArray();
        Task.WaitAll(tasks);
    }

    private void InternalTask(string taskId, int loops)
    {

        for (var idx = loops - 1; idx >= 0; idx--)
        {
            var start = _random.Next(0, _bufferSize);
            var len = _random.Next(1, _bufferSize - start + 1);
            using (var @lock = _locker.Aquire(taskId, start, len))
            {
                Thread.Sleep(2000);
            }
        }
    }
}

internal class RangeLocker
{
    private readonly int _bufferSize;

    private int[] _painterBuffer;
    private ulong[] _parentIndices;

    private RangeLock[] _locks;
    private int _locksBufferSize;
    private int _locksHead;
    private int _locksQuantity;

    private Semaphore _semaphoreFreeLocksQuantity;
    private int _mainSpinLock;
    public RangeLocker(int bufferSize, int maximumLocksQty)
    {
        _bufferSize = bufferSize;
        _locksBufferSize = maximumLocksQty;

        _painterBuffer = new int[_bufferSize];
        _painterBuffer.AsSpan().Fill(-1);

        _parentIndices = new ulong[BitUtils.GetRequiredNumberOfUlongs(_locksBufferSize)];

        _locks = new RangeLock[_locksBufferSize];

        _locksHead = 0;
        _locksQuantity = 0;
        _mainSpinLock = 0;

        _semaphoreFreeLocksQuantity = new Semaphore(_locksBufferSize, _locksBufferSize);
    }

    internal IDisposable Aquire(string taskId, int offset, int length)
    {
        Console.WriteLine($"THR {taskId} -> Try {offset} -> {offset + length - 1}");

        _semaphoreFreeLocksQuantity.WaitOne();

        var notified = false;
        while (Interlocked.CompareExchange(ref _mainSpinLock, 1, 0) != 0)
        {
            if (!notified)
            {
                Console.WriteLine($"THR {taskId} -> Waiting for Main");
                notified = true;
            }
        }

        Console.WriteLine($"THR {taskId} -> In Main");

        var lockPosition = _locksHead;
        ref var range = ref _locks[lockPosition];

        var parentsUnlockEvents = GatherParents(offset, length, lockPosition, out var debugParents);

        range.Lock(offset, length);

        _locksHead = GetNextFreePosition(_locksHead);

        Interlocked.Increment(ref _locksQuantity);

        Console.WriteLine($"THR {taskId} -> Main Released -> Free: {_locksBufferSize - _locksQuantity}");
        _mainSpinLock = 0;

        if (parentsUnlockEvents.Length > 0)
        {
            Console.WriteLine($"THR {taskId} -> Waiting for parents: {debugParents}");
            WaitHandle.WaitAll(parentsUnlockEvents);
            Console.WriteLine($"THR {taskId} -> Parents released");
        }

        Console.WriteLine($"THR {taskId} -> LOCK {lockPosition}: {offset} -> {offset + length - 1}");
        return new RangeUnlocker(this, lockPosition);
    }

    private ManualResetEvent[] GatherParents(int offset, int length, int lockPosition, out string debugParents)
    {
        debugParents = "";

        BitUtils.SetAllFalse(_parentIndices);

        var painterIndex = 0;
        var candidateParent = -1;

        for (var idx = length - 1; idx >= 0; idx--)
        {
            painterIndex = offset + idx;
            candidateParent = _painterBuffer[painterIndex];
            if (candidateParent >= 0)
            {
                BitUtils.SetBitTrue(candidateParent, _parentIndices);
            }

            _painterBuffer[painterIndex] = lockPosition;
        }

        var parentsQty = BitUtils.GetIndexesForPositivesCount(_parentIndices);
        var parentsUnlockEvents = default(List<ManualResetEvent>);
        var parentsStr = "";

        if (parentsQty > 0)
        {
            var usedParentIndices = new List<int>();
            parentsUnlockEvents = new List<ManualResetEvent>();
            var parentsIndices = BitUtils.GetIndexesForPositives(_parentIndices);
            var parentIndex = -1;

            for (var idx = parentsQty - 1; idx >= 0; idx--)
            {
                parentIndex = parentsIndices[idx];
                ref var parent = ref _locks[parentIndex];

                if (parent.Locked)
                {
                    usedParentIndices.Add(parentIndex);
                    parentsUnlockEvents.Add(parent.UnlockEvent);
                }
            }

            parentsQty = parentsUnlockEvents.Count;

            if (parentsQty > 0)
            {
                debugParents = string.Join(", ", usedParentIndices);
            }
        }

        BitUtils.SetAllFalse(_parentIndices);
        return parentsQty > 0 ? parentsUnlockEvents.ToArray() : Array.Empty<ManualResetEvent>();
    }

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private int GetNextFreePosition(int position)
    {
        var foundPosition = GetBufferPosition(position + 1);

        while (_locks[foundPosition].Locked)
        {
            foundPosition = GetBufferPosition(foundPosition + 1);
        }

        return foundPosition;
    }

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private int GetBufferPosition(int position)
    {
        return position % _locksBufferSize;
    }

    private void Release(int lockPosition)
    {
        var notified = false;
        while (Interlocked.CompareExchange(ref _mainSpinLock, 1, 0) != 0)
        {
            if (!notified)
            {
                Console.WriteLine($"UNLOCK {lockPosition} -> Waiting for Main");
                notified = true;
            }
        }

        Console.WriteLine($"UNLOCK {lockPosition} -> In Main");

        ref var range = ref _locks[lockPosition];

        var offset = range.Offset;
        var painterIndex = 0;
        for (var idx = range.Length - 1; idx >= 0; idx--)
        {
            painterIndex = offset + idx;
            if (_painterBuffer[painterIndex] == lockPosition)
            {
                _painterBuffer[painterIndex] = -1;
            }
        }


        Console.WriteLine($"UNLOCK {lockPosition}: {range.Offset} -> {range.Offset + range.Length - 1}");
        range.Unlock();

        Interlocked.Decrement(ref _locksQuantity);
        Console.WriteLine($"UNLOCK {lockPosition} -> Main Released -> Free: {_locksBufferSize - _locksQuantity}");

        _semaphoreFreeLocksQuantity.Release();

        Console.WriteLine($"UNLOCK {lockPosition} -> Semaphore.Release()");

        _mainSpinLock = 0;
        Console.WriteLine($"UNLOCK {lockPosition} -> Release()");

    }

    private struct RangeLock
    {
        public ManualResetEvent UnlockEvent { get; private set; }
        public bool Locked { get; private set; }

        public int Offset { get; private set; }
        public int Length { get; private set; }

        private bool _initialized = false;

        public RangeLock()
        {
        }

        private void Initialize()
        {
            if (!_initialized)
            {
                UnlockEvent = new ManualResetEvent(true);
                Locked = false;
                _initialized = true;
            }
        }
        public void Lock(int offset, int length)
        {
            Initialize();

            if (Locked)
            {
                throw new Exception("Already locked");
            }

            UnlockEvent.Reset();
            Locked = true;
            Offset = offset;
            Length = length;
        }

        public void Unlock()
        {
            if (!Locked)
            {
                throw new Exception("Already unlocked");
            }

            UnlockEvent.Set();
            Locked = false;
        }

    }

    private class RangeUnlocker : IDisposable
    {
        private bool _disposed;
        private readonly RangeLocker _locker;
        private readonly int _lockPosition;
        public RangeUnlocker(RangeLocker locker, int lockPosition)
        {
            _locker = locker;
            _lockPosition = lockPosition;
        }

        protected virtual void Dispose(bool disposing)
        {
            if (!_disposed)
            {
                if (disposing)
                {
                    _locker.Release(_lockPosition);
                }
                _disposed = true;
            }
        }

        public void Dispose()
        {
            Dispose(disposing: true);
            GC.SuppressFinalize(this);
        }
    }
}
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы