Есть некий сервер, который принимает по TCP запрос(несколько запросов в одном TCP сообщении) и возвращает ответ (столько же ответов в одном TCP сообщении). Сервер всегда возвращает ответы в том же порядке, что и запросы. И клиент и сервер имеют одинаковый способ обмена сообщениями: сначала идёт 2 байта, которые означают длину сообщения, затем сообщение данной длины. Если 2 байта заполнены, читаются следующие 4 байта. Я начал писать клиент к рабочему серверу (там точно не ошибок). Получилось, что клиент имеет sync.Pool от Pipe, где
type Pipe struct {
writeBuf []byte
connPool *sync.Pool
readBuf []byte
Result chan Res
queue chan []byte
queueSize int
maxQueueSize int
maxWriteBufferSize int
m *sync.Mutex
wasExecutedLastTime bool
}
Дальше я правильно заполняю Pipe и теперь мне надо выполнить этот Pipe. Я написал функцию (вырезка из кода):
func (pipe *Pipe) ExecPipe() {
// some write...
l, err := conn.Read(pipe.readBuf[:])
if err != nil {
for i := 0; i < pipe.queueSize; i++ {
pipe.Result <- Res{
Slice: nil,
Err: err,
}
}
pipe.queueSize = 0
return
}
offset := 0
size := 0
have := 0
for pipe.queueSize > 0 {
have = l - offset
if have < 3 {
if have != 0 {
copy(pipe.readBuf[0:have], pipe.readBuf[offset:offset+have])
}
l, err = conn.Read(pipe.readBuf[have:])
if err != nil {
for i := 0; i < pipe.queueSize; i++ {
pipe.Result <- Res{
Slice: nil,
Err: err,
}
}
pipe.queueSize = 0
return
}
offset = 0
l += have
}
size = int(fastbytes.B2U16(pipe.readBuf[offset : offset+2]))
offset += 2
if size == 65535 {
size = int(fastbytes.B2U32(pipe.readBuf[offset : offset+4]))
offset += 4
}
have = l - offset
if have < size {
copy(pipe.readBuf[:have], pipe.readBuf[offset:offset+have])
l, err = conn.Read(pipe.readBuf[have:])
if err != nil {
for i := 0; i < pipe.queueSize; i++ {
pipe.Result <- Res{
Slice: nil,
Err: err,
}
}
pipe.queueSize = 0
return
}
offset = 0
l += have
}
pipe.Result <- Res{
Slice: pipe.readBuf[offset : offset+size],
Err: nil,
}
offset += size
pipe.queueSize--
}
}
Но ведёт она себя очень странно. Пока мы не идём на второй круг чтения (нам хватило содержимого буфера), код работает всегда правильно. Затем я нагружаю сервер, чтобы тот возвращал огромные сообщения, которые не могут быть прочитаны сразу. А дальше происходит что-то странное. Первые несколько чтений (точного числа не нашёл, но меньше 10) всё работает правильно, но затем offset становится больше l (выскакивает паника на строкеcopy(pipe.readBuf[0:have], pipe.readBuf[offset:offset+have]) с сообщением panic: runtime error: slice bounds out of range [:-390]). Я признаюсь честно, что не смог применить инструменты debug для решения этой проблемы и был вынужден полагаться на множество log.Println. Из них я получил информацию, что после n чтений offset сдвигается на неизвестное число в любую сторону, что приводит к неправильному нахождению size, и offset становится больше l. Я не знаю, как отловить эту ошибку. Теперь приведу немного кода, чтобы кто-то мог повторить мой случай (но не сервер).
create Pipe
type Config struct {
ConnPool *sync.Pool
MaxQueueSize int
MaxWriteBufferSize int
}
func NewPipe(cfg *Config) *Pipe {
pipe := &Pipe{
writeBuf: make([]byte, 4, bufferSize),
readBuf: make([]byte, bufferSize),
Result: make(chan Res),
queue: make(chan []byte),
maxQueueSize: cfg.MaxQueueSize,
connPool: cfg.ConnPool,
maxWriteBufferSize: cfg.MaxWriteBufferSize,
m: &sync.Mutex{},
queueSize: 0,
wasExecutedLastTime: false,
}
return pipe
}
func (pipe *Pipe) Start() {
for {
select {
case data := <-pipe.queue:
pipe.m.Lock()
l := len(pipe.writeBuf) + len(data)
if l > bufferSize || l > pipe.maxWriteBufferSize {
pipe.ExecPipe()
}
pipe.writeBuf = append(pipe.writeBuf, data...)
pipe.queueSize++
if pipe.queueSize == pipe.maxQueueSize {
pipe.ExecPipe()
}
pipe.m.Unlock()
}
}
}
func (pipe *Pipe) StartTimer() {
for {
time.Sleep(100 * time.Microsecond)
pipe.m.Lock()
if pipe.wasExecutedLastTime {
pipe.wasExecutedLastTime = false
} else {
if pipe.queueSize != 0 {
pipe.ExecPipe()
}
}
pipe.m.Unlock()
}
}
full ExecPipe
func (pipe *Pipe) ExecPipe() {
pipe.wasExecutedLastTime = true
writeBufLen := len(pipe.writeBuf)
if writeBufLen == 0 {
return
}
conn := pipe.connPool.Get().(net.Conn)
defer pipe.connPool.Put(conn)
_, err := conn.Write(pipe.writeBuf[:writeBufLen])
pipe.writeBuf = pipe.writeBuf[:0]
if err != nil {
for i := 0; i < pipe.queueSize; i++ {
pipe.Result <- Res{
Slice: nil,
Err: err,
}
}
pipe.queueSize = 0
return
}
l, err := conn.Read(pipe.readBuf[:])
if err != nil {
for i := 0; i < pipe.queueSize; i++ {
pipe.Result <- Res{
Slice: nil,
Err: err,
}
}
pipe.queueSize = 0
return
}
offset := 0
size := 0
have := 0
for pipe.queueSize > 0 {
have = l - offset
if have < 3 {
if have != 0 {
copy(pipe.readBuf[0:have], pipe.readBuf[offset:offset+have])
}
l, err = conn.Read(pipe.readBuf[have:])
if err != nil {
for i := 0; i < pipe.queueSize; i++ {
pipe.Result <- Res{
Slice: nil,
Err: err,
}
}
pipe.queueSize = 0
return
}
offset = 0
l += have
}
size = int(fastbytes.B2U16(pipe.readBuf[offset : offset+2]))
offset += 2
if size == 65535 {
size = int(fastbytes.B2U32(pipe.readBuf[offset : offset+4]))
offset += 4
}
have = l - offset
if have < size {
copy(pipe.readBuf[:have], pipe.readBuf[offset:offset+have])
l, err = conn.Read(pipe.readBuf[have:])
if err != nil {
for i := 0; i < pipe.queueSize; i++ {
pipe.Result <- Res{
Slice: nil,
Err: err,
}
}
pipe.queueSize = 0
return
}
offset = 0
l += have
}
pipe.Result <- Res{
Slice: pipe.readBuf[offset : offset+size],
Err: nil,
}
offset += size
pipe.queueSize--
}
}
get
func (pipe *Pipe) Get(key []byte, spaceId uint16) chan Res {
length := uint16(len(key)) + 3
slice := make([]byte, 0, length+2)
slice = append(slice, byte(length), byte(length>>8), constants.Get, byte(spaceId), byte(spaceId>>8))
slice = append(slice, key...)
pipe.queue <- slice
return pipe.Result
}
client
type Client struct {
pool chan *pipe.Pipe
}
type Config struct {
Host string
Port string
Par int
MaxQueueSize int
MaxWriteBufferSize int
}
func NewClient(cfg *Config) *Client {
c := &Client{
pool: make(chan *pipe.Pipe, cfg.Par),
}
connPool := &sync.Pool{
New: func() interface{} {
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%s", cfg.Host, cfg.Port))
if err != nil {
log.Println("[NimbleDB] Error connecting to the database: ", err)
return nil
}
return conn
},
}
pipeCfg := &pipe.Config{
ConnPool: connPool,
MaxQueueSize: cfg.MaxQueueSize,
MaxWriteBufferSize: cfg.MaxWriteBufferSize,
}
for i := 0; i < cfg.Par; i++ {
pipeImpl := pipe.NewPipe(pipeCfg)
go pipeImpl.Start()
time.Sleep(17 * time.Microsecond)
go pipeImpl.StartTimer()
c.pool <- pipeImpl
}
return c
}
func (c *Client) CallFunc(f func(conn *pipe.Pipe) chan pipe.Res) ([]byte, error) {
conn := <-c.pool
ch := f(conn)
c.pool <- conn
var res pipe.Res
select {
case res = <-ch:
case <-time.After(3 * time.Second):
return nil, fmt.Errorf("timeout")
}
if res.Err != nil {
return nil, res.Err
}
if len(res.Slice) < 1 {
return nil, fmt.Errorf("empty response")
}
err := result.DefineError(res.Slice[0])
if err != nil {
return nil, err
}
return res.Slice[:], nil
}
client.Get
func (c *Client) Get(key []byte, spaceId uint16) result.Result[[]byte] {
res, err := c.CallFunc(func(conn *pipe.Pipe) chan pipe.Res {
return conn.Get(key, spaceId)
})
if err != nil || res[0] != constants.Done {
return result.Result[[]byte]{
Value: nil,
Err: err,
IsOk: false,
}
}
return result.Result[[]byte]{
Value: res[1:],
Err: nil,
IsOk: true,
}
}
test
func macrosTest(name string, f func(threadNumber int)) {
log.Println(name)
wg := sync.WaitGroup{}
wg.Add(PAR)
for i := 0; i < PAR; i++ {
go func(i int) {
f(i)
wg.Done()
}(i)
}
wg.Wait()
}
func test() {
value := func() []byte {
buf := make([]byte, SIZE)
for i := 1; i != SIZE; i++ {
buf[i-1] = uint8(i)
}
return buf
}()
for i := 0; i < N; i++ {
keys = append(keys, fastbytes.S2B("key"+strconv.Itoa(i)))
values = append(values, value)
}
client := client2.NewClient(&client2.Config{
Host: HOST,
Port: PORT,
Par: 1,
MaxQueueSize: 1024 * 64,
MaxWriteBufferSize: 64 * 1024,
})
macrosTest("Testing client get in one space...", func(threadNumber int) {
for j := 0; j < COUNT; j++ {
if client.Get(keys[threadNumber*COUNT+j], IDs[1]).Err != nil {
errors.Add(1)
}
}
})
}