Как читать stdout и stderr из процесса запущенного через cmd.Command?

Привет, надо читать stdout и stderr из процессов запущенных самим Go через cmd.Command. В качестве примера запускаю обычные go программы, которые с интервалом 2 секунды пишут в stdout:

Код длинный, но он простой (process.go):
spoiler
package main

import (
	"flag"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	processId := getProcessIdent()

	welcomeMsg(processId)

	toStdout := makeStdoutWriter(processId)
	sig := makeNotifier()
loop:
	for {
		select {
		case <-sig:
			break loop
		case <-time.After(time.Second * 2):
			toStdout()
		}
	}

	fmt.Println("Good Luck")
}

func welcomeMsg(processId int) {
	fmt.Println(fmt.Sprintf("Welcome to Process %d", processId))
}

func getProcessIdent() int {
	var processId int
	flag.IntVar(&processId, "id", 999, "number of lines to read from the file")
	flag.Parse()

	return processId
}

func makeNotifier() <-chan os.Signal {
	sig := make(chan os.Signal)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

	return sig
}

func makeStdoutWriter(processId int) func() {
	write := func(processId int) {
		fmt.Fprintln(os.Stdout, fmt.Sprintf("Process work - %d", processId))
	}

	return func() {
		write(processId)
		write(processId)
		write(processId)
	}
}


Далее запускаю child process и устанавливаю io.Pipe для выводов: (часть main.go)
type Executor struct {
	cmd *exec.Cmd
	r   *io.PipeReader
	w   *io.PipeWriter
}

func executeAnotherGo(id int) (*Executor, error) {
	args := []string{
		"run", "./process.go", "-id", strconv.Itoa(id),
	}

	r, w := io.Pipe()

	cmd := exec.Command("go", args...)
	cmd.Stdout = w
	cmd.Stderr = w

	if err := cmd.Start(); err != nil {
		return nil, err
	}

	return &Executor{cmd: cmd, r: r, w: w}, nil
}


Ну и основной метод, который слушает stdout процесса и пишет в соответствующий файл (для примера) (часть main.go)
func perform(ctx context.Context, id int) {
	executor, err := executeAnotherGo(id)

	if err != nil {
		log.Fatal("Error execute process ", id, "cause: ", err)
	}

	writer, err := os.OpenFile(fmt.Sprintf("./process_%d.log", id), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)

	if err != nil {
		fmt.Println(err)
		return
	}

	localContext, cancel := context.WithCancel(ctx)

	defer func() {
		// close pipe writer
		if err := executor.w.Close(); err != nil {
			fmt.Println(err)
		}

		// try kill process
		if err := executor.cmd.Process.Kill(); err != nil {
			fmt.Println(id, executor.cmd.Process.Pid, err)
		}

		cancel()
	}()

	buf := make([]byte, 1024)
	ffmpegOutput := make(chan []string)
	processKilled := make(chan error, 1)

	// Runs a separate sub-thread, because when running in a single thread,
	// there is a lock while waiting for the buffer to be read.
	// In turn blocking by the reader will not allow the background task to finish gracefully
	go func() {
		for {
			count, err := executor.r.Read(buf)
			if err != nil {
				fmt.Println("Close reader, cause: ", err)
				return
			}

			buf = buf[:count]
			str := string(buf)

			parts := strings.Split(strings.TrimSpace(str), "\n")

			if len(parts) <= 1 {
				continue
			}

			ffmpegOutput <- parts

			fmt.Println(str)
		}
	}()

	// We listen to the process termination signal,
	// this will provide an opportunity to remove the task from the pool and restart it if necessary
	//
	// Note: We listen to the context so as not to leave active goroutines when the task is completed
	go func() {
		select {
		case processKilled <- executor.cmd.Wait():
			return
		case <-localContext.Done():
			return
		}
	}()

loop:
	for {
		select {
		case <-localContext.Done():
			fmt.Println("Cancel process: ", id)
			break loop
		case err := <-processKilled:
			fmt.Println("Killed", id, executor.cmd.Process.Pid, err)
			break loop
		case outPartials := <-ffmpegOutput:
			if _, err := writer.WriteString(strings.Join(outPartials, "\n")); err != nil {
				fmt.Println(err)
			}
		}
	}
}


Вызываю три child process:

func main() {
	fmt.Println("Run program, wait processes")

	sig := make(chan os.Signal)
	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())

	go perform(ctx, 1)
	go perform(ctx, 2)
	go perform(ctx, 3)

	<-sig

	// cancel all sub process
	cancel()

	// wait all canceled
	<-time.After(time.Second * 2)

	fmt.Println("graceful exit")
}


Оригинальный процесс пишет в stdout (если запустить отдельно руками так и будет:

Welcome to Process 3
Process work - 3
Process work - 3
Process work - 3
Process work - 3
Process work - 3
Process work - 3


НО, если запустить через cmd.Command() и слушать через io.Pipe получается вообще не разбериха (вывод в спойлере):

spoiler
Process work - 1
Proc
ess work - 1
Process
Process work - 3
Proc
ess work - 3
Process
Process work - 2
Proc
ess work - 2
Process
ork - 2
P
rk - 2
Pr
ork - 3
P
rk - 3
Pr
ork - 1
P
rk - 1
Pr
- 1
P
- 1
Pr
- 2
P
- 2
Pr
- 3
P
- 3
Pr
3
P
2
P
1
P
1
P
3
P
2

P


В какой-то момент он вообще может перестать что-то выводить, а иногда может и во все "заснуть", что stdout не закрывается даже после завершения оригинального (своего) процесса.. А это важно, т.к. если процесс завершен нужно "уничтожить" горутину и все его наследники

Если вместо пайпа использовать обычный os.Stdout - все выводится корректно, но понятно что уже в стандартный stdout

r, w := io.Pipe()

cmd := exec.Command("go", args...)

// заменяем
// cmd.Stdout = w
// cmd.Stderr = w

// на
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr


Помогите, пожалуйста, уже мозг сломал, что не так?

UPD: если читать через буфер

bufioReader := bufio.NewReader(executor.r)

for {
	line, _, err := bufioReader.ReadLine()


то вывод корректный, и я понял что косякнул

strings.Join(outPartials, "\n") - убрать новую строку


теперь вывод такой:

Welcome to Process 2Process work - 2Process work - 2Processwork - 2Process work - 2Process work - 2Process work - 2Process work- 2Processwork- 2Processwork -2

т.е. сообщение приходит по частям..
  • Вопрос задан
  • 671 просмотр
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы