Eugene-Usachev
@Eugene-Usachev

Почему код ведёт себя странно при работе со чтением?

Есть некий сервер, который принимает по TCP запрос(несколько запросов в одном TCP сообщении) и возвращает ответ (столько же ответов в одном TCP сообщении). Сервер всегда возвращает ответы в том же порядке, что и запросы. И клиент и сервер имеют одинаковый способ обмена сообщениями: сначала идёт 2 байта, которые означают длину сообщения, затем сообщение данной длины. Если 2 байта заполнены, читаются следующие 4 байта. Я начал писать клиент к рабочему серверу (там точно не ошибок). Получилось, что клиент имеет sync.Pool от Pipe, где
type Pipe struct {
	writeBuf []byte
	connPool *sync.Pool

	readBuf []byte

	Result chan Res

	queue              chan []byte
	queueSize          int
	maxQueueSize       int
	maxWriteBufferSize int

	m *sync.Mutex

	wasExecutedLastTime bool
}


Дальше я правильно заполняю Pipe и теперь мне надо выполнить этот Pipe. Я написал функцию (вырезка из кода):
func (pipe *Pipe) ExecPipe() {
	// some write...
	l, err := conn.Read(pipe.readBuf[:])
	if err != nil {
		for i := 0; i < pipe.queueSize; i++ {
			pipe.Result <- Res{
				Slice: nil,
				Err:   err,
			}
		}
		pipe.queueSize = 0
		return
	}
	offset := 0
	size := 0
	have := 0

	for pipe.queueSize > 0 {
		have = l - offset
		if have < 3 {
			if have != 0 {
				copy(pipe.readBuf[0:have], pipe.readBuf[offset:offset+have])
			}
			l, err = conn.Read(pipe.readBuf[have:])
			if err != nil {
				for i := 0; i < pipe.queueSize; i++ {
					pipe.Result <- Res{
						Slice: nil,
						Err:   err,
					}
				}
				pipe.queueSize = 0
				return
			}
			offset = 0
			l += have
		}

		size = int(fastbytes.B2U16(pipe.readBuf[offset : offset+2]))
		offset += 2
		if size == 65535 {
			size = int(fastbytes.B2U32(pipe.readBuf[offset : offset+4]))
			offset += 4
		}

		have = l - offset
		if have < size {
			copy(pipe.readBuf[:have], pipe.readBuf[offset:offset+have])
			l, err = conn.Read(pipe.readBuf[have:])
			if err != nil {
				for i := 0; i < pipe.queueSize; i++ {
					pipe.Result <- Res{
						Slice: nil,
						Err:   err,
					}
				}
				pipe.queueSize = 0
				return
			}
			offset = 0
			l += have
		}

		pipe.Result <- Res{
			Slice: pipe.readBuf[offset : offset+size],
			Err:   nil,
		}
		offset += size

		pipe.queueSize--
	}
}


Но ведёт она себя очень странно. Пока мы не идём на второй круг чтения (нам хватило содержимого буфера), код работает всегда правильно. Затем я нагружаю сервер, чтобы тот возвращал огромные сообщения, которые не могут быть прочитаны сразу. А дальше происходит что-то странное. Первые несколько чтений (точного числа не нашёл, но меньше 10) всё работает правильно, но затем offset становится больше l (выскакивает паника на строкеcopy(pipe.readBuf[0:have], pipe.readBuf[offset:offset+have]) с сообщением panic: runtime error: slice bounds out of range [:-390]). Я признаюсь честно, что не смог применить инструменты debug для решения этой проблемы и был вынужден полагаться на множество log.Println. Из них я получил информацию, что после n чтений offset сдвигается на неизвестное число в любую сторону, что приводит к неправильному нахождению size, и offset становится больше l. Я не знаю, как отловить эту ошибку. Теперь приведу немного кода, чтобы кто-то мог повторить мой случай (но не сервер).

create Pipe

type Config struct {
	ConnPool           *sync.Pool
	MaxQueueSize       int
	MaxWriteBufferSize int
}

func NewPipe(cfg *Config) *Pipe {
	pipe := &Pipe{
		writeBuf:            make([]byte, 4, bufferSize),
		readBuf:             make([]byte, bufferSize),
		Result:              make(chan Res),
		queue:               make(chan []byte),
		maxQueueSize:        cfg.MaxQueueSize,
		connPool:            cfg.ConnPool,
		maxWriteBufferSize:  cfg.MaxWriteBufferSize,
		m:                   &sync.Mutex{},
		queueSize:           0,
		wasExecutedLastTime: false,
	}
	return pipe
}

func (pipe *Pipe) Start() {
	for {
		select {
		case data := <-pipe.queue:
			pipe.m.Lock()
			l := len(pipe.writeBuf) + len(data)
			if l > bufferSize || l > pipe.maxWriteBufferSize {
				pipe.ExecPipe()
			}

			pipe.writeBuf = append(pipe.writeBuf, data...)

			pipe.queueSize++
			if pipe.queueSize == pipe.maxQueueSize {
				pipe.ExecPipe()
			}

			pipe.m.Unlock()
		}
	}
}

func (pipe *Pipe) StartTimer() {
	for {
		time.Sleep(100 * time.Microsecond)
		pipe.m.Lock()
		if pipe.wasExecutedLastTime {
			pipe.wasExecutedLastTime = false
		} else {
			if pipe.queueSize != 0 {
				pipe.ExecPipe()
			}
		}
		pipe.m.Unlock()
	}
}



full ExecPipe

func (pipe *Pipe) ExecPipe() {
	pipe.wasExecutedLastTime = true
	writeBufLen := len(pipe.writeBuf)
	if writeBufLen == 0 {
		return
	}
	conn := pipe.connPool.Get().(net.Conn)
	defer pipe.connPool.Put(conn)
	_, err := conn.Write(pipe.writeBuf[:writeBufLen])
	pipe.writeBuf = pipe.writeBuf[:0]
	if err != nil {
		for i := 0; i < pipe.queueSize; i++ {
			pipe.Result <- Res{
				Slice: nil,
				Err:   err,
			}
		}
		pipe.queueSize = 0
		return
	}
	l, err := conn.Read(pipe.readBuf[:])
	if err != nil {
		for i := 0; i < pipe.queueSize; i++ {
			pipe.Result <- Res{
				Slice: nil,
				Err:   err,
			}
		}
		pipe.queueSize = 0
		return
	}
	offset := 0
	size := 0
	have := 0

	for pipe.queueSize > 0 {
		have = l - offset
		if have < 3 {
			if have != 0 {
				copy(pipe.readBuf[0:have], pipe.readBuf[offset:offset+have])
			}
			l, err = conn.Read(pipe.readBuf[have:])
			if err != nil {
				for i := 0; i < pipe.queueSize; i++ {
					pipe.Result <- Res{
						Slice: nil,
						Err:   err,
					}
				}
				pipe.queueSize = 0
				return
			}
			offset = 0
			l += have
		}

		size = int(fastbytes.B2U16(pipe.readBuf[offset : offset+2]))
		offset += 2
		if size == 65535 {
			size = int(fastbytes.B2U32(pipe.readBuf[offset : offset+4]))
			offset += 4
		}

		have = l - offset
		if have < size {
			copy(pipe.readBuf[:have], pipe.readBuf[offset:offset+have])
			l, err = conn.Read(pipe.readBuf[have:])
			if err != nil {
				for i := 0; i < pipe.queueSize; i++ {
					pipe.Result <- Res{
						Slice: nil,
						Err:   err,
					}
				}
				pipe.queueSize = 0
				return
			}
			offset = 0
			l += have
		}

		pipe.Result <- Res{
			Slice: pipe.readBuf[offset : offset+size],
			Err:   nil,
		}
		offset += size

		pipe.queueSize--
	}
}



get

func (pipe *Pipe) Get(key []byte, spaceId uint16) chan Res {
	length := uint16(len(key)) + 3
	slice := make([]byte, 0, length+2)
	slice = append(slice, byte(length), byte(length>>8), constants.Get, byte(spaceId), byte(spaceId>>8))
	slice = append(slice, key...)
	pipe.queue <- slice
	return pipe.Result
}



client

type Client struct {
	pool chan *pipe.Pipe
}

type Config struct {
	Host               string
	Port               string
	Par                int
	MaxQueueSize       int
	MaxWriteBufferSize int
}

func NewClient(cfg *Config) *Client {
	c := &Client{
		pool: make(chan *pipe.Pipe, cfg.Par),
	}

	connPool := &sync.Pool{
		New: func() interface{} {
			conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", cfg.Host, cfg.Port))
			if err != nil {
				log.Println("[NimbleDB] Error connecting to the database: ", err)
				return nil
			}
			return conn
		},
	}

	pipeCfg := &pipe.Config{
		ConnPool:           connPool,
		MaxQueueSize:       cfg.MaxQueueSize,
		MaxWriteBufferSize: cfg.MaxWriteBufferSize,
	}

	for i := 0; i < cfg.Par; i++ {
		pipeImpl := pipe.NewPipe(pipeCfg)
		go pipeImpl.Start()
		time.Sleep(17 * time.Microsecond)
		go pipeImpl.StartTimer()

		c.pool <- pipeImpl
	}

	return c
}

func (c *Client) CallFunc(f func(conn *pipe.Pipe) chan pipe.Res) ([]byte, error) {
	conn := <-c.pool
	ch := f(conn)
	c.pool <- conn
	var res pipe.Res
	select {
	case res = <-ch:
	case <-time.After(3 * time.Second):
		return nil, fmt.Errorf("timeout")
	}
	if res.Err != nil {
		return nil, res.Err
	}
	if len(res.Slice) < 1 {
		return nil, fmt.Errorf("empty response")
	}
	err := result.DefineError(res.Slice[0])
	if err != nil {
		return nil, err
	}
	return res.Slice[:], nil
}



client.Get

func (c *Client) Get(key []byte, spaceId uint16) result.Result[[]byte] {
	res, err := c.CallFunc(func(conn *pipe.Pipe) chan pipe.Res {
		return conn.Get(key, spaceId)
	})
	if err != nil || res[0] != constants.Done {
		return result.Result[[]byte]{
			Value: nil,
			Err:   err,
			IsOk:  false,
		}
	}

	return result.Result[[]byte]{
		Value: res[1:],
		Err:   nil,
		IsOk:  true,
	}
}



test

func macrosTest(name string, f func(threadNumber int)) {
	log.Println(name)
	wg := sync.WaitGroup{}
	wg.Add(PAR)
	for i := 0; i < PAR; i++ {
		go func(i int) {
			f(i)
			wg.Done()
		}(i)
	}
	wg.Wait()
}
func test() {
value := func() []byte {
		buf := make([]byte, SIZE)
		for i := 1; i != SIZE; i++ {
			buf[i-1] = uint8(i)
		}
		return buf
	}()
	for i := 0; i < N; i++ {
		keys = append(keys, fastbytes.S2B("key"+strconv.Itoa(i)))
		values = append(values, value)
	}

client := client2.NewClient(&client2.Config{
		Host:               HOST,
		Port:               PORT,
		Par:                1, 
		MaxQueueSize:       1024 * 64,
		MaxWriteBufferSize: 64 * 1024,
	})

macrosTest("Testing client get in one space...", func(threadNumber int) {
		for j := 0; j < COUNT; j++ {
			if client.Get(keys[threadNumber*COUNT+j], IDs[1]).Err != nil {
				errors.Add(1)
			}
		}
	})
}

  • Вопрос задан
  • 308 просмотров
Пригласить эксперта
Ответы на вопрос 1
@darst
Eugene-Usachev сделай проверку, что l - остаток сообщения не превышает количество нужного для чтения.
offset += size
//////////////////////////////////
здесь сделай проверку, что дочитал сообщение до конца
 и начинай читать новое
//////////////////////////////////
    pipe.queueSize--
  }
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы