@nakem

Как понять причину отмененного контекста?

Я написал http хендлер, который принимает запрос с WAV. Моя задача передать аудио в потоке на бекенд в формате, там оно декодируется в псм, то есть просто удаляется заголовок ваф, и ресемплируется. Дальше мне нужно принять ответ с бекенда в виде канала, ресемплировать в исходных sample rate и приделать ваф заголовок. И отправить http ответ.
Я написал тесты с моками для проверки того, что на бекенд отправляется запрос. И принимается ответ с бекенда.
У меня проблема с пайплайнами. Горутины возвращают ошибку context cancelled. То есть, как я понимаю, это не DeadlineExceed. Это именно где-то context отменяется. Я этого нигде не делаю. Как я могу определить где это происходит?
Я пробовал увеличить таймауты, но это не помогает.
Это мой тест с моком

const (
    serveEndTimeout = time.Millisecond * 300
)

type errResponse struct {
    Code    int    `json:"status"`
    Message string `json:"message"`
}

func TestHandler_handleDenoise(t *testing.T) {
    log.Init(log.Debug)
    logger := log.Get()
    mtr := &metrics.Metrics{}
    mtr.Init(nil)

    tests := []struct {
        name        string
        cfg         *api.Config
        contentType string
        userID      string
        requestID   string
        audioSource io.Reader

        expectedBackendRequest *denoiser.Request
        backendResponses       []*denoiser.Response

        expectedStatusCode int
        expectedErrResp    string
    }{
        {
            name: "correct request",
            cfg: &api.Config{
                ReadTimeout:        types.NewDuration("1s"),
                WriteTimeout:       types.NewDuration("1s"),
                HTTPRequestMaxSize: types.NewUnit("1MiB"),
            },
            contentType:        "audio/x-wav",
            userID:             "user",
            requestID:          "bar",
            audioSource:        bytes.NewReader(append(audio.GenerateHeaderWav(audio.WavPCM, 1, 14000, 2, 16), []byte("1234567")...)),
            expectedStatusCode: http.StatusOK,
            expectedBackendRequest: &denoiser.Request{
                RequestID:        "bar",
                AudioEncoding:    audio.PCM_s16le,
                SampleRate:       14000,
                TargetSampleRate: 48000,
            },
            backendResponses: []*denoiser.Response{
                {Audio: []byte("1234567")},
            },
        },
    }

    for _, tt := range tests {
        t.Run(tt.name, func(t *testing.T) {
            ctx := context.TODO()
            serveHTTPDone := make(chan struct{})

            denoiserMock := makeMockDenoiser(ctx, tt.expectedBackendRequest, tt.backendResponses)
            a := acl.New(&acl.Config{Users: map[string]acl.UserConfig{
                "user":        {},
                "userBlocked": {},
            }})
            h := NewHandler(tt.cfg, logger, mtr, denoiserMock, a)
            serv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                r = r.WithContext(util.SetRequestUserID(r.Context(), tt.userID))
                r = r.WithContext(util.SetRequestID(r.Context(), tt.requestID))
                h.ServeHTTP(w, r)
                close(serveHTTPDone)
            }))
            defer serv.Close()

            u, err := url.Parse(serv.URL)
            require.Nil(t, err)
            u.Path = httpPathPattern
            resp, err := http.Post(u.String(), tt.contentType, tt.audioSource)
            require.Nil(t, err)
            defer func() {
                assert.Nil(t, resp.Body.Close())
            }()

            require.Equal(t, tt.expectedStatusCode, resp.StatusCode)
            if tt.expectedErrResp != "" {
                var errResp errResponse
                decoder := json.NewDecoder(resp.Body)
                if err := decoder.Decode(&errResp); err != nil && err != io.EOF {
                    t.Errorf("parsing response")
                }
                fmt.Printf("%s\n\n", errResp.Message)
                require.Equal(t, tt.expectedErrResp, errResp.Message)
                require.Equal(t, tt.expectedStatusCode, errResp.Code)
            }

            select {
            case <-serveHTTPDone:
            case <-time.After(serveEndTimeout):
                t.Fatal("client request serving didn't end")
            }
        })
    }
}

func newResponsesChannel(ctx context.Context, responses []*denoiser.Response) <-chan *denoiser.Response {
    ch := make(chan *denoiser.Response)
    go func() {
        defer close(ch)
        for _, response := range responses {
            select {
            case <-ctx.Done():
                fmt.Printf("new Responce context\n")
                return
            case ch <- response:
            }
            // time.Sleep(30 * time.Millisecond)
        }
    }()

    return ch
}

func makeMockDenoiser(ctx context.Context, expectedRequest *denoiser.Request, responses []*denoiser.Response) *mockDenoiser {
    m := &mockDenoiser{}
    if expectedRequest != nil {
        m.On("Denoise", mock.Anything, mock.MatchedBy(func(actualRequest *denoiser.Request) bool {
            return *expectedRequest == *actualRequest
        }), mock.Anything).Return(newResponsesChannel(ctx, responses)).Once()
    }
    return m
}


Это мой хендлер

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    ctx = log.ExtendCtx(ctx, "handler", "denoise")

    (hidden block)

    h.logger.Infof(ctx, "Handling request: %+v", req)

    g, gCtx := errgroup.WithContext(ctx)
    audioChunks := make(chan []byte)
    responsesCh := denoise(ctx, h.backend.Denoise(gCtx, req, audioChunks), req)

    var speechSize int
    g.Go(func() error {
        defer close(audioChunks)
        var err error
        speechSize, err = h.processIncomingAudioChunks(gCtx, r, audioChunks)
        return err
    })
    g.Go(func() error {
        return h.processBackendResponses(gCtx, w, responsesCh)
    })
    err = g.Wait() // err always "context cancelled"
    if err != nil {
        h.handleDenoiseError(ctx, err, w)
        return
    } else {
        h.logger.Info(ctx, "Finished request successfully")
        h.metrics.SuccessfulRequests.With(h.metricLabels).Inc()
    }

    billingCtx := util.ContextWithDenoiseBilling(ctx, speechSize)
    h.logger.Info(billingCtx, "Billing info")
}


Это denoise() Где происходит принятие ответа с бекенда и ресемплируется ответ в исходный семпл рейт и приделывается заголовок

// resemple to source sampleRate and add WAV header.
func denoise( //nolint: funlen, gocognit
	ctx context.Context, resp <-chan *denoiser.Response, req *denoiser.Request,
) <-chan *denoiser.Response {
	response := make(chan *denoiser.Response)

	decodedAudio := make(chan []byte)
	rawData := make(chan []byte)
	ok := true
	eg, gCtx := errgroup.WithContext(ctx)

	eg.Go(func() error {
		decodedResults := audio.Decode(
			gCtx,
			rawData,
			audio.FromEncoding(audio.PCM_s16le),
			audio.FromSampleRate(denoiser.NativeSampleRate),
			audio.ToSampleRate(req.SampleRate),
			audio.TargetChannels(1),
		)
		var (
			res *audio.Result
			ok  bool
		)

		for {
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case res, ok = <-decodedResults:
			}

			if !ok {
				break
			}

			if res.Err != nil {
				return &denoiser.AudioDecoderError{}
			}

			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case decodedAudio <- res.Data[0]:
			}
		}
		close(decodedAudio)
		return nil
	})

	data := &denoiser.Response{}
	eg.Go(func() error {
		for {
			data = &denoiser.Response{}
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case data, ok = <-resp:
			}
			if ok {
				rawData <- data.Audio
			} else {
				close(rawData)
				return nil
			}
		}
	})
	eg.Go(func() error {
		chunk := new(denoiser.Response)

		// header generating
		chunk.Audio = audio.GenerateHeaderWav(audio.WavPCM, 1, uint32(req.SampleRate), 2, 16)
		response <- chunk

		for {
			chunk = new(denoiser.Response)
			select {
			case <-gCtx.Done():
				return gCtx.Err()
			case chunk.Audio, ok = <-decodedAudio:
			}
			if !ok {
				return nil
			}
			response <- chunk
		}
	})

	go func() {
		if err := eg.Wait(); err != nil { // !
			select {
			case <-ctx.Done():
			case response <- &denoiser.Response{Err: err}:
			}
			return
		}
		close(response)
	}()
	return response
}


Вот мой processIncomingAudioChunks()
func (h *Handler) processIncomingAudioChunks(
	ctx context.Context, r *http.Request, audio chan<- []byte,
) (int, error) {
	var err error
	n := 0
	speechSize := 0

	for {
		audioChunk := make([]byte, chunkSize)
		err = contextutil.DoWithTimeout(ctx, func() error {
			var err error
			n, err = r.Body.Read(audioChunk)
			return err
		}, h.cfg.ReadTimeout.ToNative())

		if err == io.EOF {
			if n == 0 {
				return speechSize, nil
			}
		} else if err != nil {
			return speechSize, err
		}

		if n > 0 {
			select {
			case <-ctx.Done():
				return speechSize, ctx.Err()
			case audio <- audioChunk[:n]:
				speechSize += n
			}
		}
	}
}


Я застрял с этой проблемой(
  • Вопрос задан
  • 275 просмотров
Пригласить эксперта
Ответы на вопрос 1
@nakem Автор вопроса
Решил проблему! Все дело в автосгенерированном моке. В моке бекенда `h.backend.Denoise()` не было чтения из канала audioChunks. Поэтому происходил дедлок и соответствующая отмена контекста. А если добавить буфер в канал, то программа не застревала. Для меня был главный вопрос почему программа возвращала StatusOK если проблемы с чтением из канала, но я забыл, что я сам добавил возврат слайс байт из мока, поэтому программа и отрабатывала. Спасибо всем, кто хотел помочь!
Ответ написан
Комментировать
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы