Допустим у нас есть домены
['ozon', 'yandex']. Я хочу чтобы в зависимости от домена, сообщения расскидывались по нужным очередям. К примеру если домен
ozon то очередь - >
queue_name = f'{domain}-card_offers-{self.task_id}'. Вместо domain
оzon подставляется - >
queue_name = ozon-card_offers-{self.task_id}. Помогите пожалуйста, или наведите меня на верное решение, в rabbitMQ создаются очереди дефолтные странными названиями и залетают такие сообщения:
{"task_id": "3c651433-4713-4afb-b926-baaa9fe96e0b", "status": "SUCCESS", "result": null, "traceback": null, "children": []}
class RabbitmqConnectionService:
def __init__(self, task_id: int, included_domains: set) -> None:
self.task_id = task_id
self.included_domains = included_domains
self.connection = Connection(CONNECTION_URL)
self.connection.connect()
self.exchange = Exchange('parsing', type='direct')
self.queues = {}
for included_domain in included_domains:
queue_name = '{}-card_offers-{}'.format(included_domain, task_id)
queue = Queue(queue_name, exchange=self.exchange,
routing_key=queue_name,
queue_arguments={
'x-expires': 60 * 1000, # 1 min. ((6 * 60 * 60 * 1000) 6 hours in milliseconds)
'x-expires-type': 'direct'
})
queue.declare()
self.queues[included_domain] = queue
self.producer = self.connection.Producer(exchange=self.exchange)
def send_data_to_queue(self, data: dict) -> None:
domain = data['domain']
if domain in self.included_domains:
queue_name = f'{domain}-card_offers-{self.task_id}'
message = {'task_name': 'offers.tasks.handle_products', 'args': [data]}
try:
self.producer.publish(
json.dumps(message),
exchange='parsing',
routing_key=queue_name,
content_type='application/json',
headers={'id': self.task_id, 'task': message['task_name']},
declare = [self.queues[domain].queue_declare(nowait=True, passive=False,
arguments={'x-expires': None})],
)
except OperationalError as exc:
print(f"Failed to send data to offers-api: {exc}")
else:
print(f"Domain '{domain}' is not in the list of included domains.")