@lazari24

Почему не флашатся сущности обрабатываемые в Symfony Messenger?

Всем привет. У меня возникла проблема при работе с очередями в симфони. Для очерей используется связка Symfony Messenger + RabbitMQ.
Суть вопроса:
bus:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                serializer: messenger.transport.symfony_serializer
                options:
                    exchange:
                        name: bus
                        type: direct
                        default_publish_routing_key: create
                    queues:
                        create:
                            binding_keys: [ create ]
                        update:
                            binding_keys: [ update ]


На один консьюмер есть 2 очереди - создание и обновление сущности. Проблема заключается в том, что если я вначале создам сущность persist + flush (Тут все ок, в базу сохраняется), а потом прилетит сообщение на редактирование этой же сущности, я поменяю ей пару полей и после флаша изменения в базу не сохраняются. Однако если в обработчике этого же сообщения вызвать вызвать репозиторий, то значение берет уже верное, значит EntityManager все же видит изменения и сохраняет их у себя в памяти. Но при флаше в БД не пишет.
Если перезапустить консьюмер, то сообщение на редактирование отрабатывают правильно.
Я попробовал написать свой милдвейр для очиски EM но это не помогло.

class DoctrineEntityManagerClearMiddleware implements MiddlewareInterface
{
    private ManagerRegistry $managerRegistry;
    private ?string $entityManagerName;

    public function __construct(ManagerRegistry $managerRegistry, string $entityManagerName = null)
    {
        $this->managerRegistry = $managerRegistry;
        $this->entityManagerName = $entityManagerName;
    }

    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        try {
            $entityManager = $this->managerRegistry->getManager($this->entityManagerName);
        } catch (\InvalidArgumentException $e) {
            throw new UnrecoverableMessageHandlingException($e->getMessage(), 0, $e);
        }

        $entityManager->clear();

        return $stack->next()->handle($envelope, $stack);
    }
}


buses:
            command.bus:
                middleware:
                    - doctrine_ping_connection
                    - doctrine_close_connection
                    - 'App\Middleware\DoctrineEntityManagerClearMiddleware'


Прошу помочь разобраться с этой проблемкой :)
  • Вопрос задан
  • 199 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы