@Drottarutarnum
Любопытный любитель

Как сделать фильтр для подпискиков на очередь в MassTransit rabbitmq?

У меня есть воркеры, это сервера с GPU для StableDefussion и есть публикатор, это бот который дает задание на генерацию картинок

Дело в том, что у каждого сервера с SD есть свой набор моделей, и мне надо чтобы они забирали из общей очереди только те задания, которые они могут выполнить

т.е. при публикации задания надо как-то указать, что это будет генерироваться на условной модели для аниме, и только воркеры у которых есть эта модель возьмут задание

С помощью гугления и ChatGPT я выяснил, что это можно как-то сделать с помощью Headers, но то, что я сделал не хочет работать (учитывая мой опыт работы с MassTransit не более суток) - воркеры принимают все задания несмотря ни на что

Мой упрощенный код сейчас

Publisher:
internal class Program {
    static async Task Main(string[] args) {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
            cfg.Host(new Uri(ServerConfig.Host), h => {
                h.Username(ServerConfig.Username);
                h.Password(ServerConfig.Password);
            });

            cfg.ReceiveEndpoint("calculation_results_queue", e => {
                e.Consumer(() => new ResultConsumer());
            });
        });
        await busControl.StartAsync();

        var taskEndpoint = await busControl.GetSendEndpoint(new Uri("queue:calculation_task_queue"));
        await taskEndpoint.Send(new stabletestTask {
            TaskId = NewId.NextGuid().ToString(),
            Data = "Important data for calculation"
        }, context => {
            context.Headers.Set("model", "model_comic");
        });

        Console.WriteLine("Press any key to exit");
        await Task.Run(() => Console.ReadKey());
        await busControl.StopAsync();
    }
}


Worker:
internal class Program {
    static async Task Main(string[] args) {
        var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
            cfg.Host(new Uri(ServerConfig.Host), h => {
                h.Username(ServerConfig.Username);
                h.Password(ServerConfig.Password);
            });

            cfg.ReceiveEndpoint("calculation_task_queue", e => {
                e.Bind("calculation_model_real_exchange", x => {
                    x.ExchangeType = "headers";
                    x.SetExchangeArgument("x-match", "any");
                    x.SetExchangeArgument("model", "model_real");
                });

                e.Bind("calculation_model_anime_exchange", x => {
                    x.ExchangeType = "headers";
                    x.SetExchangeArgument("x-match", "any");
                    x.SetExchangeArgument("model", "model_anime");
                });

                e.Consumer(() => new CalculationTaskConsumer());
            });
        });

        await busControl.StartAsync();

        Console.WriteLine("Press any key to exit");
        await Task.Run(() => Console.ReadKey());
        await busControl.StopAsync();
    }
}
  • Вопрос задан
  • 60 просмотров
Пригласить эксперта
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Войти через центр авторизации
Похожие вопросы