Я написал http хендлер, который принимает запрос с WAV. Моя задача передать аудио в потоке на бекенд в формате, там оно декодируется в псм, то есть просто удаляется заголовок ваф, и ресемплируется. Дальше мне нужно принять ответ с бекенда в виде канала, ресемплировать в исходных sample rate и приделать ваф заголовок. И отправить http ответ.
Я написал тесты с моками для проверки того, что на бекенд отправляется запрос. И принимается ответ с бекенда.
У меня проблема с пайплайнами. Горутины возвращают ошибку context cancelled. То есть, как я понимаю, это не DeadlineExceed. Это именно где-то context отменяется. Я этого нигде не делаю. Как я могу определить где это происходит?
Я пробовал увеличить таймауты, но это не помогает.
Это мой тест с моком
const (
serveEndTimeout = time.Millisecond * 300
)
type errResponse struct {
Code int `json:"status"`
Message string `json:"message"`
}
func TestHandler_handleDenoise(t *testing.T) {
log.Init(log.Debug)
logger := log.Get()
mtr := &metrics.Metrics{}
mtr.Init(nil)
tests := []struct {
name string
cfg *api.Config
contentType string
userID string
requestID string
audioSource io.Reader
expectedBackendRequest *denoiser.Request
backendResponses []*denoiser.Response
expectedStatusCode int
expectedErrResp string
}{
{
name: "correct request",
cfg: &api.Config{
ReadTimeout: types.NewDuration("1s"),
WriteTimeout: types.NewDuration("1s"),
HTTPRequestMaxSize: types.NewUnit("1MiB"),
},
contentType: "audio/x-wav",
userID: "user",
requestID: "bar",
audioSource: bytes.NewReader(append(audio.GenerateHeaderWav(audio.WavPCM, 1, 14000, 2, 16), []byte("1234567")...)),
expectedStatusCode: http.StatusOK,
expectedBackendRequest: &denoiser.Request{
RequestID: "bar",
AudioEncoding: audio.PCM_s16le,
SampleRate: 14000,
TargetSampleRate: 48000,
},
backendResponses: []*denoiser.Response{
{Audio: []byte("1234567")},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.TODO()
serveHTTPDone := make(chan struct{})
denoiserMock := makeMockDenoiser(ctx, tt.expectedBackendRequest, tt.backendResponses)
a := acl.New(&acl.Config{Users: map[string]acl.UserConfig{
"user": {},
"userBlocked": {},
}})
h := NewHandler(tt.cfg, logger, mtr, denoiserMock, a)
serv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
r = r.WithContext(util.SetRequestUserID(r.Context(), tt.userID))
r = r.WithContext(util.SetRequestID(r.Context(), tt.requestID))
h.ServeHTTP(w, r)
close(serveHTTPDone)
}))
defer serv.Close()
u, err := url.Parse(serv.URL)
require.Nil(t, err)
u.Path = httpPathPattern
resp, err := http.Post(u.String(), tt.contentType, tt.audioSource)
require.Nil(t, err)
defer func() {
assert.Nil(t, resp.Body.Close())
}()
require.Equal(t, tt.expectedStatusCode, resp.StatusCode)
if tt.expectedErrResp != "" {
var errResp errResponse
decoder := json.NewDecoder(resp.Body)
if err := decoder.Decode(&errResp); err != nil && err != io.EOF {
t.Errorf("parsing response")
}
fmt.Printf("%s\n\n", errResp.Message)
require.Equal(t, tt.expectedErrResp, errResp.Message)
require.Equal(t, tt.expectedStatusCode, errResp.Code)
}
select {
case <-serveHTTPDone:
case <-time.After(serveEndTimeout):
t.Fatal("client request serving didn't end")
}
})
}
}
func newResponsesChannel(ctx context.Context, responses []*denoiser.Response) <-chan *denoiser.Response {
ch := make(chan *denoiser.Response)
go func() {
defer close(ch)
for _, response := range responses {
select {
case <-ctx.Done():
fmt.Printf("new Responce context\n")
return
case ch <- response:
}
// time.Sleep(30 * time.Millisecond)
}
}()
return ch
}
func makeMockDenoiser(ctx context.Context, expectedRequest *denoiser.Request, responses []*denoiser.Response) *mockDenoiser {
m := &mockDenoiser{}
if expectedRequest != nil {
m.On("Denoise", mock.Anything, mock.MatchedBy(func(actualRequest *denoiser.Request) bool {
return *expectedRequest == *actualRequest
}), mock.Anything).Return(newResponsesChannel(ctx, responses)).Once()
}
return m
}
Это мой хендлер
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx = log.ExtendCtx(ctx, "handler", "denoise")
(hidden block)
h.logger.Infof(ctx, "Handling request: %+v", req)
g, gCtx := errgroup.WithContext(ctx)
audioChunks := make(chan []byte)
responsesCh := denoise(ctx, h.backend.Denoise(gCtx, req, audioChunks), req)
var speechSize int
g.Go(func() error {
defer close(audioChunks)
var err error
speechSize, err = h.processIncomingAudioChunks(gCtx, r, audioChunks)
return err
})
g.Go(func() error {
return h.processBackendResponses(gCtx, w, responsesCh)
})
err = g.Wait() // err always "context cancelled"
if err != nil {
h.handleDenoiseError(ctx, err, w)
return
} else {
h.logger.Info(ctx, "Finished request successfully")
h.metrics.SuccessfulRequests.With(h.metricLabels).Inc()
}
billingCtx := util.ContextWithDenoiseBilling(ctx, speechSize)
h.logger.Info(billingCtx, "Billing info")
}
Это denoise() Где происходит принятие ответа с бекенда и ресемплируется ответ в исходный семпл рейт и приделывается заголовок
// resemple to source sampleRate and add WAV header.
func denoise( //nolint: funlen, gocognit
ctx context.Context, resp <-chan *denoiser.Response, req *denoiser.Request,
) <-chan *denoiser.Response {
response := make(chan *denoiser.Response)
decodedAudio := make(chan []byte)
rawData := make(chan []byte)
ok := true
eg, gCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
decodedResults := audio.Decode(
gCtx,
rawData,
audio.FromEncoding(audio.PCM_s16le),
audio.FromSampleRate(denoiser.NativeSampleRate),
audio.ToSampleRate(req.SampleRate),
audio.TargetChannels(1),
)
var (
res *audio.Result
ok bool
)
for {
select {
case <-gCtx.Done():
return gCtx.Err()
case res, ok = <-decodedResults:
}
if !ok {
break
}
if res.Err != nil {
return &denoiser.AudioDecoderError{}
}
select {
case <-gCtx.Done():
return gCtx.Err()
case decodedAudio <- res.Data[0]:
}
}
close(decodedAudio)
return nil
})
data := &denoiser.Response{}
eg.Go(func() error {
for {
data = &denoiser.Response{}
select {
case <-gCtx.Done():
return gCtx.Err()
case data, ok = <-resp:
}
if ok {
rawData <- data.Audio
} else {
close(rawData)
return nil
}
}
})
eg.Go(func() error {
chunk := new(denoiser.Response)
// header generating
chunk.Audio = audio.GenerateHeaderWav(audio.WavPCM, 1, uint32(req.SampleRate), 2, 16)
response <- chunk
for {
chunk = new(denoiser.Response)
select {
case <-gCtx.Done():
return gCtx.Err()
case chunk.Audio, ok = <-decodedAudio:
}
if !ok {
return nil
}
response <- chunk
}
})
go func() {
if err := eg.Wait(); err != nil { // !
select {
case <-ctx.Done():
case response <- &denoiser.Response{Err: err}:
}
return
}
close(response)
}()
return response
}
Вот мой processIncomingAudioChunks()
func (h *Handler) processIncomingAudioChunks(
ctx context.Context, r *http.Request, audio chan<- []byte,
) (int, error) {
var err error
n := 0
speechSize := 0
for {
audioChunk := make([]byte, chunkSize)
err = contextutil.DoWithTimeout(ctx, func() error {
var err error
n, err = r.Body.Read(audioChunk)
return err
}, h.cfg.ReadTimeout.ToNative())
if err == io.EOF {
if n == 0 {
return speechSize, nil
}
} else if err != nil {
return speechSize, err
}
if n > 0 {
select {
case <-ctx.Done():
return speechSize, ctx.Err()
case audio <- audioChunk[:n]:
speechSize += n
}
}
}
}
Я застрял с этой проблемой(