Задать вопрос
Luffy1
@Luffy1
Student, Junior .NET programmer, C#, JS, HTML/CSS

Можно ли упростить данный код (см. внутри), заменив ConcurrentBag list'ом и установив lock?

У меня есть задача: есть много текстовых файлов, заполненных рандомным текстом, в котором иногда встречаются имэйлы. Нужно, используя три потока, параллельно считывать данных с этих файлов, при этом после того, как какой-то поток считал имэйлы, он должен записать их на один файл, при этом всех три потока должны записывать считаннные данные на этот один файл, не на разные файлы.
Я ж решил сделать так, чтобы доступ к файлам организовался последовательно и максимально потокобезопасно. Вот в итоге код для считывания имэйлов из файла:
internal class EmailReaderWriter : IEmailReaderWriter
{
    public ConcurrentBag<string> ReadEmails { get; private set; }
    public ConcurrentBag<string> WrittenEmails { get; private set; }
    public string FolderPath { get; }

    private delegate void EmailReaderWriterHandler(string message);
    private event EmailReaderWriterHandler SuccessNotification, ErrorNotification;
    private FileOptions customFileOptions;

    public EmailReaderWriter(string folderpath)
    {
        FolderPath = folderpath;
        ReadEmails = new ConcurrentBag<string>();
        WrittenEmails = new ConcurrentBag<string>();
        SuccessNotification += OperationSuccessNotifier.Success;
        ErrorNotification += OperationSuccessNotifier.Error;
        customFileOptions = FileOptions.Asynchronous | FileOptions.SequentialScan;
    }

    public async Task<ConcurrentBag<string>?> ReadAsync(string filepath)
    {
        if (!File.Exists(filepath))
        {
            ErrorNotification($"File {filepath} doesn't exist!");
            return null;
        }

        using FileStream fstream = new FileStream(filepath, FileMode.Open,
            FileAccess.Read, FileShare.None, 0, customFileOptions);
        using StreamReader reader = new StreamReader(fstream);

        var allEmails = new ConcurrentBag<string>(
            (
            await reader.ReadToEndAsync()).
            Split(new string[] { " ", ". ", ", " }, StringSplitOptions.RemoveEmptyEntries).
            Where(x => x.Contains("@gmail.com"))
            );

        if (allEmails.Count() == 0)
        {
            ErrorNotification($"0 emails were found!");
            return null;
        }

        ConcurrentBag<string> nonReadEmails = new ConcurrentBag<string>();
        int readEmailsAddedCounter = 0;

        foreach (var email in allEmails)
        {
            if (ReadEmails.Contains(email))
            {
                ErrorNotification($"Email {email} is already read!");
                continue;
            }
            else
            {
                readEmailsAddedCounter++;
                nonReadEmails.Add(email);
                ReadEmails.Add(email);
                SuccessNotification($"Email {email} is read!");
            }
        }
        allEmails.Clear();

        if (readEmailsAddedCounter > 0)
            SuccessNotification($"Such quantity of emails was read: {readEmailsAddedCounter}!");

        return nonReadEmails;
    }

    public async Task<bool> WriteAsync(string filepath, string email)
    {
        if (WrittenEmails.Contains(email))
        {
            ErrorNotification($"Email {email} is already written on file!");
            return false;
        }

        using (FileStream fstream = new FileStream(filepath, FileMode.Append,
            FileAccess.Write, FileShare.None, 0, customFileOptions |
            FileOptions.WriteThrough))
        {
            using (StreamWriter writer = new StreamWriter(fstream))
            {
                await writer.WriteLineAsync(email);
                WrittenEmails.Add(email);
            }
        }

        SuccessNotification($"Email {email} is successfully written on file!");
        return true;
    }
}


Меня интересует метод ReadAsync: я хочу его упростить. Мне подсказали тут, что можно все коллекции в принципе заменить на тип List, а в методе просто locker установить. Если я правильно понял, то таким образом:
var allEmails = new ConcurrentBag<string>(
    (
    await reader.ReadToEndAsync()).
    Split(new string[] { " ", ". ", ", " }, StringSplitOptions.RemoveEmptyEntries).
    Where(x => x.Contains("@gmail.com"))
    );

if (allEmails.Count() == 0)
{
    _errorNotification($"0 emails were found!");
    return null;
}

ConcurrentBag<string> nonReadEmails = new ConcurrentBag<string>();
int readEmailsAddedCounter = 0;

foreach (var email in allEmails)
{
    if (ReadEmails.Contains(email))
    {
        _errorNotification($"Email {email} is already read!");
        continue;
    }
    else
    {
        readEmailsAddedCounter++;
        nonReadEmails.Add(email);
        ReadEmails.Add(email);
        _successNotification($"Email {email} is read!");
    }
}
allEmails.Clear();

if (readEmailsAddedCounter > 0)
    _successNotification($"Such quantity of emails was read: {readEmailsAddedCounter}!");

Могу ошибаться щас и не уверен, что это правильно, ибо сам в процессе изучения сейчас темы асинхронности и параллельности. Так, как стоит установить локер и как можно упростить метод ReadAsync (а по возможности и WriteAsync)?

UPD: в двух и больше файлах могут быть несколько одинаковых имэйлов и считывать мне надо именно первую копию имэйла из первого файла, где эта копия встретилась, если этот имэйл имеет несколько копий в разных файлах.
  • Вопрос задан
  • 163 просмотра
Подписаться 2 Средний 3 комментария
Решения вопроса 1
Код не читал, но попробуй так:
У меня есть задача: есть много текстовых файлов, заполненных рандомным текстом, в котором иногда встречаются имэйлы.
после того, как какой-то поток считал имэйлы, он должен записать их на один файл, при этом всех три потока должны записывать считаннные данные на этот один файл, не на разные файлы.


1. Для каждого из файлов запусти по потоку (таске), каждый из которых пусть свой файл читает в поисках имеилов.
2. Для записи в итоговый файл - заведи ещё 1 поток (таску).
3. Коммуникацию между N читающими и 1 пишущим организуй через System.Threading.Channel

Таким образом ты избавишься от ненужных блокировок и затрат на синхронизацию доступа к какому-то списку.

UPD: в двух и больше файлах могут быть несколько одинаковых имэйлов и считывать мне надо именно первую копию имэйла из первого файла, где эта копия встретилась, если этот имэйл имеет несколько копий в разных файлах.

UPD: тогда смотрим на количество данных.
Если имеилов мало (по сравнению с количеством ОЗУ), то тогда можем прямо в памяти держать HashSet и проверять его в пишущем потоке.
Если имеилов побольше - можем сделать HashSet не по самим и имеилам, а по их хешам.
Если имеилов совсем много, то тогда можно записывать в отсортированную структуру данных на диск (двоичное дерево поиска например).

UPD2:
Для каждого из файлов запусти по потоку (таске), каждый из которых пусть свой файл читает в поисках имеилов.

На самом деле можно попробовать запустить несколько потоков, разделив каждый файл ещё на N сегментов и назначив потокам эти сегменты. Плодить новые можно до тех пор, пока у тебя IO не кончится.
Ответ написан
Пригласить эксперта
Ответы на вопрос 1
AleksejMsk
@AleksejMsk
Программирую от души за деньги
В многопоточной среде могу предложить не просто лок, а асинхронный лок.
Это позволит отпустить поток в ожидании на другую работу.

Пример тут:
https://github.com/KlestovAlexej/Wattle3.Examples?...

using var lockObject = locks.GetLock(123);
if (await lockObject.TryEnterAsync(cancellationToken))
{
    ...
}
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы