Передаю данные о ходе работы программы в другую программу через сокеты, сообщений может быть очень много, но маленьких (сотни/тысячи в секунду) и не очень много больших (сотни мегабайт, но с интервалом в несколько минут). Задержки отправки и получения сообщений не важны (если порядок их доставки не изменяется) и отправляющее и получающее приложения могут "подвисать" на время получения сообщения.
Для отправки/получения использую SocketOutputStream/SocketInputStream и записываю/читаю данные функциями write/read. Чтобы не пытаться одновременно писать в сокет из разных потоков, создание SocketOutputStream окружено Poco:Mutex. (так же пробовал omp critical для эксперимента)
Если сообщения отсылаются редко (например, пошагово в отладчике), то всё работает правильно, независимо от размера передаваемых данных. В реальной же программе сообщения отсылаются очень часто и что-то портится. В начале каждого блока данных я записываю "магическое число", чтобы убедиться, что это правильные данные. И часто получается, что данные в SocketInputStream оказываются неправильные. Для каждого сообщения создаётся новый объект SocketOutputStream, по идее, при удалении объекта делается flush и данные должны отсылаться сразу же.
Выглядит всё так, что к данным первого блока дописана часть данных второго блока. Например, было два объекта по 10 байт, пришло 15 байт, первый объект считал свои 10 байт и "хвост" удалился. И при следующем срабатывании poll() считались оставшиеся 5 байт и в их начале уже нет "магического числа" и всё пропало... Это кажется очевидным, но ведь я специально использую классы SocketOutputStream, чтобы избавиться от ручного управления буферами, хвоставми и т.д. и т.п.
Собственно, вопрос:
Как в Poco правильно передавать и получать большие объёмы разноразмерных данных (сонтимассивов в секунду, размером в десятки байт и (редко) штучные массивы размером сотни мегабайт)? Которые могут отсылаться асинхронно из нескольких потоков.
Коротко, у тебя тут на лицо race condition. В одно и то же горло лить пытаются сразу несколько источников. Горло - это сокет. Источники - это SocketOutputStream. Они не выстраиваются в очередь и не ждут завершения транзакции записи пакета.
Пакет в сокет надо писать сразу и весь, одним вызовом функции записи. Или блокировать сокет на период записи пакета от прочих попыток записи.
Если хочешь, могу всё описать конкретнее в рамках ответа.
К слову, сообщения в несколько мегабайт - это много. У тебя циклический буфер на сколько поставлен? Он не лопнет мегабайтами вычитывать? Ты DoS по оперативной памяти на сервере не боишься?
Race condition быть не может, потому что мьютекс не даёт. Т.е. до создания объекта SocketOutputStream ждётся анлок мьютекса. Анлок происходит после удаления SocketOutputStream. Но если где-то после этого уже возникает гонка (уже после вызовов write и удаления объекта поока), то я даже не представляю, как с этим бороться тогда...
SocketOutputStream использую как раз для того, чтобы самому не управлять буфером. При одиночных запросах сотни мегабайт передаются без проблем, так что не должно быть в размере буфера проблем. Но это не точно, я только начинаю с Poco разбираться.
Обмен данными происходит на одном ПК, либо между компьютерами в локальной сети. Такой же объём данных без проблем передавался через очередь сообщений Windows (но большие данные передавались через файл), так что проблем с DoS быть не должно.
Часто и много передаются только маленькие сообщения, их может быть несколько тысяч за несколько секунд, размером порядка десятков байт, но это включая "магические числа" для проверки правильности данных. Потом можно будет заняться оптимизацией размера, если потребность будет. Сейчас вопрос производительности и загрузки сети не сильно волнует.
Евгений Шатунов, > Или блокировать сокет на период записи пакета от прочих попыток записи.
Если я правильно разобрался с библиотекой, там по умолчанию создаются блокируемые сокеты, а не блокируемые нужно созавать отдельным методом или вызывать функцию, чтобы сделать сокеты не блокируемыми. Т.е., пока не закончится запись в сокет, другой поток в этот сокет писать не сможет (плюс у меня функция записи обёрнута в мьютекс, чтобы уж наверняка - если это лишнее, потом удалю).
Но, если я правильно понял, отправиться может как бльше, так и меньше байт, чем я пытаюсь передать. И я не понял, от чего это зависит и что мне с этим делать.
maaGames, да, сокеты там блокируемые по умолчанию. Это значит, что пока данные не выйдут за пределы локального железа, функция записи в сокет не передаст тебе управление исполнением. За атомарность многопоточных операций я ручаться не могу.
Я, пожалуй, двусмысленно выразился про блокируемость. Я имею в виду вот какой момент. Ты ведь сообщение пишешь участками, по одному полю данных. SocketOutputStream не собирает твои данные в единый бинарный буфер, а сразу пишет их в сокет. Это дает волю кому угодно вклиниться из другого потока в этот сокет между операциями записи в твоем текущем потоке. Т.е. транзакция записи сообщения может быть легко нарушена. flush при этом - это барьер синхронизации между исполнением твоего кода и очисткой конвейера операций устройства (в самом устройстве). В случае блокируемых сокетов flush - фиктивная штука.
Попробуй, как минимум для проверки, писать сообщения в TLS буффер по одному, а далее передавать уже этот TLS буффер в SocketOutputStream. Т.е. передавай на запись в сокет не сообщение, а буфер, куда сообщение сериализовано.
зы
Я не заметил первое твое сообщение. Понимаешь, то, что ты видишь в буфере приема явно говорит о состоянии гонки на стороне отправки. У тебя сообщения телами пересекаются в буфере. Новый magic number в буфере приема не появляется потому что он был записан раньше чем продолжилась запись предыдущего пакета.
Евгений Шатунов, Да, у меня была мысль, что между разными write "кто-то как-то" вклинивается, не смотря на мьютекс. Поэтому я подправил обёртку класса, чтобы не сразу в сокет писалось, а чтобы все данные собирались в strstream, а потом эту строку одним write записываю в сокет. Причём, в первых 4 байтах строки записан её размер в байтах (перед записью в сокет в начало строки её размер записываю). Но это не помогло. Как мне уже подсказали, в poll может быть накоплено больше одного сообщения и они могут быть та мне полностью. Видимо с этим и связаны мои проблемы, что часть данных теряется. Надо перепроектировать чтение.
Евгений Шатунов, Если документация не врёт, то TCP. UDP испльзовать мне рановато и задержки не принципиальны.
Потом планирую маленькие сообщения сделать в ICMP, вроде как раз для малюток в несколько байт они и предназначены. Но пока нужно заставить всё корректно работать в TCP, оптимизация подождёт.
maaGames, значит TCP, стримминговый протокол. Тогда в буфере приема у тебя и правда сколько фрагментов сообщений накапало на момент сигнала от poll, сколько ты и вычитаешь из сокета в промежуточный буфер. TCP оперирует фрагментами малого размера, которые он получает из записанных в сокет данных. При этом TCP считает что ты пишешь потоком и хочешь читать все доставленные в сокет фрагменты как поток данных.
Для чтения из TCP сокета обычно используют циклический буфер. Я у тебя про него немного выше спрашивал.
Общая концепция такова, что ты в циклический буфер читаешь до момента завершения вычитки всего сообщения, дальше это сообщение уже отдается на разбор, а все, что было вычитано из сокета сверх тела текущего сообщения, остается в буфере и ждет следующей сессии по вычитке из сокета. Размер циклического буфера при этом желательно иметь намного больше чем размер самого большого сообщения, чтобы не останавливать операцию чтения и не блокировать чтение операцией разбора сообщения.
Евгений Шатунов, Меня ввело в заблуждение использование класса SocketInput/OutputStream. Я ожидал, что закрытие outputStream гарантированно прервёт данные в сокете и poll вернёт только их этого объекта массив байт. Т.е. ожидал от обёртки намного больше, чем она реально предоставляет.
Буду после poll вычитывать все байты, которые там есть, даже если больше одного объекта прилетело (с блокировкой сокета на чтение, если не весь массив байт уже загружен).
Без кода сказать можно мало что. Если есть возможность залить на гитхаб, то шансов стало бы больше. Есть 3 подозрительных момента:
Для каждого сообщения создаётся новый объект SocketOutputStream
А нужно ли это? Почему бы не использовать этот stream все время пока живет сокет?
создание SocketOutputStream окружено Poco:Mutex
Попадает ли непосредственно запись в стрим и flush(деструктор) под этот mutex?
использую классы SocketOutputStream, чтобы избавиться от ручного управления буферами, хвоставми и т.д. и т.п.
Насколько я успел понять по документации этот класс ничего не знает про границы сообщений. Это примерно такой же стрим как и для записи в файл. Поэтому если размер сообщений может быть разный, то надо смотреть как вы вычитываете данные.
По отладке сетевого кода могу посоветовать логирование вместо отладчика.
Мне нужно как-то узнавать, что пришло новое сообщение (или событие, не знаю, какая терминология правильная), получать его идентификатор и, в зависимости от этого, считывать различный объём байт.
Условно, отправляю три сообщения (идентификатор + данные):
версия+номер ,
название+текст ,
тип_объекта+массив_байт_этого_объекта
И на сервере я слушаю порт методом poll(), когда что-то прилетает, я считываю 4 байта идентификатора и в зависимости от него считываю различный объём байт. Размер считываемых байт в начале каждого блока данных записан, т.е. я знаю, сколько байт нужн осчитать.
Вернее, после срабатывания poll я создаю SocketInputStream передавая тот же сокет, который ждал poll и уже из этого объекта считываю идентификатор и данные. И в итоге у меня получается, что объект SocketInputStream начинает считывать данные "со смещением", если можно так выразиться, в начале должно быть магическое число, но по факту там находятся данные предыдущего или следующего сообщения (сложно понять, тех или тех, но точно не случайный мусор).
По идее, мутекс анлочится после fush. Но у меня нет уверенности, что сокет будет пересылать данные сразу при этом, а не будет ждать заполнения буфера до 1024 байт (по умолчанию там такой буфер указан) и/или закрытия сокета.
tsarevfs Про логирование - спасибо. А можете что-то посоветовать для сравнения двух файлов по тайм кодам? Просто глазами два файла с тысячами строк смотреть и сихронизировать их по времени - немного неприятно.
maaGames, poll говорит вам только о том, что пришли какие-то данные. Это может быть половина или 2.5 сообщения, которые вы ждете. Вы поддерживаете эти ситуации?
tsarevfs, Самостоятельно не поддерживаю. Разве классы-обёртки SocketInputStream не реализуют как раз функционал "поддержки"? В тестовом примере я передавал std::vector на сотню мегабайт, он "самостоятельно" разбился на куски разных размеров и на сервере из этих кусков собрался обратно. Т.е. я в кленте написал: отправить 4 байта с размером массива, отправить N байт самого массива. А на сервере после срабатывания poll написал: считать 4 байта размера, vector.resize(N), считать N байт в массив. Т.е. ни циклических буферов, ни чего подобного самостоятельно не реализовывал, все фрагменты подтянул SocketInputStream.
tsarevfs, Видимо, если в poll пришло больше 1 сообщения, то все данные, которые не относились к первому сообщению - теряются. И при последующем срабатывании poll я уже пытаюсь работать с неправильными данными, потому что старый "хвост" потерялся.
Видимо, после poll и чтения первого объекта нужно не заново запускать poll, а проверять, достигнут ли eof в потоке? Или как правильно проверить, что данных в сокете не осталось?
maaGames, Если как текст сравнивать, то консольный diff, он же есть как git diff. Есть куча ui утилит meld, p4merge
Именно временные зависимости тоже можно как-то отслеживать, нашлось например: simple-evcorr.github.io
Но выглядит сложно и лучше начать с более простых методов.
Можно использовать NVTX аннотации в коде + https://developer.nvidia.com/nsight-systems и тогда будет таймлайн с событиями как у них на скриншотах. Но это тоже потребует времени на вход.
maaGames, Вроде не должно пропадать ничего. Да, чтения могут быть блокирующими, если еще не получено все целиком, но в целом ваш подход выглядит рабочим.
tsarevfs, Думаю, у меня проблема именно когда в poll больше 1 объекта успевает прилететь. Ведь если после срабатывания poll я считаю из сокета не все данные, то при следующем срабатывании poll те недочитанные данные потеряются? По поведению похоже, что именно это происходит. Попробую что-то типа такого (сейчас у меня есть только while-poll, без внутреннего do-while):
tsarevfs, Под словом "сообщение" я подразумевал бинарные данные. Т.е. именно текста там нет в принципе. В логах будет только время, человеко-дружелюбный идентификатор и размер блока байт...
Спасибо за подскзки. Надеюсь, я правильно понял причину проблемы и смогу починить.
maaGames, Похоже все так. SocketInputStream использует BufferedBidirectionalStreamBuf . Это значит что он может вычитать в свой внутренний буфер больше чем вы ожидаете. Простое решение -- не создавать SocketInputStream для каждого сообщения, а только 1 раз для каждого сокета.
Вероятно придется для каждого poll читать пока sis.rdbuf()->in_avail() не вернет 0. eof https://en.cppreference.com/w/cpp/io/basic_streamb...
tsarevfs, Да, нет никакого смысла создавать каждый раз inputstream, я ожидал другого поведения, поэтмоу создавал. Раз всё-равно нужно объём переданных данных самому контролировать, то можно одновременно с сокетом его создать один раз и всё.
tsarevfs, Добавил чтение всех объектов внутри poll, всё сразу заработало, как планиорвалось.
Только вместо rdbuf->in_avail нужно использовать sis.eof(). in_avail, похоже, буферизирует число байт сразу после poll и не уменьшает значение (или как-то не очевидно там значение меняется, в общем падает чтение). А c eof отработало всё нормально.
Для передачи вы видимо используете TCP. В ТСР нет разделения на сообщения, все данные передаются в одном потоке в том порядке, в котором были отправлены. Т.е. вполне реальна ситуация, когда вы посылаете например 2 сообщения по 10 байт каждое, а читает 3 раза по 5, 10, 5 байт за раз. Таким образом разделять на сообщения вы должны на принимающей стороне самостоятельно.
РОСО тут не причем - эменно так работает протокол ТРС. Вместо TCP можете использовать UDP - тут будете принимать именно сообщениями. Но есть сложность с дефрагментацией - если размер сообщения больше MTU, то сообщение будет фрагментировано при передаче и не факт, что куски дойдут и соберуться. Ну и еще UDP не гарантирует доставку, т.е. сообщения могут теряться и ни получатель ни отправитель об этом не узнает.
Так же можно использовать протокол SCTP, он так же как UDP работает с сообщениями, но при этом гарантирует доставку. SCTP входит в стандартный стек TCP/IP. Есть ли его поддержка в POCO я не в курсе.
Автор пытается построить свой прикладной протокол поверх сокетов. Этого не надо делать т.к такие протоколы уже созданы. Ключевые слова: jms, mq, apache-mq, kafka, rabbitmq, ibmmq.