Программа-шифратор (но актуально для любого обработчика файлов). До цикла выполняется: начало считывания новой порции; создание независимых данных (для новой порции); ожидание окончания считывания (новой порции). В цикле выполняется: начало считывания новой порции; обработка текущей порции (с использованием независимых данных для текущей порции); затем начало записи текущей обработанной порции; создание независимых данных (для новой порции); ожидание окончания считывания (новой порции) и окончания записи (текущей порции). После цикла: обработка текущей порции (с использованием независимых данных для текущей порции); затем начало записи текущей обработанной порции; ожидание окончания записи (текущей порции).
Организую асинхронный доступ к файлу с помощью WinAPI таким классом:
#include <exception>
#include <windows.h>
#include <cstdint>
class AsyncBinFile
{
public:
class Err : public std::exception {};
class ErrOpen : public Err {
public: const char* what() const noexcept override {
return "cannot open file";
} };
class ErrWriteRead : public Err {
public: const char* what() const noexcept override {
return "file write / read error";
} };
AsyncBinFile() = delete;
enum mode_t {WriteNew, Write, Read};
enum buf_mode_t {NoBuffering = true, Buffering = false};
AsyncBinFile (const char* name, mode_t mode, buf_mode_t buf_mode = Buffering);
void close();
~AsyncBinFile() {close();}
AsyncBinFile(const AsyncBinFile&) = delete;
AsyncBinFile& operator=(const AsyncBinFile&) = delete;
HANDLE file_handler() const noexcept {return fh;}
void begin_write (const uint8_t* a, uint32_t n);
void begin_read (uint8_t* a, uint32_t n);
void wait();
void seek (uint64_t offset)
{ovl.OffsetHigh = DWORD(offset>>(sizeof(DWORD)*8)); ovl.Offset = DWORD(offset);}
uint64_t tell()
{return (uint64_t(ovl.OffsetHigh)<<(sizeof(DWORD)*8)) | ovl.Offset;}
uint64_t length();
private:
OVERLAPPED ovl;
HANDLE fh;
DWORD _n;
};
AsyncBinFile::AsyncBinFile (const char* name, mode_t mode, buf_mode_t buf_mode)
{
ovl.Internal = 0; ovl.InternalHigh = 0;
fh = CreateFile (
name,
(mode == WriteNew || mode == Write) ? GENERIC_WRITE : GENERIC_READ,
/*0*/ FILE_SHARE_READ | FILE_SHARE_WRITE, 0,
(mode == WriteNew) ? CREATE_NEW
: ((mode == Read) ? OPEN_EXISTING : OPEN_ALWAYS),
/*FILE_ATTRIBUTE_NORMAL*/0 | FILE_FLAG_OVERLAPPED
| (buf_mode ? (FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH) : 0),
0
);
if (fh == INVALID_HANDLE_VALUE) throw ErrOpen();
ovl.hEvent = CreateEvent (0, /*TRUE*/FALSE, FALSE, 0);
ovl.OffsetHigh = 0; ovl.Offset = 0; _n = 0;
if (ovl.hEvent == INVALID_HANDLE_VALUE) throw ErrOpen();
}
void AsyncBinFile::close()
{
if (fh != INVALID_HANDLE_VALUE) {
CancelIo (fh);
if (ovl.hEvent != INVALID_HANDLE_VALUE) CloseHandle (ovl.hEvent);
CloseHandle (fh);
fh = INVALID_HANDLE_VALUE;
if (_n != 0) throw ErrWriteRead();
}
}
void AsyncBinFile::begin_write (const uint8_t* a, uint32_t n)
{
if (_n != 0) throw ErrWriteRead();
_n = n;
BOOL r = WriteFile (fh, a, n, /*&act_n*/nullptr, &ovl);
if (!r && GetLastError() != ERROR_IO_PENDING) throw ErrWriteRead();
std::cout << "O="<<r<<'\n';
}
void AsyncBinFile::begin_read (uint8_t* a, uint32_t n)
{
if (_n != 0) throw ErrWriteRead();
_n = n;
BOOL r = ReadFile (fh, a, n, /*&act_n*/nullptr, &ovl);
if (!r && GetLastError() != ERROR_IO_PENDING) throw ErrWriteRead();
std::cout << "I="<<r<<'\n';
}
void AsyncBinFile::wait()
{
//while (!HasOverlappedIoCompleted(&ovl));
DWORD act_n;
BOOL r = GetOverlappedResult (fh, &ovl, &act_n, TRUE);
if (r == 0 || act_n != _n) throw ErrWriteRead();
seek (tell() + _n); _n = 0;
}
uint64_t AsyncBinFile::length()
{
DWORD high;
DWORD low = GetFileSize (fh, &high);
return (uint64_t(high)<<(sizeof(DWORD)*8)) | low;
}
Здесь видны фрагменты отладочного вывода и некоторые альтернативные варианты параметров, заменяя которые, пытался добиться работоспособности. В частности, стал указывать FILE_SHARE_READ | FILE_SHARE_WRITE в аттр. доступа, и убрал FILE_ATTRIBUTE_NORMAL, также менял настройку FALSE / TRUE режима автосброса события (ovl.hEvent).
Проблема в том, что при измерении времени работы, обнаружилась фактическая синхронность операции записи во всех записях кроме первой. Т.е. по идеи begin_write выполняется быстро, а ожидание окончания wait - долго, если конечно после begin_write мало что делали (закомментировали например соотв. строки). Здесь выводится "I=1" если было чтение из кэша (иначе "I=0"). "O=0" видимо всегда должно выводиться.
Возможно ошибка в начале записи др. файла (запроса записи) до ожидания окончания чтения другого. Но я конечно рассчитывал на наличие системной очереди. Тем более запросы идут к разным сущностям, разным файлам; и от очередности выполнения операций не завивисит алгоритм т.к. в конце цикла ждём конца и чтения, и записи.
Тем не менее я для тестирования изменил код, так что теперь в цикле: начало считывания новой порции; обработка текущей порции (с использованием независимых данных для текущей порции); ожидание окончания считывания (новой порции); начало записи текущей обработанной порции; создание независимых данных (для новой порции); ожидание окончания записи (текущей порции).
Вот основной код:
static uint64_t buf_a [(BUF_LEN+7)/8];
static uint64_t buf_b [(BUF_LEN+7)/8];
static uint64_t buf_g [(BUF_LEN+7)/8];
{ /*...*/
AsyncBinFile fin (_f_i, AsyncBinFile::Read);
AsyncBinFile fout (_f_o, AsyncBinFile::WriteNew);
uint64_t bytes_to_proc, file_len;
uint32_t portion, portion_next;
uint64_t *buf, *buf_next;
file_len = fin.length();
bytes_to_proc = file_len;
buf = buf_b; buf_next = buf_a;
init = Encrypt64 (init, key);
portion_next = (bytes_to_proc > BUF_LEN) ? BUF_LEN : bytes_to_proc;
auto x = chrono::steady_clock::now();
fin.begin_read ((uint8_t*)(buf_next), portion_next);
cout << 'i' << chrono::duration<double>(chrono::steady_clock::now()-x).count() << '\n';
//init = MakeGamma (buf_g, portion_next, init, key);
x = chrono::steady_clock::now();
fin.wait();
cout << 'w' << chrono::duration<double>(chrono::steady_clock::now()-x).count() << '\n';
portion = portion_next;
bytes_to_proc -= portion;
portion_next = (bytes_to_proc > BUF_LEN) ? BUF_LEN : bytes_to_proc;
while (portion_next > 0)
{
swap (buf, buf_next);
x = chrono::steady_clock::now();
fin.begin_read ((uint8_t*)(buf_next), portion_next);
cout << 'i' << chrono::duration<double>(chrono::steady_clock::now()-x).count() << '\n';
//XOR (buf, buf_g, portion);
x = chrono::steady_clock::now(); //
fin.wait(); // перенос из низа
cout << 'w' << chrono::duration<double>(chrono::steady_clock::now()-x).count() << '\n'; //
x = chrono::steady_clock::now();
fout.begin_write ((uint8_t*)(buf), portion);
cout << 'o' << chrono::duration<double>(chrono::steady_clock::now()-x).count() << '\n';
//init = MakeGamma (buf_g, portion_next, init, key);
//x = chrono::steady_clock::now();
// fin.wait(); перенос вверх
//cout << 'w' << chrono::duration<double>(chrono::steady_clock::now()-x).count() << '\n';
x = chrono::steady_clock::now();
fout.wait();
cout << 'w' << chrono::duration<double>(chrono::steady_clock::now()-x).count() << '\n';
//PB.show (file_len-bytes_to_proc, file_len);
portion = portion_next;
bytes_to_proc -= portion;
portion_next = (bytes_to_proc > BUF_LEN) ? BUF_LEN : bytes_to_proc;
}
XOR (buf_next, buf_g, portion);
fout.begin_write ((uint8_t*)(buf_next), portion);
fout.wait();
/*...*/ }
Должны выводиться времена выполнения (в сек.) операций "начало чтения", "начало записи", "ожидание конца последней операции" с префиксами "i", "o", "w" соотв. (за исключением самой последней записи после цикла, но это не важно).
При обработке файла размером около 100 Мбайт я получил такой вывод:
I=0
i0.234375
w1.26562
I=0
i0.03125
w1.48438
O=0
o0
w0.125
I=1
i0
w0
O=0
o0.0625
w0
. . .
Когда мы видим "I=0 \ i0.015625 \ w0.296875", это значит, что чтени из кэша не было и операция инициализации чтения прошла за 16 мс, и продолжалась еще 299 мс. Если мы видим "I=0 \ i0 \ w0", то значит данные подгрузились из кэша. Но только при первой записи выводится "O=0 \ o0 \ w0.125", а далее наоборот "O=0 \ o0.234375 \ w0" и аналогично. Т.е. вся запись идёт в момент инициализации начала записи.