Задать вопрос
@unique_nickname

Как правильно взаимодействовать с каналами?

Задача следующая, в файле лежит много ссылок, мне нужно проверить их доступность.

Для этого создаю n воркеров, которые должны стукнуть в ссылку, записать результат и вызвать нового воркера, так пока ссылки не закончатся. Но есть проблема, ссылки добавляются в канал медленнее, чем потребляются воркерами. Не очень хочется выгружать весь файл в оперативку, можно ли решить этот вопрос без предварительной загрузки всего файла в канал?

Сейчас есть вот такой код, воркер вместо запроса просто выводит в консоль ссылку:

func main() {
	urls := make(chan string)
	go fillChannel(urls)

	for i := 0; i < 5; i++ {
		go requestWorker(urls)
	}

	time.Sleep(time.Second * 30)
}


func requestWorker(channel chan string) {
	if len(channel) <= 0 {
		println("Worker stopped")
		return
	}
	url := <-channel

	println(url)

	go requestWorker(channel)
}

func fillChannel(channel chan string) {
	file, err := os.Open("data.txt")
	defer file.Close()

	if err != nil {
		fmt.Println(err)
	}

	fileScanner := bufio.NewScanner(file)
	fileScanner.Split(bufio.ScanLines)

	for fileScanner.Scan() {
		channel <- fileScanner.Text()
	}
}
  • Вопрос задан
  • 78 просмотров
Подписаться 1 Простой Комментировать
Решения вопроса 1
То, что в канал данные пишутся медленнее, чем читаются, не должно вызывать никаких проблем при правильном подходе. У вас просто очень странно написан воркер, он на каждое событие запускает горутину и почему-то останавливается, если буфер канала пустой. Воркера надо по-хорошему останавливать, когда канал закрыт, а не пуст.

Лучше сделать так
package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"
)

func main() {
	urls := make(chan string)
	go fillChannel(urls)

	// создаем группу для ожидания, того, что все воркеры завершены
	wg := &sync.WaitGroup{}

	for i := 0; i < 5; i++ {
		// при запуске каждого воркера, увеличиваем счетчик в группе на 1
		wg.Add(1)
		go requestWorker(urls, wg)
	}

	// ждем, пока счетчик в группе не будет равен 0
	wg.Wait()
}

func requestWorker(channel <-chan string, wg *sync.WaitGroup) {
	// По завершении воркера счетчик в группе будет уменьшен на 1
	defer wg.Done()
	// Заодно пишем сообщение о завершении воркера
	defer println("Worker stopped")

	// Постоянно читаем из канала новые сообщения
	// цикл автоматически завершится, когда канал закроется и буфер будет пуст
	for url := range channel {
		println(url)
	}
}

func fillChannel(channel chan<- string) {
	file, err := os.Open("data.txt")
	defer file.Close()

	if err != nil {
		fmt.Println(err)
		return
	}

	fileScanner := bufio.NewScanner(file)
	fileScanner.Split(bufio.ScanLines)

	for fileScanner.Scan() {
		channel <- fileScanner.Text()
	}

	// закрываем канал, когда данные кончились
	// в го принято, чтобы канал закрывал только тот, кто в него пишет
	close(channel)
}



Этот паттерн называется worker pool. Мы пишем в канал все нужные данные и закрываем канал, когда данные кончились. Благодаря тому, что воркеры читают из канала через range, цикл просто выходит, когда канал закрыт и воркеры завершаются.
WaitGroup используется для того, чтобы подождать, пока воркеры доработают последние данные.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы