У меня есть воркеры, это сервера с GPU для StableDefussion и есть публикатор, это бот который дает задание на генерацию картинок
Дело в том, что у каждого сервера с SD есть свой набор моделей, и мне надо чтобы они забирали из
общей очереди только те задания, которые они могут выполнить
т.е. при публикации задания надо как-то указать, что это будет генерироваться на условной модели для аниме, и только воркеры у которых есть эта модель возьмут задание
С помощью гугления и ChatGPT я выяснил, что это можно как-то сделать с помощью Headers, но то, что я сделал не хочет работать (учитывая мой опыт работы с MassTransit не более суток) - воркеры принимают все задания несмотря ни на что
Мой упрощенный код сейчас
Publisher:
internal class Program {
static async Task Main(string[] args) {
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
cfg.Host(new Uri(ServerConfig.Host), h => {
h.Username(ServerConfig.Username);
h.Password(ServerConfig.Password);
});
cfg.ReceiveEndpoint("calculation_results_queue", e => {
e.Consumer(() => new ResultConsumer());
});
});
await busControl.StartAsync();
var taskEndpoint = await busControl.GetSendEndpoint(new Uri("queue:calculation_task_queue"));
await taskEndpoint.Send(new stabletestTask {
TaskId = NewId.NextGuid().ToString(),
Data = "Important data for calculation"
}, context => {
context.Headers.Set("model", "model_comic");
});
Console.WriteLine("Press any key to exit");
await Task.Run(() => Console.ReadKey());
await busControl.StopAsync();
}
}
Worker:
internal class Program {
static async Task Main(string[] args) {
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg => {
cfg.Host(new Uri(ServerConfig.Host), h => {
h.Username(ServerConfig.Username);
h.Password(ServerConfig.Password);
});
cfg.ReceiveEndpoint("calculation_task_queue", e => {
e.Bind("calculation_model_real_exchange", x => {
x.ExchangeType = "headers";
x.SetExchangeArgument("x-match", "any");
x.SetExchangeArgument("model", "model_real");
});
e.Bind("calculation_model_anime_exchange", x => {
x.ExchangeType = "headers";
x.SetExchangeArgument("x-match", "any");
x.SetExchangeArgument("model", "model_anime");
});
e.Consumer(() => new CalculationTaskConsumer());
});
});
await busControl.StartAsync();
Console.WriteLine("Press any key to exit");
await Task.Run(() => Console.ReadKey());
await busControl.StopAsync();
}
}