@IdsaMikado

Как реализовать RPC на Masstransit?

Господа, ситуация следующая:
я пытаюсь отправить запрос на поиск данных из одного микросервиса в другой и ожидать от него ответ, но по итогу выдает сообщение о превышенным временем ожидания, вроде код правильный но в чем проблема понять не могу
микросервис поиска
startrup
services.AddMassTransit(x =>
            {
                x.AddRequestClient<SearchRequest>();
                x.UsingRabbitMq((context, cfg) =>
                {
                    var rabbitMqConfig = Configuration.GetSection("RabbitMQ");
                    cfg.Host(rabbitMqConfig["Hostname"], h =>
                    {
                        h.Username(rabbitMqConfig["Username"]);
                        h.Password(rabbitMqConfig["Password"]);
                    });
                    cfg.ConfigureEndpoints(context);
                });
            });

appsetings.json
"RabbitMQ": {
    "Hostname": "rabbitmq://localhost",
    "Username": "guest",
    "Password": "guest"
  },

SearchServices
public class SearchRequest
    {
        public string Text { get; set; }
    }

    public class SearchResponse
    {
        public List<object> Data { get; set; }
    }
    public class SearchServices
    {
        private readonly IRequestClient<SearchRequest> _requestClient;

        public SearchServices(IRequestClient<SearchRequest> requestClient)
        {
            _requestClient = requestClient;
        }

        public async Task<List<object>> SearchAsync(string text)
        {
            var searchRequest = new SearchRequest
            {
                Text = text
            };

            Log.Information("Отправка запроса на поиск: {Text}", searchRequest.Text);

            var response = await _requestClient.GetResponse<SearchResponse>(searchRequest);

            Log.Information("Получен ответ на запрос поиска");

            var searchResponse = response.Message;

            return searchResponse.Data;
        }

    }

controller
public class SearchController : ControllerBase
    {
        private readonly SearchServices _searchService;
        private readonly ILogger<SearchController> _logger;

        public SearchController(SearchServices searchService, ILogger<SearchController> logger)
        {
            _searchService = searchService;
            _logger = logger;
        }

        [HttpPost("search")]
        public async Task<IActionResult> SearchAsync(string text)
        {
            _logger.LogInformation("Получен запрос на поиск");
            var searchResults = await _searchService.SearchAsync(text);

            if (searchResults.Count == 0)
            {
                _logger.LogInformation("Результаты поиска не найдены");
                return BadRequest(new { message = "Пользователи или товары не найдены" });
            }

            _logger.LogInformation("Возвращены результаты поиска для текста");
            return Ok(searchResults);
        }
    }

микросервис пользователей
startup
services.AddMassTransit(config =>
            {
                config.AddConsumer<SearchRequestHandler>();
                config.UsingRabbitMq((context, cfg) =>
                {
                    var rabbitMqConfig = Configuration.GetSection("RabbitMQ");
                    cfg.Host(new Uri(rabbitMqConfig["Hostname"]), h =>
                    {
                        h.Username(rabbitMqConfig["Username"]);
                        h.Password(rabbitMqConfig["Password"]);
                    });
                    cfg.ConfigureEndpoints(context);
                });
            });

SearchRequestHandler
public class SearchRequest
    {
        public string Text { get; set; }
    }

    public class SearchResponse
    {
        public List<object> Data { get; set; }
    }

    public class SearchRequestHandler : IConsumer<SearchRequest>
    {
        private readonly AddDbContext _dbContext;
        private readonly ILogger<SearchRequestHandler> _logger;

        public SearchRequestHandler(AddDbContext dbContext, ILogger<SearchRequestHandler> logger)
        {
            _dbContext = dbContext;
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<SearchRequest> context)
        {
            var searchRequest = context.Message;

            _logger.LogInformation("Получен запрос на поиск: {Text}", searchRequest.Text);

            var searchResult = await PerformSearch(searchRequest.Text);

            _logger.LogInformation("Поиск завершен. Найдено {Count} результатов", searchResult.Count);

            var searchResponse = new SearchResponse { Data = searchResult };

            await context.RespondAsync(searchResponse);
        }

        private async Task<List<object>> PerformSearch(string searchText)
        {
            var users = await _dbContext.Users
                .Where(p => p.Username.Contains(searchText, StringComparison.OrdinalIgnoreCase))
                .ToListAsync();
            var results = users.Select(p => new { p.Username }).Cast<object>().ToList();
            return results;
        }
    }


При запуске проекта регаются 2 обменника и 1 очередь
spoiler

64d4d878ef9d7886048031.jpeg
64d4d882ac310374573665.jpeg
64d4d88f92a03040497886.jpeg

после отправки запроса появляются еще 2 обменника и 1 очередь
spoiler

64d4d8c9d48ec136069942.jpeg
64d4d8f5dfffa916037189.jpeg
64d4d8fcaeb29938960005.jpeg


Как я полагаю, второй обменник созданный после отправки запроса должен привязываться к очереди чтобы отправить запрос, т.к. судя по логам
spoiler

(SearchService)
Request starting HTTP/2 POST https://localhost:44327/search?text=sas - 0
[INF] Executing endpoint 'SearchService.Controllers.SearchController.SearchAsync
[INF] Route matched with {action = "Search", controller = "Search"}. Executing controller action with signature System.Threading.Tasks.Task`1[Microsoft.AspNetCore.Mvc.IActionResult] SearchAsync(System.String) on controller SearchService.Controllers.SearchController

2023-08-09 21:18:22.136 +04:00 [INF] Получен запрос на поиск
2023-08-09 21:18:22.138 +04:00 [INF] Отправка запроса на поиск: sas
[INF] Executed action SearchService.Controllers.SearchController.SearchAsync
[INF] Executed endpoint 'SearchService.Controllers.SearchController.SearchAsync
[ERR] An unhandled exception has occurred while executing the request.

отправка запроса
идет но до микросервиса не доходит, подскажите в чем дело а то я уже без понятия
  • Вопрос задан
  • 169 просмотров
Решения вопроса 1
@OwDafuq
MassTransit для запросов (через IRequestClient) создает себе временную очередь и привязывает её к такому-же обменнику для внутренних нужд, можете обратить внимание, что очередь exp, скорее всего, со временем в 60000 мсек (1 минута). В общем - это нормальное поведение для него, он таким образом определяет одного Producer'a от другого для запросов. Для публикации (publish) создание 2-х очередей тоже норма (например: Producer -> MyEntity (Exchange) -> MyEntityQueue (Exchange) -> MyEntityQueue (Queue) -> Consumer). С чем это конкретно связано - не помню, но в репозитории MassTransit говорили, что это абсолютно нормальное поведение (тоже задавался этим вопросом).

UPD: Вопрос решен через Телеграм, ошибка была в том, что человек создал 2 разных запроса с одинаковой структурой, consumer не мог обрабатывать запрос т.к. не совпадал namespace.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы