MassTransit i RabbitMQ – nazwy kolejek, exchange i wiadomości

Logika nazywania kolejek i exchange była dla mnie na początku mocno niezrozumiała. Nie mogłem też znaleźć w dokumentacji nic, co by to jasno wyjaśniało.

W tym tekście postaram się opisać kilka przykładów, które pokazują jak to działa.

Pierwszy przykład to najprostsza konfiguracja – aplikacja publikująca wiadomości, aplikacja konsumująca wiadomości i współdzielony projekt z definicją rekordów wiadomości.

Drugi przykład jest podobny, jednak brak jest wspólnego projektu, a każdy z programów ma własną definicje rekordów wiadomości.

Trzeci przykład pokazuje jak skonfigurować exchange i kolejki pozwalające na użycie większej ilości konsumentów.

Kod przykładów z tego wpisu znajduje się tutaj.

Przykład 1 – konfiguracja domyślna

Przykład składa się z aplikacji Publisher, aplikacji Consumer i trzech wiadomości – UserAdded, UserUpdated, UserDeleted. Konfiguracja minimalistyczna.

Konsument
builder.Services.AddMassTransit(configure =>
{
    configure.AddConsumer<Consumers.UserAddedConsumer>();
    configure.AddConsumer<Consumers.UserUpdatedConsumer>();
    configure.AddConsumer<Consumers.UserDisabledConsumer>();
    configure.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
Nadawca wiadomości
builder.Services.AddMassTransit(configuration =>
{
    configuration.UsingRabbitMq((context, cfg) =>
    {
    });
});
RabbitMQ

W RabbitMQ powstały trzy komplety analogicznych struktur. Na przykładzie zdarzenia UserAdded są to dwa exchang-e: Message:UserAddedUserAdded oraz kolejka o nazwie UserAdded.

Przykład przesłanej wiadomości wygląda jak niżej. Atrybut messageType ma wartość urn:message:Messages:UserAdded.

Przykład 2 – osobne klasy komunikatów

Przykład drugi jest bardzo podobny do pierwszego. Różnica polega na tym, że nie ma już współdzielonego projektu z definicjami wiadomości. Zamiast tego zarówno konsument jak i aplikacja publikująca posiadają własne rekordy.

Po uruchomieniu aplikacji konsumenta i po próbach wysłania wiadomości z aplikacji producenta, w RabbitMQ powstaną struktury jak niżej (nazwy bazują na nazwach klas z uwzględnieniem przestrzeni nazw).

Od strony konsumenta powstały exchange i kolejki analogicznie jak w poprzednim przykładzie. Natomiast od strony aplikacji publikującej również powstały exchang-e. To na nie aplikacja będzie publikowała wiadomości. Problem polega na tym, że jedne i drugie nie są w żaden sposób ze sobą powiązane, czyli wiadomości będą publikowane do exchange-y, do których nie ma podłączonych żadnych kolejek. Przykład publikowanej wiadomości poniżej. Uwagę należy zwrócić na atrybut messageType, który ma wartość urn:message:Publisher.Messages:UserAdded.

Poprawka 1 – nazwy exchange i kolejek w aplikacji publikującej

Jak coś takiego poprawić? Musimy doprecyzować nazwy kolejek, do których aplikacja będzie publikowała. Przykład poniżej.

builder.Services.AddMassTransit(configuration =>
{
    configuration.UsingRabbitMq((context, cfg) =>
    {
        cfg.Message<Messages.UserAdded>(c => { c.SetEntityName("UserAdded"); });
        cfg.Message<Messages.UserUpdated>(c => { c.SetEntityName("UserUpdated"); });
        cfg.Message<Messages.UserDisabled>(c => { c.SetEntityName("UserDisabled"); });
    });
});

Po wprowadzeniu takich zmian, aplikacja publisher-a zaczyna wysyłać wiadomości do poprawnie nazwanego exchange. Jednak pojawia się inny problem – po publikacji wiadomości nie zostaje ona przetworzona przez aplikację konsumenta. Zamiast tego pojawiły się w RabbitMQ struktury z dopiskiem _skipped.

Jest to spowodowane zawartością atrybutu messageType wiadomości. Publikowana wiadomość ma tam wartość urn:message:Publisher.Messages:UserAdded, konsument oczekuje zaś urn:message:Consumer.Messages:UserAdded. Z perspektywy aplikacji odbierającej, jest to wiadomość nieobsługiwanego typu. Z tego powodu wiadomość jest pomijana (ląduje w odpowiedniej kolejce _skipped).

Poprawka 2 – urn wiadomości

Żeby rozwiązać ten problem należy uspójnić nazewnictwo wiadomości w obu aplikacjach. W tym celu można użyć atrybutu MessageUrn.

using MassTransit;

namespace Consumer.Messages;

[MessageUrn("UserAdded")]
public record UserAdded();

Efektem takiej zmiany będzie wiadomość jak niżej. Atrybut messageType ma teraz wartość urn:message:UserUpdated. Konsument teraz rozpoznaje typ wiadomości i może ją przetworzyć.

Przykład 3 – wiele odbiorców

Ostatni przykład jest w zasadzie powieleniem poprzedniego wpisu. Dodaje go tutaj, aby tekst był spójny i pokazywał główne przypadki użycia kolejek. Dodatkowo powstanie tutaj jeden mały problem, który nie występował we wcześniejszym wpisie.

Ten przykład przewiduje kilka aplikacji konsumenta, które dostają tą samą wiadomość-zdarzenie. Aplikacje te będą tworzyły osobne, tymczasowe kolejki zbindowane z jednym exchange. Wymaga to konfiguracji InstandeId. Będzie to unikalny dla aplikacji ciąg znaków, który będzie doklejany do nazwy kolejki i exchange.

builder.Services.AddMassTransit(configure =>
{
    configure.AddConsumer<Consumers.UserAddedConsumer>()
         .Endpoint(cfg =>
         {
            cfg.InstanceId = "-ConsumerA";
            cfg.Temporary = true;
         });
    configure.AddConsumer<Consumers.UserUpdatedConsumer>()
         .Endpoint(cfg =>
         {
            cfg.InstanceId = "-ConsumerA";
            cfg.Temporary = true;
         });
    configure.AddConsumer<Consumers.UserDisabledConsumer>()
         .Endpoint(cfg =>
         {
            cfg.InstanceId = "-ConsumerA";
            cfg.Temporary = true;
         });
    configure.UsingRabbitMq((context, cfg) => {});
});

Efekt tego będzie jak poniżej. Występuje tu podobny problem jak na początku poprzedniego przykładu – strony konsumujące i strona publikująca nie mają wspólnych punktów w topologii w RabbitMQ.

Aplikacja publikująca wiadomości jest skonfigurowana jak w poprzednim przykładzie. Czyli ustawione są SetEntityName. Z tego powodu zostały utworzone exchange UserAdded, UserUdated oraz UserDisabled.

Poprawka – nazwy exchange

Podobnie jak w poprzednim przykładzie konieczne było poprawienie nazw kolejek po stronie publikującej, tak w tym przypadku potrzebny jest analogiczny proces po stronie konsumującej wiadomości. Aplikacje konsumujące trzeba uzupełnić o konfigurację jak niżej.

configure.UsingRabbitMq((context, cfg) =>
    {
        cfg.Message<Messages.UserAdded>(x =>
        {
            x.SetEntityName("UserAdded");
        });
        cfg.Message<Messages.UserUpdated>(x =>
        {
            x.SetEntityName("UserUpdated");
        });
        cfg.Message<Messages.UserDisabled>(x =>
        {
            x.SetEntityName("UserDisabled");
        });
        cfg.ConfigureEndpoints(context);
    });

Efekt tego w RabbitMQ będzie następujący.