@unique_nickname

Как правильно взаимодействовать с каналами?

Задача следующая, в файле лежит много ссылок, мне нужно проверить их доступность.

Для этого создаю n воркеров, которые должны стукнуть в ссылку, записать результат и вызвать нового воркера, так пока ссылки не закончатся. Но есть проблема, ссылки добавляются в канал медленнее, чем потребляются воркерами. Не очень хочется выгружать весь файл в оперативку, можно ли решить этот вопрос без предварительной загрузки всего файла в канал?

Сейчас есть вот такой код, воркер вместо запроса просто выводит в консоль ссылку:

func main() {
	urls := make(chan string)
	go fillChannel(urls)

	for i := 0; i < 5; i++ {
		go requestWorker(urls)
	}

	time.Sleep(time.Second * 30)
}


func requestWorker(channel chan string) {
	if len(channel) <= 0 {
		println("Worker stopped")
		return
	}
	url := <-channel

	println(url)

	go requestWorker(channel)
}

func fillChannel(channel chan string) {
	file, err := os.Open("data.txt")
	defer file.Close()

	if err != nil {
		fmt.Println(err)
	}

	fileScanner := bufio.NewScanner(file)
	fileScanner.Split(bufio.ScanLines)

	for fileScanner.Scan() {
		channel <- fileScanner.Text()
	}
}
  • Вопрос задан
  • 78 просмотров
Решения вопроса 1
То, что в канал данные пишутся медленнее, чем читаются, не должно вызывать никаких проблем при правильном подходе. У вас просто очень странно написан воркер, он на каждое событие запускает горутину и почему-то останавливается, если буфер канала пустой. Воркера надо по-хорошему останавливать, когда канал закрыт, а не пуст.

Лучше сделать так
package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"
)

func main() {
	urls := make(chan string)
	go fillChannel(urls)

	// создаем группу для ожидания, того, что все воркеры завершены
	wg := &sync.WaitGroup{}

	for i := 0; i < 5; i++ {
		// при запуске каждого воркера, увеличиваем счетчик в группе на 1
		wg.Add(1)
		go requestWorker(urls, wg)
	}

	// ждем, пока счетчик в группе не будет равен 0
	wg.Wait()
}

func requestWorker(channel <-chan string, wg *sync.WaitGroup) {
	// По завершении воркера счетчик в группе будет уменьшен на 1
	defer wg.Done()
	// Заодно пишем сообщение о завершении воркера
	defer println("Worker stopped")

	// Постоянно читаем из канала новые сообщения
	// цикл автоматически завершится, когда канал закроется и буфер будет пуст
	for url := range channel {
		println(url)
	}
}

func fillChannel(channel chan<- string) {
	file, err := os.Open("data.txt")
	defer file.Close()

	if err != nil {
		fmt.Println(err)
		return
	}

	fileScanner := bufio.NewScanner(file)
	fileScanner.Split(bufio.ScanLines)

	for fileScanner.Scan() {
		channel <- fileScanner.Text()
	}

	// закрываем канал, когда данные кончились
	// в го принято, чтобы канал закрывал только тот, кто в него пишет
	close(channel)
}



Этот паттерн называется worker pool. Мы пишем в канал все нужные данные и закрываем канал, когда данные кончились. Благодаря тому, что воркеры читают из канала через range, цикл просто выходит, когда канал закрыт и воркеры завершаются.
WaitGroup используется для того, чтобы подождать, пока воркеры доработают последние данные.
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы
19 апр. 2024, в 13:31
10000 руб./за проект
19 апр. 2024, в 13:12
35000 руб./за проект
19 апр. 2024, в 13:06
6000 руб./за проект