Привет, надо читать stdout и stderr из процессов запущенных самим Go через cmd.Command. В качестве примера запускаю обычные go программы, которые с интервалом 2 секунды пишут в stdout:
Код длинный, но он простой (process.go):
spoilerpackage main
import (
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
processId := getProcessIdent()
welcomeMsg(processId)
toStdout := makeStdoutWriter(processId)
sig := makeNotifier()
loop:
for {
select {
case <-sig:
break loop
case <-time.After(time.Second * 2):
toStdout()
}
}
fmt.Println("Good Luck")
}
func welcomeMsg(processId int) {
fmt.Println(fmt.Sprintf("Welcome to Process %d", processId))
}
func getProcessIdent() int {
var processId int
flag.IntVar(&processId, "id", 999, "number of lines to read from the file")
flag.Parse()
return processId
}
func makeNotifier() <-chan os.Signal {
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
return sig
}
func makeStdoutWriter(processId int) func() {
write := func(processId int) {
fmt.Fprintln(os.Stdout, fmt.Sprintf("Process work - %d", processId))
}
return func() {
write(processId)
write(processId)
write(processId)
}
}
Далее запускаю child process и устанавливаю io.Pipe для выводов: (часть main.go)
type Executor struct {
cmd *exec.Cmd
r *io.PipeReader
w *io.PipeWriter
}
func executeAnotherGo(id int) (*Executor, error) {
args := []string{
"run", "./process.go", "-id", strconv.Itoa(id),
}
r, w := io.Pipe()
cmd := exec.Command("go", args...)
cmd.Stdout = w
cmd.Stderr = w
if err := cmd.Start(); err != nil {
return nil, err
}
return &Executor{cmd: cmd, r: r, w: w}, nil
}
Ну и основной метод, который слушает stdout процесса и пишет в соответствующий файл (для примера) (часть main.go)
func perform(ctx context.Context, id int) {
executor, err := executeAnotherGo(id)
if err != nil {
log.Fatal("Error execute process ", id, "cause: ", err)
}
writer, err := os.OpenFile(fmt.Sprintf("./process_%d.log", id), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
fmt.Println(err)
return
}
localContext, cancel := context.WithCancel(ctx)
defer func() {
// close pipe writer
if err := executor.w.Close(); err != nil {
fmt.Println(err)
}
// try kill process
if err := executor.cmd.Process.Kill(); err != nil {
fmt.Println(id, executor.cmd.Process.Pid, err)
}
cancel()
}()
buf := make([]byte, 1024)
ffmpegOutput := make(chan []string)
processKilled := make(chan error, 1)
// Runs a separate sub-thread, because when running in a single thread,
// there is a lock while waiting for the buffer to be read.
// In turn blocking by the reader will not allow the background task to finish gracefully
go func() {
for {
count, err := executor.r.Read(buf)
if err != nil {
fmt.Println("Close reader, cause: ", err)
return
}
buf = buf[:count]
str := string(buf)
parts := strings.Split(strings.TrimSpace(str), "\n")
if len(parts) <= 1 {
continue
}
ffmpegOutput <- parts
fmt.Println(str)
}
}()
// We listen to the process termination signal,
// this will provide an opportunity to remove the task from the pool and restart it if necessary
//
// Note: We listen to the context so as not to leave active goroutines when the task is completed
go func() {
select {
case processKilled <- executor.cmd.Wait():
return
case <-localContext.Done():
return
}
}()
loop:
for {
select {
case <-localContext.Done():
fmt.Println("Cancel process: ", id)
break loop
case err := <-processKilled:
fmt.Println("Killed", id, executor.cmd.Process.Pid, err)
break loop
case outPartials := <-ffmpegOutput:
if _, err := writer.WriteString(strings.Join(outPartials, "\n")); err != nil {
fmt.Println(err)
}
}
}
}
Вызываю три child process:
func main() {
fmt.Println("Run program, wait processes")
sig := make(chan os.Signal)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
go perform(ctx, 1)
go perform(ctx, 2)
go perform(ctx, 3)
<-sig
// cancel all sub process
cancel()
// wait all canceled
<-time.After(time.Second * 2)
fmt.Println("graceful exit")
}
Оригинальный процесс пишет в stdout (если запустить отдельно руками так и будет:
Welcome to Process 3
Process work - 3
Process work - 3
Process work - 3
Process work - 3
Process work - 3
Process work - 3
НО, если запустить через cmd.Command() и слушать через io.Pipe получается вообще не разбериха (вывод в спойлере):
spoilerProcess work - 1
Proc
ess work - 1
Process
Process work - 3
Proc
ess work - 3
Process
Process work - 2
Proc
ess work - 2
Process
ork - 2
P
rk - 2
Pr
ork - 3
P
rk - 3
Pr
ork - 1
P
rk - 1
Pr
- 1
P
- 1
Pr
- 2
P
- 2
Pr
- 3
P
- 3
Pr
3
P
2
P
1
P
1
P
3
P
2
P
В какой-то момент он вообще может перестать что-то выводить, а иногда может и во все "заснуть", что stdout не закрывается даже после завершения оригинального (своего) процесса.. А это важно, т.к. если процесс завершен нужно "уничтожить" горутину и все его наследники
Если вместо пайпа использовать обычный os.Stdout - все выводится корректно, но понятно что уже в стандартный stdout
r, w := io.Pipe()
cmd := exec.Command("go", args...)
// заменяем
// cmd.Stdout = w
// cmd.Stderr = w
// на
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
Помогите, пожалуйста, уже мозг сломал, что не так?
UPD: если читать через буфер
bufioReader := bufio.NewReader(executor.r)
for {
line, _, err := bufioReader.ReadLine()
то вывод корректный, и я понял что косякнул
strings.Join(outPartials, "\n") - убрать новую строку
теперь вывод такой:
Welcome to Process 2Process work - 2Process work - 2Processwork - 2Process work - 2Process work - 2Process work - 2Process work- 2Processwork- 2Processwork -2
т.е. сообщение приходит по частям..