Всем привет. У меня возникла проблема при работе с очередями в симфони. Для очерей используется связка Symfony Messenger + RabbitMQ.
Суть вопроса:
bus:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
serializer: messenger.transport.symfony_serializer
options:
exchange:
name: bus
type: direct
default_publish_routing_key: create
queues:
create:
binding_keys: [ create ]
update:
binding_keys: [ update ]
На один консьюмер есть 2 очереди - создание и обновление сущности. Проблема заключается в том, что если я вначале создам сущность persist + flush (Тут все ок, в базу сохраняется), а потом прилетит сообщение на редактирование этой же сущности, я поменяю ей пару полей и после флаша изменения в базу не сохраняются. Однако если в обработчике этого же сообщения вызвать вызвать репозиторий, то значение берет уже верное, значит EntityManager все же видит изменения и сохраняет их у себя в памяти. Но при флаше в БД не пишет.
Если перезапустить консьюмер, то сообщение на редактирование отрабатывают правильно.
Я попробовал написать свой милдвейр для очиски EM но это не помогло.
class DoctrineEntityManagerClearMiddleware implements MiddlewareInterface
{
private ManagerRegistry $managerRegistry;
private ?string $entityManagerName;
public function __construct(ManagerRegistry $managerRegistry, string $entityManagerName = null)
{
$this->managerRegistry = $managerRegistry;
$this->entityManagerName = $entityManagerName;
}
public function handle(Envelope $envelope, StackInterface $stack): Envelope
{
try {
$entityManager = $this->managerRegistry->getManager($this->entityManagerName);
} catch (\InvalidArgumentException $e) {
throw new UnrecoverableMessageHandlingException($e->getMessage(), 0, $e);
}
$entityManager->clear();
return $stack->next()->handle($envelope, $stack);
}
}
buses:
command.bus:
middleware:
- doctrine_ping_connection
- doctrine_close_connection
- 'App\Middleware\DoctrineEntityManagerClearMiddleware'
Прошу помочь разобраться с этой проблемкой :)