Где ошибка, почему 0.000000000000000001% проиходит ошибка на 0.0000000000000000001% разницу.
Задача, список, который может читать только 1 поток, другие потоки пишут.
Читающий поток изредка проверяет сколько данных накапало, и выгружает их все cразу же. При этом писатели продолжают добавлять.
Идея на переключение 2 списков. 1 для чтения 1 для записи, как только запрашивается чтение, происходит свап.
Где ошибка, гонка или аба проблемма. не понимаю, ведь у меня всегда перед чтением ставиться флаг в -1, и поток добавление блокируется, пока не произойдет свап.
Далее есть момент, поток добавляющий может добавить последнее значение, и в этот момент произойдет свап, так как у него сохранена локальная ссылка головы, то есть он будет добавлять все еще в старый список, и типа такое может быть(хотя это не влияет как предполагаю), что он не успеет записать данные, до чтения. Но опять же это не влияет, ведь эти данные всю-равно пишуться в конец, и когда читатель до них дойедт они уж точно будут записаны. В любом случае туда можно SpinWait напихать всю-равно не измениться.
public struct ConcurrentList<T>
{
class Node
{
public Node Next;
public T[] Items;
public int Counter;
}
public ConcurrentList(int capacity)
{
Node a = new Node() { Counter = 0, Items = new T[capacity] };
Node b = new Node() { Counter = 0, Items = new T[capacity] };
a.Next = b;
b.Next = a;
_head = a;
_locker = new();
}
object _locker;
private Node _head;
public void Add(T data)
{
while (true)
{
Node head = _head;
int currentCounter = head.Counter;
if (currentCounter < 0)
continue;
if (Interlocked.CompareExchange(ref head.Counter, currentCounter + 1, currentCounter) == currentCounter)
{
int index = currentCounter;
if (index >= head.Items.Length)
{
lock (_locker)
{
Grow(ref head, index);
head.Items[index] = data;
}
return;
}
head.Items[index] = data;
return;
}
}
}
[MethodImpl(MethodImplOptions.NoInlining)]
private void Grow(ref Node head, int index)
{
lock (_locker)
{
if (index < head.Items.Length)
return;
Array.Resize(ref head.Items, int.Max(1, index * 2));
}
}
public int Count
{
get
{
var head = _head;
return head.Counter + head.Next.Counter;
}
}
public bool IsEmpty => _head.Counter == 0;
public ReadOnlySpan<T> Items
{
get
{
{
// 1. Берем текущий head
Node oldHead = _head;
// 2. Помечаем узел как "закрытый для записи" (-1)
int counter;
do
{
counter = oldHead.Counter;
} while (Interlocked.CompareExchange(ref oldHead.Counter, -1, counter) != counter);
// 5. Теперь безопасно меняем голову
// Другие потоки Add увидят Counter = -1 и начнут с начала
Interlocked.Exchange(ref _head, oldHead.Next);
{
if (counter > oldHead.Items.Length)
{
Grow(ref oldHead, counter);
}
// Для value types все элементы валидны
oldHead.Counter = 0;
if (counter <= 0)
return ReadOnlySpan<T>.Empty;
return oldHead.Items.AsSpan(0, counter);
}
}
}
}
}
static async Task Main(string[] args)
{
Task[] writers = new Task[11];
int numWriteRequest = 111011;
var list = new ConcurrentList<int>(writers.Length*numWriteRequest); // даже так, но ошибки не при изменений
long ComplectedCount = 0;
for (int i = 0; i < writers.Length; i++)
{
writers[i] = new Task(() =>
{
for (int i = 0; i < numWriteRequest; i++)
{
list.Add(i);
}
Interlocked.Increment(ref ComplectedCount);
});
}
foreach(var w in writers)
{
w.Start();
}
long sum = 0;
int operations = 0;
while (ComplectedCount != writers.Length || list.Count>0)
{
var items = list. Items ;
foreach(var item in items)
{
sum += item;
}
operations += items.Length;
}
long expected = writers.LongLength * (numWriteRequest * ((long)numWriteRequest - 1)) / 2L;
Console.WriteLine(expected==sum);
Console.WriteLine(operations==writers.Length*numWriteRequest);
Console.WriteLine(expected );
Console.WriteLine( sum);
Console.WriteLine(operations );
return;
}
число чтений записей совпадает, а вот сумма отличается на нанопроцент.