@IdsaMikado

Как реализовать RPC на Masstransit?

Господа, ситуация следующая:
я пытаюсь отправить запрос на поиск данных из одного микросервиса в другой и ожидать от него ответ, но по итогу выдает сообщение о превышенным временем ожидания, вроде код правильный но в чем проблема понять не могу
микросервис поиска
startrup
services.AddMassTransit(x =>
            {
                x.AddRequestClient<SearchRequest>();
                x.UsingRabbitMq((context, cfg) =>
                {
                    var rabbitMqConfig = Configuration.GetSection("RabbitMQ");
                    cfg.Host(rabbitMqConfig["Hostname"], h =>
                    {
                        h.Username(rabbitMqConfig["Username"]);
                        h.Password(rabbitMqConfig["Password"]);
                    });
                    cfg.ConfigureEndpoints(context);
                });
            });

appsetings.json
"RabbitMQ": {
    "Hostname": "rabbitmq://localhost",
    "Username": "guest",
    "Password": "guest"
  },

SearchServices
public class SearchRequest
    {
        public string Text { get; set; }
    }

    public class SearchResponse
    {
        public List<object> Data { get; set; }
    }
    public class SearchServices
    {
        private readonly IRequestClient<SearchRequest> _requestClient;

        public SearchServices(IRequestClient<SearchRequest> requestClient)
        {
            _requestClient = requestClient;
        }

        public async Task<List<object>> SearchAsync(string text)
        {
            var searchRequest = new SearchRequest
            {
                Text = text
            };

            Log.Information("Отправка запроса на поиск: {Text}", searchRequest.Text);

            var response = await _requestClient.GetResponse<SearchResponse>(searchRequest);

            Log.Information("Получен ответ на запрос поиска");

            var searchResponse = response.Message;

            return searchResponse.Data;
        }

    }

controller
public class SearchController : ControllerBase
    {
        private readonly SearchServices _searchService;
        private readonly ILogger<SearchController> _logger;

        public SearchController(SearchServices searchService, ILogger<SearchController> logger)
        {
            _searchService = searchService;
            _logger = logger;
        }

        [HttpPost("search")]
        public async Task<IActionResult> SearchAsync(string text)
        {
            _logger.LogInformation("Получен запрос на поиск");
            var searchResults = await _searchService.SearchAsync(text);

            if (searchResults.Count == 0)
            {
                _logger.LogInformation("Результаты поиска не найдены");
                return BadRequest(new { message = "Пользователи или товары не найдены" });
            }

            _logger.LogInformation("Возвращены результаты поиска для текста");
            return Ok(searchResults);
        }
    }

микросервис пользователей
startup
services.AddMassTransit(config =>
            {
                config.AddConsumer<SearchRequestHandler>();
                config.UsingRabbitMq((context, cfg) =>
                {
                    var rabbitMqConfig = Configuration.GetSection("RabbitMQ");
                    cfg.Host(new Uri(rabbitMqConfig["Hostname"]), h =>
                    {
                        h.Username(rabbitMqConfig["Username"]);
                        h.Password(rabbitMqConfig["Password"]);
                    });
                    cfg.ConfigureEndpoints(context);
                });
            });

SearchRequestHandler
public class SearchRequest
    {
        public string Text { get; set; }
    }

    public class SearchResponse
    {
        public List<object> Data { get; set; }
    }

    public class SearchRequestHandler : IConsumer<SearchRequest>
    {
        private readonly AddDbContext _dbContext;
        private readonly ILogger<SearchRequestHandler> _logger;

        public SearchRequestHandler(AddDbContext dbContext, ILogger<SearchRequestHandler> logger)
        {
            _dbContext = dbContext;
            _logger = logger;
        }

        public async Task Consume(ConsumeContext<SearchRequest> context)
        {
            var searchRequest = context.Message;

            _logger.LogInformation("Получен запрос на поиск: {Text}", searchRequest.Text);

            var searchResult = await PerformSearch(searchRequest.Text);

            _logger.LogInformation("Поиск завершен. Найдено {Count} результатов", searchResult.Count);

            var searchResponse = new SearchResponse { Data = searchResult };

            await context.RespondAsync(searchResponse);
        }

        private async Task<List<object>> PerformSearch(string searchText)
        {
            var users = await _dbContext.Users
                .Where(p => p.Username.Contains(searchText, StringComparison.OrdinalIgnoreCase))
                .ToListAsync();
            var results = users.Select(p => new { p.Username }).Cast<object>().ToList();
            return results;
        }
    }


При запуске проекта регаются 2 обменника и 1 очередь
spoiler

64d4d878ef9d7886048031.jpeg
64d4d882ac310374573665.jpeg
64d4d88f92a03040497886.jpeg

после отправки запроса появляются еще 2 обменника и 1 очередь
spoiler

64d4d8c9d48ec136069942.jpeg
64d4d8f5dfffa916037189.jpeg
64d4d8fcaeb29938960005.jpeg


Как я полагаю, второй обменник созданный после отправки запроса должен привязываться к очереди чтобы отправить запрос, т.к. судя по логам
spoiler

(SearchService)
Request starting HTTP/2 POST https://localhost:44327/search?text=sas - 0
[INF] Executing endpoint 'SearchService.Controllers.SearchController.SearchAsync
[INF] Route matched with {action = "Search", controller = "Search"}. Executing controller action with signature System.Threading.Tasks.Task`1[Microsoft.AspNetCore.Mvc.IActionResult] SearchAsync(System.String) on controller SearchService.Controllers.SearchController

2023-08-09 21:18:22.136 +04:00 [INF] Получен запрос на поиск
2023-08-09 21:18:22.138 +04:00 [INF] Отправка запроса на поиск: sas
[INF] Executed action SearchService.Controllers.SearchController.SearchAsync
[INF] Executed endpoint 'SearchService.Controllers.SearchController.SearchAsync
[ERR] An unhandled exception has occurred while executing the request.

отправка запроса
идет но до микросервиса не доходит, подскажите в чем дело а то я уже без понятия
  • Вопрос задан
  • 145 просмотров
Решения вопроса 1
@OwDafuq
MassTransit для запросов (через IRequestClient) создает себе временную очередь и привязывает её к такому-же обменнику для внутренних нужд, можете обратить внимание, что очередь exp, скорее всего, со временем в 60000 мсек (1 минута). В общем - это нормальное поведение для него, он таким образом определяет одного Producer'a от другого для запросов. Для публикации (publish) создание 2-х очередей тоже норма (например: Producer -> MyEntity (Exchange) -> MyEntityQueue (Exchange) -> MyEntityQueue (Queue) -> Consumer). С чем это конкретно связано - не помню, но в репозитории MassTransit говорили, что это абсолютно нормальное поведение (тоже задавался этим вопросом).

UPD: Вопрос решен через Телеграм, ошибка была в том, что человек создал 2 разных запроса с одинаковой структурой, consumer не мог обрабатывать запрос т.к. не совпадал namespace.
Ответ написан
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы
03 мар. 2024, в 11:02
30000 руб./за проект
03 мар. 2024, в 10:39
3000 руб./за проект
03 мар. 2024, в 10:05
5000 руб./за проект