Как вывести данные из бесконечного subprocess в python?

Столкнулся с библиотекой subprocess.

У меня есть код ,который должен после получения необходимых данных работать с ними и выдавать результат.
По типу
// Получаем данные ( a = 1 , b = 2 ) #  через subprocess
c = a + b
print(c)


НО:
subprocess подключается к shell скрипту , а тот в свою очередь запускает другой python код с определенными параметрами. В этом коде есть Библиотека Pika , с которой я тоже раньше не работал.

То есть: Я запускаю свой код , он запускает другой код через subprocess ( т.к. в другой код нужно передать определенные параметры , и передает эти параметры sh скрипт ) , и этот "другой код" начинает свою работу.
Суть другого кода - через библиотеку pika подключить к серверу , с которого и берутся столь необходимые для моего кода данные , с которыми я буду дальше работать.

Вот этот "другой" код :
# -*- coding: utf-8 -*-
# Version: 1.02
# Required: pip install pika

import pika
import argparse
import sys

DEFAULT_URL= **нужный мне сервер**
DEFAULT_ROUTING_KEY='#'

# some defaults for Musson server
DEFAULT_TYPE='topic'
DEFAULT_QUEUE_NAME=''
DEFAULT_DURABLE=True
DEFAULT_AUTODELETE=True

def create_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument('-a', '--address', help='amqp server address (default: ' + DEFAULT_URL + ')', default=DEFAULT_URL)
    parser.add_argument('-e', '--exchange', help='exchange name', required=True)
    parser.add_argument('-r', '--routing-key', help='routing key (default: ' + DEFAULT_ROUTING_KEY +')', default=DEFAULT_ROUTING_KEY)
    return parser

def msg_callback(ch, method, properties, body):
    if properties.content_type == 'DataMsg struct':
        print()
        print('Headers: {}, routing key: {}'.format(properties.headers, method.routing_key))
        print('MsgData: {}'.format(body.decode("utf-8")))
        sys.stdout.flush()

def main():
    parser = create_parser()
    args = parser.parse_args(sys.argv[1:] if len(sys.argv) > 1 else ['-h'])

    print('Server address:', args.address)
    print('Exchange name:', args.exchange)
    print('Routing key:', args.routing_key)
    print('-' * 80, '\n[*] Waiting for messages. To exit press CTRL+C')
    sys.stdout.flush()

    params = pika.connection.URLParameters(args.address)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.exchange_declare(args.exchange, exchange_type=DEFAULT_TYPE, durable=DEFAULT_DURABLE)
    channel.queue_declare(queue=DEFAULT_QUEUE_NAME, auto_delete=DEFAULT_AUTODELETE)
    channel.queue_bind(DEFAULT_QUEUE_NAME, args.exchange, routing_key=args.routing_key)
    channel.basic_consume(queue=DEFAULT_QUEUE_NAME, auto_ack=True, on_message_callback=msg_callback)

    try:
        channel.start_consuming()
    except:
        pass

if __name__ == '__main__':
    main()


А вот тот shell скрипт:
python3 dump-data-messages.py -a **нужный мне сервер** -e AIS -r "#"


И в результате выполнения всей этой последовательной цепочки моя программа останавливается на моменте подключения через subprocess , и начинает выводить все данные , которые получает с сервера. И данные будут генерироваться вечно , пока я не остановлю программу ( даже если в случае , если данных нет , она просто будет висеть и ждать , когда они появятся )

То есть
# Программа стартует
// Получаем данные ( a = 1 , b = 2 ) # Начинаем получать данные через subprocess
# И всё! Код не идет дальше вниз , потому что данные продолжат выводиться в консоль.
Нельзя будет сделать   c = a + b
print(c)


subprocess я пробовал подключать разными способами:
subprocess.call(['Путь до скрипта.sh'])
или
proc = subprocess.Popen('Путь до скрипта.sh', stdout=subprocess.PIPE)
output = proc.stdout.read()
print output

В любом из способов ввиду постоянной генерации данных с сервера нельзя было продвинуться дальше по коду , они принимаются без остановки.

Мне понравился способ
f = open("text.txt", "w")
subprocess.call(['Путь до скрипта.sh'], stdout=f)

Который вносит все данные в текстовый документ , откуда я уже смогу их считать построчно.

Однако , если в итоге я хочу реализовать такой метод , то часть кода с получением данных через subprocess придется засунуть в другой поток , который не будет мешать основному потоку , а после на основном потоке обращаться к создаваемому txt файлу , чтобы считывать данные.

Пожалуйста , подскажите балбесу , как можно засунуть весь этот subprocess в другое измерение , не затрагивая при этом многопоточность ? Мне кажется , что это немного радикальное решение.

Может быть в библиотеке subprocess есть функция , позволяющая запускать shell скрипт на какое-то время , чтобы я к примеру в течении 10 секунд получал данные с сервера , а после отключался от него и начинал с ними работать? (Она есть , но я не смог ее применить к моему скрипту)

Или может дело в библиотеке Pika , которую использует другой код ?

Весь вечер пытался найти решение , но ничего дельного создать не смог.
Просто хочется иметь доступ к серверу лишь на какое-то время , а не запускать subprocess , который вечно будет принимать данные и не давать мне их обрабатывать.

Спасибо!
  • Вопрос задан
  • 154 просмотра
Решения вопроса 1
Vindicar
@Vindicar
RTFM!
output = proc.stdout.read()
У read() есть параметр, сколько максимум данных принимать. Просто принимай данные в буфер по 1 КБ (или сколько удобно), режь на строки по разделителю (\n), обрабатывай принятые полные строки, но последнюю неполную строку сохраняй и добавляй в начало следующей порции.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы