Maksclub
@Maksclub
maksfedorov.ru

Как оповестить пачку горутин о завершении работы?

Запущено 20 горутин такой функции.

func (w *Writer) Work(ctx context.Context) {
	// start

	f, err := os.Create("./tmp/" + filename)
	if err != nil {
		panic(err)
	}

	defer f.Close()


	// Запись шапки в файл

	for {
		select {
		case <-ctx.Done():
			// Как  выполнить тут работу в конце работы горутины? сейчас не работает
		case it := <-*w.Channel:
			// Запись элементов в файл
		}
	}

	// Или может тут  выполнить  работу в конце работы горутины ? тоже не работает
}


В цикле я каждой из них (через канал для каждой, 20 горутин — значит 20 каналов) отправляю сообщения, а каждая пишет в свой файл

ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)

defer func() {
		fmt.Println("Finish items loading!") // это сообщеие выводится
		cancel()
	}()

// тут стартанули нашу пачку горутин
 
	for {
		res, err := esScroller.Do(ctx)
		
		for _, hit := range res.Hits.Hits {

    		// ... билд item

    		//  отправляем во все каналы элемент
    		for _, w := range f.Writers {
        		*w.Channel <- item
   		 }
	    }
}


Когда закончились сообщения — как завершить все и проделать в каждой из них полезную финальную работу?
  • Вопрос задан
  • 378 просмотров
Решения вопроса 2
EvgenyMamonov
@EvgenyMamonov Куратор тега Go
Senior software developer, system architect
Попробуйте вот так
package main

import (
    "context"
    "fmt"
    "sync"
)

func worker(ctx context.Context, worderID int, data chan int, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("worker %d started\n", worderID)
    for {
        fmt.Printf("worker %d enter for\n", worderID)
        select {
        case <-ctx.Done():
            fmt.Printf("worker %d cancelled\n", worderID)
            return
        case v, ok := <-data:
            fmt.Printf("worker %d got data: %v, ok: %v\n", worderID, v, ok)
            if !ok {
                fmt.Printf("worker %d data channel was closed\n", worderID)
                return
            }
        }
    }
}

func main() {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    channels := make([]chan int, 10)

    for i := 0; i < 10; i++ {
        wg.Add(1)
        channels[i] = make(chan int)
        go worker(ctx, i, channels[i], &wg)
    }

    for i := 0; i < 10; i++ {
        channels[i] <- i
    }

    cancel()
    wg.Wait()
}
Ответ написан
Maksclub
@Maksclub Автор вопроса
maksfedorov.ru
Сработал такой вариант со SO с применением еще канала, который не позволит завершить выполнение main()

func main() {
    forever := make(chan struct{})
    ctx, cancel := context.WithCancel(context.Background())

    go func(ctx context.Context) {
        for {
            select {
            case <-ctx.Done():  // if cancel() execute
                forever <- struct{}{}
                return
            default:
                fmt.Println("for loop")
            }

            time.Sleep(500 * time.Millisecond)
        }
    }(ctx)

    go func() {
        time.Sleep(3 * time.Second)
        cancel()
    }()

    <-forever
    fmt.Println("finish")
}


Src: https://stackoverflow.com/questions/6807590/how-to...

Вариант 2: Можно через WaitGroup завершить или через отдельную горутину с буферезированным каналом (на число горутин-воркеров)
Ответ написан
Комментировать
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы