В каких случаях fread/fwrite для стрима (сокета) разумнее сделать чанками? А когда наоборот — обязательно завершить чтение/запись?
Пишу клиент для приложения, которое работает на tcp сокете.
Клиент может как получать, так и отдавать большое количество данных.
С одной стороны понимая, что опрос всех соединений в пуле будет происходит по очереди и чтобы одно из них не простаивало разумно в каждом шаге опроса отсылать например 1024 байта, в следующем шаге - еще 1024...
(ведь если сделать "отсылать всё что передали в функцию", то следующий шаг (а равно и опрос других потоков) не наступит, стало быть потоки будут висеть. Ну а вдруг я гигабайт передаю, и на сервере в сокет могут приходить ответы, но я их не считываю, потому что занят записью, сервер за время отправки гигабайта может вообще соединение потушить по пингу.)
Осложняет так же то, что в приложении для которого клиент реализовано понятие "подписки". Если я из одного клиента отправлю подписку на три топика, каждый топик может получать ответы. Это значит что если я реализую отправку по чанкам, мне почему-то кажется, что до приложения может дойти каша. Впрочем насчет каши я сделаю пул соединений и в каждое чанками будет отсылаться то что нужно отправить именно в это соединение.
Но меня смущает чтение (может там так же будет, коль скоро я отправляю по очереди, то и ответы будут приходить в том же порядке и все нормально?). Отсылая команды по кусочкам, некоторые будут отосланы, соответственно придут на них ответы или подтверждения. А понимая что я считываю чанками (да еще и по нескольким топикам), может прийти тоже каша (опять же не уверен, может и нормально будет).
Соответственно как правильно разруливается такая работа считывания записи понимая, что в каждом шаге будет происходит отправка и получение только части информации?
Вы явно недостаточно изучили механизмы работы сокетов и методы работы с ними. Есть два основных механизма: блокирующая передача/прием и не блокирующая передача/прием (в разных ОС есть свои ньюансы, которые можно прочитать в документации). Т.е., в первом случае поток приложения, которое принимает/передает данные, ожидает окончания передачи или приема данных, а во втором же случае приложение просто передает буфер системе и далее занимается уже своей работой. А вот уже как и с какой скоростью данные будут переданы или приняты - это уже работа операционной системы, а по окончании передачи/приема - либо ОС уведомляет приложение о том, что столько-то данных лежит в буфере - надо забрать, в случае приема либо приложение само проверяет периодически сколько там байт уже в буфере лежит. Аналогично с передачей данных - приходит уведомление о том, что данные переданы (колббэк или еще как - не принципиально в данный момент). В общем случае надо организовать очередь и пул "воркеров" - потоков, которые будут заниматься передачей данных между приложением и системой. Т.е., прочитали блок данных с диска - отправили в очередь, а первый освободившийся воркер эти данные передаст. Аналогично с приемом - воркеры принимают данные и складывают в очередь, а приложение забирает их из очереди. Само собой, в очередь надо класть не безымянный блок данных, а вместе с информацией о том, откуда и куда эти данные, а так же номер блока и общее их число. Число воркеров обычно соответствует числу ядер процессора или на один меньше и работают они все тоже асинхронно с сокетами. Воркер может быть и один, если не стоит цель максимально утилизровать процессор и сеть.
Вы имеете в виду, что очередь в итоге одна для всех соединений, вместо того, чтобы держать по очереди на соединение?
пс. в пхп не знаю функции "сколько там накопилось в сокете для чтения". предполагаю что нужно читать N байт и процессить по мере чтения и останавливать по какому-то критерию, ведь конца сокета пока его на той стороне не закроют - нету
пс.2. даже если узнаю, то вопрос вешать ли все потоки пока все не считаю - открыт и пока не понят мной. любая параллелька, будь то async/await в яваскрипте или pcntl_fork имеет начало и конец, и логика в том, чтобы начав параллель в какой-то момент опросить завершенность процесса, чтобы закрыть потомков. если воркеры начинают выполнять действие в параллели, но чтение должно продолжаться, то вопрос подвешивания и ожидания пока "все потомки закончат" (он же Promise.all()) остановит чтение, стало быть ожидалки быть не должно, просто запускаю параллель и в каждом опросе потоков уточняю что закончилось и можно закрыть, и только после закрытия имею столько-то команд на запись в ответ что "я всё прочел, спасибо".
===
кроме того мне с трудом понимается для чего подымать несколько воркеров для того чтобы обслуживать одно соединение, хотя это разумно, ведь каждое сообщение входящее нужно процессить (а процесс может и час занимать), а значит другие будут ждать, отсюда как бы и воркеры обязательно.
и еще вот это вызывает вопросы "а так же номер блока и общее их число", в пхп есть fread() которая вернет мне столько то байт, разбиение по блокам откуда берется?
====
представим ситуацию что в ответ на сообщение "пришлите мне видео" я должен отослать целый фильм. если я подыму десять воркеров и буду в один сокет ими всеми отвечать отсылая видео по кускам, то они придут вразнобой, и торрент механизма на принимающей стороне нету. стало бы отсылать я их должен строго в нужном порядке, синхронно, но при этом не мешая продолжающемуся процессу чтения, то есть обходя циклом все соединения кусок считал, кусок передал, и так либо бесконечно, либо до наступления некоего критерия.
и больше всего здесь ломает именно то, что продолжающийся процесс чтения (и процессинг новых приходящих ответов) заставит писать в тот момент, когда прошлая запись еще не завершилась, и стало быть если соединение одно - поток записи будет кашей...
безусловно можно открывать новое соединение на каждый чих, но мне кажется такая система недолго проживет.
Хмм, тогда, возможно стоит все же озвучить свою итоговую задачу? Потому что для пыха скорее всего есть какая-нибудь стандартная библиотека, которая успешно решает вашу задачу. Ну или решать вашу задачу в рамках более удобного инструмента.
VoidVolker, это клиент для очереди сообщений с подписками на них (PUB/SUB) и режимом запрос-ответ (REQUEST/REPLY). Сервер будет слать на клиент входящие, когда посчитает нужным. А клиент - отвечать на них, если это требуется, опять же - как посчитает нужным.
То есть это не выполнение какой-то задачи на проекте, это именно библиотека через которую другие будут выполнять задачи на проекте, обертка, позволяющая навешать на происходящие в очереди события некие действия. Но никто не мешает отправить исходящим в очередь гиг, если там нет лимита он таки должен дойти, и при этом продолжать чтение как с других соединений, так и с этого.
И пишу я её не из общего интереса, а потому что php клиента нету, я пытаюсь попросту повторить с тем же синтаксисом, что есть например для NodeJS, только на пыхе, перевести, если по-простому.
И в пыхе есть функции
считать
вписать
проверить_окончание_потока (там нету "скока байт, есть только - поток все еще открыт на той стороне?")
открыть_соединение
закрыть_соединение
и функция stream_select() (которая по сути foreach только с таймаутом - перебирает по очереди список потоков и дает им время что-то прочитать или написать туда, после чего переходит к следующему).
И что-то мне подсказывает, что точно такие же функции есть и в других языках, и других быть не может, просто кто-то уже выполнил ту задачу что делаю я, и красиво их обернул, чтобы они "были в другом инструменте"
Я понимаю что чтение бывает блокирующее и не-блокирующее, я лишь хочу понять в каких случаях - какое.
Так например клиент на открытие соединения отвечает настройками сервера, и ожидает что получив их, я выполню авторизацию.
В данном случае мне вроде как нужно блокирующее чтение до конца всего сообщения с этой информацией. Значит я должен запустить чтение циклом с ограничением "команда считана или пинг превышен". С другой стороны, когда я соберу соединения в пул, то заблокировать пул пока кто-то из них читает тож нельзя получается.
php не самый удачный выбор для реализации данной задачи, т.к. очевидно, что клиент должен работать в течении продолжительного времени, а php изначально проектировался как скриптовый язык для разового запуска и отдачи результата в виде веб-странички.
VoidVolker, понимаю вас, но с момента появления в php консоли - это уже давно не так. Он умеет все то же что и другие языки - и подымать скрипты-демоны, и делать асинхронно задачи. Меня интересует принципиально подход для чего блокирующее чтение и не-блокирующее, язык для меня вторичен.
Если гипотетически рассуждать про параллельность - то её не бывает, бывает свободные исполнители синхронного кода, которые можно нагрузить задачами, что с точки зрения промежутка времени будет выглядеть как одновременность. Все равно есть первый скрипт, который держит контроль над параллелью, и в случае бесперебойного чтения записи этот скрипт прерывается по интервалу и проверяет состояние следующего исполнителя. Так или иначе параллель это про "дать работу тем троим" и либо подождать "пока закончат" (подвесить приложение опросом до тех пор, пока состояние не изменится в единицу) либо "сходить к ним через таймаут, и если еще нет, то еще через таймаут", чтобы понять, что пора закрывать форки.
Но вопрос не совсем про параллельность, а про то, в какие моменты блокирующее чтение/запись маст хэв, потому что во всех остальных случаях - неблокирующее лучше. Если мы смотрим на это через призму языка - то программист решает, как ему делать и как он привык. А вот с призмы надежности наверняка есть какие-то критерии.
Правильно ли я понимаю, что если протокол сервер-клиент такой, что после коннекта нужно дождаться ответа, а затем ответить, авторизовавшись, то эта часть должна быть блокирующей, тогда как слушание пула соединений в блокирующем виде превратится в слушание одного соединения.
а про то, в какие моменты блокирующее чтение/запись маст хэв
Например, когда надо полностью утилизировать ресурс. Пример: диск -> ядро -> сеть. Для простоты примера будем считать что все работает одинаково быстро. Тогда, сначала читаем блок с диска и загружаем в память, далее передаем блок данных по сети. Пока идет передача данных по сети - диск простаивает, а пока читаем с диска - сеть простаивает. Удваиваем число ядер (либо ускоряем процессор в два раза) и запускаем два потока: первый считывает блок с диска и передает второму потоку, который блок данных передает по сети. А вот в реале все гораздо сложнее: все элементы системы работают с разной скоростью и производительностью, плюс она непостоянная - есть еще кэши, которые заканчиваются, есть другие потребители ресурсов и прочее. Поэтому задача передачи и приема данных и разбивается на модули, которые могут работать независимо, для более эффективного использования ресурсов. Для этого и используется отдельные потоки для чтения/передачи данных и очереди. Конечно, все то же самое можно делать и в одном потоке и асинхронно, но с меньшей гибкостью и управляемостью. Например, пришла тысяча запросов на тысячу файлов - а диск за адекватное время выдаст только 10 и мы без разбора асинхронно сразу тысячу пытаемся прочитать и получаем перегрузку диска. И опять же - можно комбинировать все разных вариациях. Нужно просто сделать декомпозицию задачи, все расписать и разбить на отдельные модули.
тогда добавлю инфу что в моем случае утилизация железа недоступна. я не могу управлять раздельно памятью и процессором и пытаюсь применить утилизацию чтение и запись к нескольким подключениям в сети стараясь обеспечить равномерную их нагрузку вместо полной утилизации каждого по очереди. при этомимею некоторые действия требующие загрузить процессор работой но при этом избежать простоя сети. основных таких действий тут два. авторизация при старте требующая почитать одно соединение чтобы продолжить писать и важно что в случае потери связи такое же действие придется выполнить не останавливая те соединения где все ок и обработка входящих которые сами по себе могут занимать большое время изза навешанных действий но результат мне нужно отправить в поток записи то есть одно соединение должно висеть обмениваясь пинг понг пока выполняется операция
Я тут немного подумал и исходя из ваших советов моя задача сделать методы, которые стартуют бесконечный цикл для чтения, после чтения парсят прочитанное разделяя таким образом на то, что уже можно процессить и то, что еще "не дочиталось" и складывают первое в буфферы по каждому соединению из пула.
Условный другой исполнитель должен подключиться к этим буфферам и точно так же запустить одновременный процессинг забирая по одному сообщению из каждого пула, результаты обработки кладет в свои собственные буфферы готово.
Условный третий исполнитель подключается к буфферам "готово" и запускает бесконечный цикл для записи по мере того как в пулах готовности что-то появляется и пишет ответы, если они нужны, в том числе он же может писать команды, делая прерывание, если такая команда приходит.
Выглядит то разумно. Но остаются вопросы.
1. Чтобы восстановить потерянное соединение, нужно подключиться, потом считать, и потом написать ответ. Без этого система попросту не будет получать дальнейшие сообщения. И здесь задействованы все три исполнителя, оно разве не повлияет на остальные вызвав какие-то простои? Или это мне кажется, на самом деле в простое будет только то соединение, которое просто ожидает, когда процесс пройдет по его собственному пути, остальные будут продолжать себе работать?
2. Еще остается вопрос с большими сообщениями. В протоколе указано число байт в теле сообщения при его чтении. Но ведь если я на одном из соединений вызову условный "читай 1 гигабайт пока не закончишь", то система сожрет всю сетку только на чтение а заодно подвесит поток. Значит ли это что здоровенные файлы нужно продолжать равномерно читать допустим по 1килобайту на соединение за шаг цикла чтения и складывать это в буффер пока все данные не будут получены? Или здесь нужен какой-то умный балансировщик который еще и учитывает что если мы знаем заранее известное число байт, то он пропорционально отдает больше сети тому, у кого впереди скачка целого гигабайта? Понимаете, как это на коде будет выглядеть - цикл бесконечный, в котором мы пробегаемся по соединениям и на каждом вызываем чтение. Если вызвать чтение гигабайта, то шаг цикла закончится минут через 10 (добавляем сюда возможный обрыв интернета и тогда = никогда), остальные будут терпеливо ждать. Либо я всем жестко ставлю качай по 1 килобайту, и тогда все будет равномерно, но скорее всего гиг будет качаться куда больше часов чем мне хотелось бы, потому что даже те соединения где читать в данный момент нечего все равно будут пытаться подключиться, чтобы убедится, что там ничего нет. Где баланс?
в моем случае утилизация железа недоступна. я не могу управлять раздельно памятью и процессором
Этого и не нужно, т.к. этим занимается операционная система. А вот со стороны ЯП нам доступны API операционной системы и вот используя эти API и логику работы приложения вполне можно регулировать нагрузку на железную часть. Т.е., диск медленный - реже запрашиваем данные с него, процессор очень быстрый - грузим его вычислениями, сеть просто быстрая - значит, максимально быстро отдаем данные в сеть. И вот тут становятся очень полезны такие штуки как потоки и очереди. Например, один поток/пул получает запросы и добавляет их в первую очередь для чтения с диска, второй поток забирает задачи из первой очереди и читает с диска данные блоками и кладет их в буферы оперативной памяти, а ссылки на буферы - в очередь, при этом он проверяет предел заполнения очереди (ограничен объемом ОЗУ и следовательно числом элементов в очереди) и если много - то просто ждет вместо чтения данных (как и первый поток с первой очередью), третий поток или пул потоков забирает данные из второй очереди и отправляет их в сеть. При этом, каждый поток будет работать ровно с той скоростью, с какой позволяет доступный ему ресурс - диск, процессор или сеть.
Выглядит то разумно. Но остаются вопросы....
Скорость диска и сети все равно будут делиться на все параллельные загрузки. Как только мелкие файлы уйдут в сеть - большие файлы будут отдаваться быстрее. Поэтому и данные с диска читаем небольшими блоками, а не файл целиком. В идеале - блоками размером с сетевой пакет, что позволит избежать фрагментации пакетов. А так, там еще много всяких ньюансов вылезет в процессе.
Напоминаю еще раз: сделайте декомпозицию своей задачи. Возьмите DrawIO/FlowChart/бумагу и цветные ручки/что угодно для рисования диаграмм и полностью распишите требуемую логику в виде графа. Должно получиться что-то типа такого:
Логическая схема
И вот уже имея логическую схему - можно легко выделить нужные модули, доработать схему и реализовать уже в коде и посмотреть как оно работает.
Я прочел всё до запятой, и даже понял про что написано, но мои вопросы никуда не делись. Абстрактно то это всё понятно, но конкретика ломает.
Сейчас я написал несколько функций, которые сами по себе блокирующие (как и любые другие - пока они выполняются - простаиваем), но представляющие собой некое атоморное действие - т.е. блокирующее на очень малое время. Если их соединить в бесконечном цикле в логику - имею несколько точек выхода (т.н. стримов), и оно в общем-то пашет.
Чтение ведется через стрим чтения. Парсинг - либо наличие разделителя \n в стриме говорит о том, что "до него = команда" либо если команда состоит из её и бади - то в её теле есть число байт и пока число байт нед остигнуто парсинг не производится, а только продолжается чтение.
Запись ведется через стрим записи, все что записать в шаге не удалось возвращается назад в стрим записи. Вот тут есть вопрос - насколько адекватно стрим записи делить на "текущая команда" и "оставшийся буфер" (потому что работает и без этого). То есть я в стрим сейчас просто контент сую, после записи из него выгрызаю столько байт сколько в чанке, если записалось меньше, возвращаю кусочек обратно с помощью buffer.concat(rest, buffer), что вроде верно, с другой стороны не укладывается в идею что писать нужно строго так, как приказывали, то есть если команда не до конца отдана на запись, то нужно повторять команду, а не пару символов из нее которые не дослали. Опять же - имею дело с сокетом, ему в теории пофиг, пока не пришлю разделитель - ничего сделано не будет. С другой стороны к моменту записи мог произойти дисконнект, который по-хорошему означает очистить все стримы, или есть кейзы когда так нельзя (в очередях сообщений бывает такое?). Также есть кейз отправки сообщения с бадиком, в котором, если мы не дошлем бадик сообщения, но отправим команду "новое сообщение" - то сервер (наверное) вызовет операцию на кривом сообщении т.е. сообщение нужно писать "блокирующим способом" - или целиком или не отсылать совсем? Опять же, решает походу разделитель, т.е. кусок сообщения не содержит завершающего разделителя, а значит не является "кривым сообщением".
В итоге на текущий момент сообщения парсятся и кладутся в стрим с сообщениями.
Теперь вопрос классов собственно - как этим всем удобно будет пользоваться? Не мне, я то по-любому могу написать, а тому, кто мой клиент поставит. Лучшим ли способом будет просто повторить все возможности исходного консольного клиента, предоставив методы для их вызова, а дальше как хочешь?
В старой библиотеке было, что можно просто добавить обработчик подписки, колбэк пропихивали и когда сообщение приходит - вызывали. Если колбэк тяжелый - операция блокировала поток чтения далее. Мне это совсем не нравилось, чессговоря я не знаю правильно ли так вообще делать - приостанавливать весь поток чтения пока сообщение не будет обработано. Ведь в этом случае даже PING/PONG обмен не идет, и сервер через N секунд вообще соединение закроет и будет пофиг на ответы.
А как сделать это же умнее - у меня снова непонятки. Вроде как сделать такое же атомарное действие "обработать одно сообщение" - и пусть программист второй поток подымает, если операционка это позволяет. Второй вариант - написать адаптер для параллельного запуска, но по-умолчанию закрыть его последовательным, кому надо - реализует, можно реализовать свой пример.
Или другой вариант - вообще не делать обработку, но позволять вычитывать сообщения по нескольку штук, то есть какую-то операцию вроде readBatch(min, max), которая выдаст сообщения, как захотел - обработал, и вызвал на клиенте действия "ответить" хоть через час, если соединение еще живое. В редисе например так, хочешь обработать - делаешь lpop/lmove, а не ждешь пока колбэк завершится.
Второй вариант как-то грамотнее смотрится (особенно учитывая что консольный клиент ничего не обрабатывал, его задача была ПЕРЕДАТЬ сообщение, а не ОБРАБОТАТЬ его), но что-то я не ловлю до какой степени упарываться в фильтрах чтения. Ведь блокирующую операцию вычитывания можно остановить: по времени, по числу байт, по ручному вызову в обработчике, по числу сообщений в стриме - и это самое интересное что "число сообщений" - это вообще фильтр "и или", что-то вроде WHERE LIMIT OFFSET и это вроде все здорово смотрится, только какая-то сложная дичь, а не клиент для очереди получится.
В документации самого протокола предполагается, что любая команда может вернуть ошибку, значит ли это, что после отправки чего-то стоит обязательно убедится (а значит приостановить запись и дождаться подтверждения или ошибки) что нет косяка.
Ну, исходя из всего этого не самого детального и подробного описания задачи, я могу сделать вывод, что в данном случае идет передача каких-то сообщений и/или команд. Принципиально логика работы с данными тут никак не будет отличаться от описанного мною ранее алгоритма. В целом же, я бы рекомендовал сделать так - т.к. это библиотека, то она должна: предоставлять метод для запуска сервера отдельным потоком и предоставлять метод для оповещения о пришедшем сообщении / колббэк, а вот коллбэк уже должен запускаться в пуле воркеров для процессинга данных, число которых указывается в опциях запуска сервера. Т.е., библиотека предоставляет класс Server, пользователь создает новый экземпляр с нужными опциями и далее используя методы этого класса управляет соединением, сообщениями и прочими опциями. А сервер для каждого соединения читает данные маленькими блоками и складирует их в очередь. А из очереди данные берут воркеры. Для соединений - тоже отдельный класс для управления конкретным соединением. И все же, еще раз напомню, что для упрощения задачи надо всю её логику расписать в виде графа - сразу все станет намного понятнее и очевиднее.
VoidVolker, так в том то и веселуха, что я не пытаюсь "написать", я пытаюсь понять как в этой штуке работать.
Вот кейз например.
Каждое сообщение возвращает подтверждение. Стало быть после отсылки сообщения я должен дождаться этого подтверждения, то есть включить блокирующее чтение пока не получу ответ. Однако к тому времени как я отослал команду в потоке чтения будет уже не только этот ответ. Или если я отправлю 3 команды. Я должен получить три подтверждения. И я не увижу какое к какой. Если команда кривая - я не получу подтверждения вовсе. И я случайно могу попасть в бесконечный луп ожидания подтверждения. Или перед подтверждением получить еще какие-то входящие, которые успели прийти с сервера пока я слал свою команду, сокет то один.
1. Отправляю команду
2. Пишу "читай пока не встретишь 200 OK"
3а) 200 ОК пришел сразу же
3б) 200 ОК вообще не пришел. Кроме того сам брокер выдает это только если реально все хорошо, если что-то не так - ничего не выдает.
3в) 200 ОК пришел, но перед ним были другие сообщения. Их нужно начать процессить в пуле, но это пол-беды. Результатом процессинга может быть ответ, который добавится в очередь (поток) на запись. Запись в свою очередь может породить уже свои собственные 200 ОК и возврщаемся в начала этого списка.
3г) я отправлял несколько комманд и пришло несколько 200 ОК и там не написано какое подтверждение к какой команде
Таймаут? Не выход, можно недополучить.
Фильтр входящих с подсчетом? Все так же - подтверждение может вообще не прийти.
Приходят мысли только на очередь ожидания подтверждения, и как приходит ожидаемое сообщение из очереди первый вынимается. Но от такой очереди мало толку, ведь если подтверждения не придет - то очередь будет копиться и резолвится как сама хочет. Отправил 3 команды, на вторую не пришло подтверждение, но она зарезолвилась потому что пришло на третью и третья думает что это она не завершилась.