Logika nazywania kolejek i exchange była dla mnie na początku mocno niezrozumiała. Nie mogłem też znaleźć w dokumentacji nic, co by to jasno wyjaśniało.
W tym tekście postaram się opisać kilka przykładów, które pokazują jak to działa.
Pierwszy przykład to najprostsza konfiguracja – aplikacja publikująca wiadomości, aplikacja konsumująca wiadomości i współdzielony projekt z definicją rekordów wiadomości.
Drugi przykład jest podobny, jednak brak jest wspólnego projektu, a każdy z programów ma własną definicje rekordów wiadomości.
Trzeci przykład pokazuje jak skonfigurować exchange i kolejki pozwalające na użycie większej ilości konsumentów.
Kod przykładów z tego wpisu znajduje się tutaj.
Przykład 1 – konfiguracja domyślna
Przykład składa się z aplikacji Publisher, aplikacji Consumer i trzech wiadomości – UserAdded, UserUpdated, UserDeleted. Konfiguracja minimalistyczna.
Konsument
builder.Services.AddMassTransit(configure =>
{
configure.AddConsumer<Consumers.UserAddedConsumer>();
configure.AddConsumer<Consumers.UserUpdatedConsumer>();
configure.AddConsumer<Consumers.UserDisabledConsumer>();
configure.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
Nadawca wiadomości
builder.Services.AddMassTransit(configuration =>
{
configuration.UsingRabbitMq((context, cfg) =>
{
});
});
RabbitMQ
W RabbitMQ powstały trzy komplety analogicznych struktur. Na przykładzie zdarzenia UserAdded są to dwa exchang-e: Message:UserAdded i UserAdded oraz kolejka o nazwie UserAdded.
Przykład przesłanej wiadomości wygląda jak niżej. Atrybut messageType ma wartość urn:message:Messages:UserAdded.
Przykład 2 – osobne klasy komunikatów
Przykład drugi jest bardzo podobny do pierwszego. Różnica polega na tym, że nie ma już współdzielonego projektu z definicjami wiadomości. Zamiast tego zarówno konsument jak i aplikacja publikująca posiadają własne rekordy.
Po uruchomieniu aplikacji konsumenta i po próbach wysłania wiadomości z aplikacji producenta, w RabbitMQ powstaną struktury jak niżej (nazwy bazują na nazwach klas z uwzględnieniem przestrzeni nazw).
Od strony konsumenta powstały exchange i kolejki analogicznie jak w poprzednim przykładzie. Natomiast od strony aplikacji publikującej również powstały exchang-e. To na nie aplikacja będzie publikowała wiadomości. Problem polega na tym, że jedne i drugie nie są w żaden sposób ze sobą powiązane, czyli wiadomości będą publikowane do exchange-y, do których nie ma podłączonych żadnych kolejek. Przykład publikowanej wiadomości poniżej. Uwagę należy zwrócić na atrybut messageType, który ma wartość urn:message:Publisher.Messages:UserAdded.
Poprawka 1 – nazwy exchange i kolejek w aplikacji publikującej
Jak coś takiego poprawić? Musimy doprecyzować nazwy kolejek, do których aplikacja będzie publikowała. Przykład poniżej.
builder.Services.AddMassTransit(configuration =>
{
configuration.UsingRabbitMq((context, cfg) =>
{
cfg.Message<Messages.UserAdded>(c => { c.SetEntityName("UserAdded"); });
cfg.Message<Messages.UserUpdated>(c => { c.SetEntityName("UserUpdated"); });
cfg.Message<Messages.UserDisabled>(c => { c.SetEntityName("UserDisabled"); });
});
});
Po wprowadzeniu takich zmian, aplikacja publisher-a zaczyna wysyłać wiadomości do poprawnie nazwanego exchange. Jednak pojawia się inny problem – po publikacji wiadomości nie zostaje ona przetworzona przez aplikację konsumenta. Zamiast tego pojawiły się w RabbitMQ struktury z dopiskiem _skipped.
Jest to spowodowane zawartością atrybutu messageType wiadomości. Publikowana wiadomość ma tam wartość urn:message:Publisher.Messages:UserAdded, konsument oczekuje zaś urn:message:Consumer.Messages:UserAdded. Z perspektywy aplikacji odbierającej, jest to wiadomość nieobsługiwanego typu. Z tego powodu wiadomość jest pomijana (ląduje w odpowiedniej kolejce _skipped).
Poprawka 2 – urn wiadomości
Żeby rozwiązać ten problem należy uspójnić nazewnictwo wiadomości w obu aplikacjach. W tym celu można użyć atrybutu MessageUrn.
using MassTransit;
namespace Consumer.Messages;
[MessageUrn("UserAdded")]
public record UserAdded();
Efektem takiej zmiany będzie wiadomość jak niżej. Atrybut messageType ma teraz wartość urn:message:UserUpdated. Konsument teraz rozpoznaje typ wiadomości i może ją przetworzyć.
Przykład 3 – wiele odbiorców
Ostatni przykład jest w zasadzie powieleniem poprzedniego wpisu. Dodaje go tutaj, aby tekst był spójny i pokazywał główne przypadki użycia kolejek. Dodatkowo powstanie tutaj jeden mały problem, który nie występował we wcześniejszym wpisie.
Ten przykład przewiduje kilka aplikacji konsumenta, które dostają tą samą wiadomość-zdarzenie. Aplikacje te będą tworzyły osobne, tymczasowe kolejki zbindowane z jednym exchange. Wymaga to konfiguracji InstandeId. Będzie to unikalny dla aplikacji ciąg znaków, który będzie doklejany do nazwy kolejki i exchange.
builder.Services.AddMassTransit(configure =>
{
configure.AddConsumer<Consumers.UserAddedConsumer>()
.Endpoint(cfg =>
{
cfg.InstanceId = "-ConsumerA";
cfg.Temporary = true;
});
configure.AddConsumer<Consumers.UserUpdatedConsumer>()
.Endpoint(cfg =>
{
cfg.InstanceId = "-ConsumerA";
cfg.Temporary = true;
});
configure.AddConsumer<Consumers.UserDisabledConsumer>()
.Endpoint(cfg =>
{
cfg.InstanceId = "-ConsumerA";
cfg.Temporary = true;
});
configure.UsingRabbitMq((context, cfg) => {});
});
Efekt tego będzie jak poniżej. Występuje tu podobny problem jak na początku poprzedniego przykładu – strony konsumujące i strona publikująca nie mają wspólnych punktów w topologii w RabbitMQ.
Aplikacja publikująca wiadomości jest skonfigurowana jak w poprzednim przykładzie. Czyli ustawione są SetEntityName. Z tego powodu zostały utworzone exchange UserAdded, UserUdated oraz UserDisabled.
Poprawka – nazwy exchange
Podobnie jak w poprzednim przykładzie konieczne było poprawienie nazw kolejek po stronie publikującej, tak w tym przypadku potrzebny jest analogiczny proces po stronie konsumującej wiadomości. Aplikacje konsumujące trzeba uzupełnić o konfigurację jak niżej.
configure.UsingRabbitMq((context, cfg) =>
{
cfg.Message<Messages.UserAdded>(x =>
{
x.SetEntityName("UserAdded");
});
cfg.Message<Messages.UserUpdated>(x =>
{
x.SetEntityName("UserUpdated");
});
cfg.Message<Messages.UserDisabled>(x =>
{
x.SetEntityName("UserDisabled");
});
cfg.ConfigureEndpoints(context);
});
Efekt tego w RabbitMQ będzie następujący.