func (w *Writer) Work(ctx context.Context) {
// start
f, err := os.Create("./tmp/" + filename)
if err != nil {
panic(err)
}
defer f.Close()
// Запись шапки в файл
for {
select {
case <-ctx.Done():
// Как выполнить тут работу в конце работы горутины? сейчас не работает
case it := <-*w.Channel:
// Запись элементов в файл
}
}
// Или может тут выполнить работу в конце работы горутины ? тоже не работает
}
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer func() {
fmt.Println("Finish items loading!") // это сообщеие выводится
cancel()
}()
// тут стартанули нашу пачку горутин
for {
res, err := esScroller.Do(ctx)
for _, hit := range res.Hits.Hits {
// ... билд item
// отправляем во все каналы элемент
for _, w := range f.Writers {
*w.Channel <- item
}
}
}
package main
import (
"context"
"fmt"
"sync"
)
func worker(ctx context.Context, worderID int, data chan int, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("worker %d started\n", worderID)
for {
fmt.Printf("worker %d enter for\n", worderID)
select {
case <-ctx.Done():
fmt.Printf("worker %d cancelled\n", worderID)
return
case v, ok := <-data:
fmt.Printf("worker %d got data: %v, ok: %v\n", worderID, v, ok)
if !ok {
fmt.Printf("worker %d data channel was closed\n", worderID)
return
}
}
}
}
func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
channels := make([]chan int, 10)
for i := 0; i < 10; i++ {
wg.Add(1)
channels[i] = make(chan int)
go worker(ctx, i, channels[i], &wg)
}
for i := 0; i < 10; i++ {
channels[i] <- i
}
cancel()
wg.Wait()
}
func main() {
forever := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done(): // if cancel() execute
forever <- struct{}{}
return
default:
fmt.Println("for loop")
}
time.Sleep(500 * time.Millisecond)
}
}(ctx)
go func() {
time.Sleep(3 * time.Second)
cancel()
}()
<-forever
fmt.Println("finish")
}