class ApplConfig(AppConfig):
name = 'main.appl'
def ready(self):
thr = Thread(name='mq_thread', target=self.run_mq)
thr.daemon = True
thr.start()
@classmethod
def run_mq(cls):
mq = RabbitMQBase()
mq.start_mq()
...
class RabbitMQBase(object):
def start_mq(self):
connection = self.init_connection()
channel = connection.channel()
channel.queue_declare(queue='queue')
channel.basic_consume(
queue=queue,
on_message_callback=self.callback,
auto_ack=True
)
channel.start_consuming()
channel.stop_consuming()
connection.close()
@classmethod
def init_connection(cls):
credentials = pika.PlainCredentials(
'USERNAME',
'PASSWORD'
)
parameters = pika.ConnectionParameters(
host='HOST',
port='PORT',
virtual_host='VIRTUAL_HOST',
credentials=credentials
)
return pika.BlockingConnection(parameters)
@classmethod
def make_message(cls, action: str, message: str) -> str:
return json.dumps({
'action': action,
'body': message
})
@staticmethod
def parse_body(body: str) -> dict:
return json.loads(body)
@classmethod
def dispatch(cls, data):
action = data['action']
body = data['body']
return process_methods[action].process(body)
def callback(self,
ch: BlockingChannel,
method: 'pika.spec.Basic.Deliver',
properties: 'pika.spec.BasicProperties',
body: str):
data = self.parse_body(body)
return self.dispatch(data)
class PingPongRMQ(RabbitMQBase):
@classmethod
def process(cls, body):
conn = cls.init_connection()
channel = conn.channel()
channel.queue_declare(queue=cls.ping_queue)
channel.basic_publish(exchange='',
routing_key=cls.ping_queue,
body=cls.make_message('pong', 'test'))
conn.close()
process_methods = {
'ping': PingPongRMQ,
}