Сообщество IT-специалистов
Ответы на любые вопросы об IT
Профессиональное развитие в IT
Удаленная работа для IT-специалистов
class ApplConfig(AppConfig):
    """Django app config that starts the RabbitMQ consumer in the background."""

    name = 'main.appl'

    def ready(self):
        """Spawn a daemon thread named ``mq_thread`` that runs :meth:`run_mq`."""
        thr = Thread(name='mq_thread', target=self.run_mq)
        # Daemon thread: it must not keep the process alive on shutdown.
        thr.daemon = True
        thr.start()

    @classmethod
    def run_mq(cls):
        """Thread target: create a :class:`RabbitMQBase` and start consuming."""
        mq = RabbitMQBase()
        mq.start_mq()


...  # code elided in the original sample


class RabbitMQBase(object):
    """Blocking RabbitMQ consumer that dispatches JSON messages by action."""

    # Single source of truth for the consumed queue. The original declared
    # the literal 'queue' but consumed from an undefined name ``queue``.
    queue_name = 'queue'

    def start_mq(self):
        """Declare the queue and consume from it until consuming stops.

        Blocks the calling thread inside ``channel.start_consuming()``.
        The connection is closed on the way out even when consuming ends
        with an exception (for example one raised by :meth:`callback`).
        """
        connection = self.init_connection()
        try:
            channel = connection.channel()
            channel.queue_declare(queue=self.queue_name)
            channel.basic_consume(
                queue=self.queue_name,
                on_message_callback=self.callback,
                auto_ack=True
            )
            channel.start_consuming()
            channel.stop_consuming()
        finally:
            # Closing an already-closed connection raises in pika, so only
            # close when the connection is still open.
            if connection.is_open:
                connection.close()

    @classmethod
    def init_connection(cls):
        """Open and return a new ``pika.BlockingConnection``.

        The credential and host values below are placeholders.
        """
        credentials = pika.PlainCredentials(
            'USERNAME', 'PASSWORD'
        )
        parameters = pika.ConnectionParameters(
            host='HOST',
            port='PORT',
            virtual_host='VIRTUAL_HOST',
            credentials=credentials
        )
        return pika.BlockingConnection(parameters)

    @classmethod
    def make_message(cls, action: str, message: str) -> str:
        """Return a JSON string with ``action`` and ``body`` keys."""
        return json.dumps({
            'action': action,
            'body': message
        })

    @staticmethod
    def parse_body(body: 'str | bytes') -> dict:
        """Decode a JSON message body (``str`` or ``bytes``) into a dict."""
        return json.loads(body)

    @classmethod
    def dispatch(cls, data):
        """Route a decoded message to the handler registered for its action.

        Raises:
            KeyError: if ``data`` lacks ``'action'``/``'body'`` or the action
                has no entry in ``process_methods``.
        """
        action = data['action']
        body = data['body']
        return process_methods[action].process(body)

    def callback(self,
                 ch: BlockingChannel,
                 method: 'pika.spec.Basic.Deliver',
                 properties: 'pika.spec.BasicProperties',
                 body: bytes):
        """pika ``on_message_callback``: parse the body and dispatch it.

        pika delivers the message body as ``bytes``; ``json.loads`` accepts
        it directly.
        """
        data = self.parse_body(body)
        return self.dispatch(data)


class PingPongRMQ(RabbitMQBase):
    """Handler for the ``'ping'`` action: publishes a ``'pong'`` message."""

    # NOTE(review): ``ping_queue`` is read below but is not defined on this
    # class or on RabbitMQBase in the visible code; it must be provided
    # elsewhere or ``process`` raises AttributeError. Confirm the intended
    # queue name.

    @classmethod
    def process(cls, body):
        """Publish a ``pong`` message to ``cls.ping_queue``.

        ``body`` (the payload of the incoming ping) is not used. A fresh
        connection is opened for the publish and always closed afterwards.
        """
        conn = cls.init_connection()
        try:
            channel = conn.channel()
            channel.queue_declare(queue=cls.ping_queue)
            channel.basic_publish(exchange='',
                                  routing_key=cls.ping_queue,
                                  body=cls.make_message('pong', 'test'))
        finally:
            if conn.is_open:
                conn.close()


# Maps a message's 'action' value to the class whose ``process`` handles it.
process_methods = {
    'ping': PingPongRMQ,
}