@RizoKadiev

Как установить соединение RabbitMQ, чтобы сообщения по очередям расскидывались?

Допустим у нас есть домены ['ozon', 'yandex']. Я хочу чтобы в зависимости от домена, сообщения расскидывались по нужным очередям. К примеру если домен ozon то очередь - > queue_name = f'{domain}-card_offers-{self.task_id}'. Вместо domain оzon подставляется - > queue_name = ozon-card_offers-{self.task_id}. Помогите пожалуйста, или наведите меня на верное решение, в rabbitMQ создаются очереди дефолтные странными названиями и залетают такие сообщения:

{"task_id": "3c651433-4713-4afb-b926-baaa9fe96e0b", "status": "SUCCESS", "result": null, "traceback": null, "children": []}

class RabbitmqConnectionService:
    def __init__(self, task_id: int, included_domains: set) -> None:
        self.task_id = task_id
        self.included_domains = included_domains
        self.connection = Connection(CONNECTION_URL)
        self.connection.connect()

        self.exchange = Exchange('parsing', type='direct')
        self.queues = {}
        for included_domain in included_domains:
            queue_name = '{}-card_offers-{}'.format(included_domain, task_id)

            queue = Queue(queue_name, exchange=self.exchange,
                          routing_key=queue_name,
                          queue_arguments={
                              'x-expires': 60 * 1000,  # 1 min. ((6 * 60 * 60 * 1000) 6 hours in milliseconds)
                              'x-expires-type': 'direct'
                          })
            queue.declare()
            self.queues[included_domain] = queue

        self.producer = self.connection.Producer(exchange=self.exchange)

    def send_data_to_queue(self, data: dict) -> None:
        domain = data['domain']
        if domain in self.included_domains:
            queue_name = f'{domain}-card_offers-{self.task_id}'
            message = {'task_name': 'offers.tasks.handle_products', 'args': [data]}
            try:
                self.producer.publish(
                    json.dumps(message),
                    exchange='parsing',
                    routing_key=queue_name,
                    content_type='application/json',
                    headers={'id': self.task_id, 'task': message['task_name']},
                    declare = [self.queues[domain].queue_declare(nowait=True, passive=False,
                                                             arguments={'x-expires': None})],
                )
            except OperationalError as exc:
                print(f"Failed to send data to offers-api: {exc}")
        else:
            print(f"Domain '{domain}' is not in the list of included domains.")
  • Вопрос задан
  • 106 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы