Задать вопрос
drem1lin
@drem1lin
чуть программист, чуть чуть админ...

Как устранить гонку при использовании именованного канала?

Всем привет!
Я хочу связать два процесса двунаправленной связью на основе named pipe, т.е. обе стороны могут пересылать сообщения друг другу используя один pipe. Для этого создал класс Pipe и небольшую программу его использующую.
Код класса
#ifndef __NamedPipeClass_H__
#define __NamedPipeClass_H__

#include <thread>


class NamedPipeClass
{
public:
	NamedPipeClass(char* _pipeName);
	NamedPipeClass(char* _pipeName, void (*_onRead)(unsigned char*, unsigned int, NamedPipeClass*));

	int Connect();
	int Create();
	int Disconnect();
	int Delete();

	int Write(unsigned char * buffer, unsigned int bufferLen);

	~NamedPipeClass();
private:
	char pipeName[256];
	int pipeDescriptor;

	void (*onRead)(unsigned char*, unsigned int, NamedPipeClass*);
	int ReadingThread();

	bool bExit;


};

#endif


с реализацией
#include "NamedPipeClass.h"

#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <errno.h>
#include <string.h>
#include <thread>

#define MAX_BUF 4096


NamedPipeClass::NamedPipeClass(char* _pipeName, void (*_onRead)(unsigned char*, unsigned int, NamedPipeClass*))
{
	memset(pipeName, 0, 256*sizeof(char));
	strcpy(pipeName, _pipeName);
	onRead = _onRead;
	pipeDescriptor =0;
	bExit = false;
}

NamedPipeClass::NamedPipeClass(char* _pipeName)
 : NamedPipeClass::NamedPipeClass(pipeName, NULL)
{
}


NamedPipeClass::~NamedPipeClass()
{
	close(pipeDescriptor);
	unlink(pipeName);
}

int NamedPipeClass::Create()
{
	/* create the FIFO (named pipe) */
	if(mkfifo(pipeName, S_IFIFO|S_IRWXU|S_IRWXG|S_IRWXO) != 0)
    {
		printf("create named pipe error = %d\n", errno); /* произошла ошибка */
	    return errno;
    }

    return 0;
}
/*
void* call_ReadingThread(void* Pipe) {
    HLTNamedPipeClass* network = static_cast<HLTNamedPipeClass*>(Pipe);

    void* result = network->SocketHandler(somearg);

    // do something w/ result

    return nullptr;
}
*/

int NamedPipeClass::Connect()
{
	//int err = 0;
	pipeDescriptor = open(pipeName, O_RDWR);
	if (pipeDescriptor == -1) {
	    printf("client open FIFO error = %d\n", errno); /* произошла ошибка */
	    return errno;
	}

	/*err = pthread_create(&ReadThreadId, NULL, &call_ReadingThread, this);
    if (err != 0)
    {
        printf("\ncan't create thread :[%s]", strerror(err));
        return err;
    }*/

	std::thread Reading(&NamedPipeClass::ReadingThread, this);
	Reading.detach();


    return 0;
}
int NamedPipeClass::ReadingThread()
{
	unsigned char buf[MAX_BUF];
	ssize_t bytesReaded = 0;
	while(!bExit)
	{
		bytesReaded = read(pipeDescriptor, buf, MAX_BUF);
		if(bytesReaded>0)
		{
			if(onRead!=NULL)
				onRead(buf, bytesReaded, this);
		}
	}
	return 0;
}

int NamedPipeClass::Disconnect()
{
	close(pipeDescriptor);
    return 0;
}

int NamedPipeClass::Delete()
{
	unlink(pipeName);
    return 0;
}

int NamedPipeClass::Write(unsigned char * buffer, unsigned int bufferLen)
{
	return write(pipeDescriptor, buffer, bufferLen);
}


и основной программой
main
#include <stdio.h>
#include <unistd.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/wait.h>
#include <errno.h>
#include <string.h>
#include <thread>
#include <iostream>

#include "NamedPipeClass.h"

bool g_ReaderStop = false;

void OnRead1(unsigned char * buf, unsigned int bufLen, NamedPipeClass* pipe)
{
	printf("OnRead1 %s\n", buf);
}

void OnRead2(unsigned char * buf, unsigned int bufLen, NamedPipeClass* pipe)
{
	printf("OnRead2 %s\n", buf);
	if(buf[0]=='q')
	{
		g_ReaderStop=true;
	}
}

int Writer(NamedPipeClass* hltPipe)
{
	unsigned char c;
	do
	{
		std::cin>>c;
		hltPipe->Write(&c, 1);
	} while(c!='q');
	return 0;
}

int main() {
    char * myfifo = "//tmp//myfifo\0";
    pid_t pid_1 = 0;

    switch(pid_1=fork()) {
	case -1:
        printf("fork error %d\n", pid_1); /* произошла ошибка */
		break;
	case 0:
		{
			NamedPipeClass* hltPipe = new NamedPipeClass(myfifo, OnRead2);

			hltPipe->Create();
			hltPipe->Connect();

			while(!g_ReaderStop)
			{
				sleep(1);
			}
			hltPipe->Disconnect();
			hltPipe->Delete();
		}
	default:
		{
			NamedPipeClass* hltPipe = new NamedPipeClass(myfifo, OnRead1);
			//hltPipe->Create();
			hltPipe->Connect();

			std::thread writer(Writer, hltPipe);
			writer.join();

			hltPipe->Disconnect();
			hltPipe->Delete();
		}
    return 0;
    }
}

и в результате получил гонку
w
OnRead1 w
e
OnRead1 e
r
OnRead1 r
t
OnRead2 t
y
OnRead1 y
u

Как исправить подобную ситуацию? необходимо два pipe? с радостью приму любые комментарии, в том числе и по структуре. Может у кого есть готовый пример подобной реализации?
  • Вопрос задан
  • 364 просмотра
Подписаться 3 Оценить Комментировать
Решения вопроса 1
@Sumor
Можно организовать одной трубой, но мороки много. Вы должны "захватить" канал для передачи, например, послав специальную команду "начало передачи". При получении вторая сторона переключается в режим чтения и обязуется не посылать сообщения до сигнала "конец передачи". Первая сторона в момент передачи ничего из трубы не читает.
Таким образом, ому надо что-то передать - захватывает канал и передаёт, а второй его слушается.

Можно организовать две трубы. Одна только на передачу, вторая только на приём. Если одна сторона только отвечает на вопросы, то это неплохой вариант. Проблема может возникнуть, если вторая сторона может не только отвечать, но и сама что-то передавать. Тогда нужно аккуратно отделить оба случая, что-бы не допустить внутренней гонки за трубу и разделять оба варианта на приёме.

А можно организовать 4 трубы. Две на посылку команды первой стороной и ответ второй. И ещё две на посылку команд второй стороны и ответ первой.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы