Есть код:
package main
import (
"context"
)
type sequenced interface {
getSequence() int
}
type fanInRecord[T sequenced] struct {
index int // порядковый номер горутины, из которой получено сообщение
data T // непосредственно данные
pause chan struct{} // канал для синхронизации
}
func processTempCh[T sequenced](
ctx context.Context,
inputChannelsNum int, // количество входных каналов
fanInCh <-chan fanInRecord[T], // временный канал с данными
) <-chan T {
// выходной канал с упорядоченными данными
outputCh := make(chan T)
go func() {
defer close(outputCh)
// порядковый номер очередного элемента
expected := 0
// буфер для ожидания элементов по количеству входных каналов
queuedData := make([]*fanInRecord[T], inputChannelsNum)
for in := range fanInCh {
// если получили элемент с номером, который ожидаем
if in.data.getSequence() == expected {
select {
// запишем элемент в выходной канал
case outputCh <- in.data:
in.pause <- struct{}{}
// инкремент номера очередного элемента
expected++
case <-ctx.Done():
return
}
} else {
// если НЕ получили элемент с номером, который ожидаем
// запишем элемент в буфер
in := in
queuedData[in.index] = &in
}
}
}()
return outputCh
}
И есть само условие задачи:
"Для функции processTempCh дополните функционал сканирования элементов из буфера queuedData. Если элемент queuedData[i] - очередной для записи в выходной канал, то должны быть реализованы следующие действия:
- запись в выходной канал queuedData[i]
- разблокировка исходного канала с помощью pause."
Я не понимаю как можно решить эту задачу. Можете помочь и желательно объяснить ?