MassTransit i RabbitMQ- zdarzenia i komendy

W systemach komunikujących się za pomocą wiadomości i systemów kolejkowych, wyróżnia się dwa typy wiadomości – komendy (commands) i zdarzenia (events). Ich obsługa przez system powinna się nieco różnić.

Przedstawiony tym razem przykład obejmuje jeden program publikujący wiadomości będące komendami i zdarzeniami oraz dwa programy konsumujące te wiadomości.

Wiadomości-zdarzenia powinny trafić do wszystkich zainteresowanych konsumentów. Wiadomości-komendy powinny zaś zostać odebrane i przetworzone przez jednego konsumenta (tak, żeby nie zostały przetworzone wielokrotnie).

Jak to działa w RabbitMQ?

RabbitMQ jest brokerem wiadomości. Jego zadaniem jest kolejkowanie i przekazywanie wiadomości od nadawców do odbiorców. Tak wygląda uproszczony model – składa się z nadawcy, kolejki i odbiorcy. Jednak, jeżeli przyjrzeć się bliżej, okaże się, że producent nigdy nie wysyła wiadomości bezpośrednio do kolejki. Jest tam jeszcze jeden element – exchange (ciężko mi znaleźć sensowną nazwę w języku polskim, więc pozostanę przy angielskiej).

Exchange jest elementem, w którym znajduje się logika kreująca przepływy wiadomości wewnątrz brokera. Wiadomość może zostać przekazana z exchange do konkretnej kolejki, do kilku kolejek jednocześnie albo do innego exchange. Można w ten sposób tworzyć całkiem zaawansowane grafy przepływu, bazując na właściwościach odebranej wiadomości.

Obsługa zdarzeń

Zdarzenia (events) powinny trafić do wszystkich zarejestrowanych odbiorców. Każdy z odbiorców rejestruje swoją tymczasową kolejkę wiadomości. Taka kolejka jest połączona (binding) z konkretnym exchange.

W momencie publikowania przez producenta zdarzenia, zdarzenie takie trafia do exchange, z którego przekazywane jest dalej do wszystkich zbindowanych z nim kolejek. Z tych kolejek każdy z konsumentów pobiera je w swoim tempie.

W momencie wyłączenia jednego z konsumentów – powiązana z nim kolejka znika. W skrajnym przypadku może się okazać, że nie ma żadnego odbiorcy. Wówczas zdarzenia nie są zapamiętywane przez system kolejkowy – ponieważ nie ma nikogo, kogo by dana informacja interesowała.

Obsługa komend

Komendy obsługuje się inaczej. Komendy to informacje, że coś musi zostać wykonane. Takiej informacji nie można „zgubić”. Komendy powinny być buforowane do momentu, gdy pojawi się konsument, który może je przetworzyć.

Ponadto komendy powinny być wykonane tylko raz, jeżeli istnieje więcej niż jeden konsument, konieczne jest zapewnienie, że komendę odczyta i przetworzy tylko jeden.

W takim przypadku mówimy o jednej tylko kolejce, do której podłączonych jest wiele konsumentów. Odpowiednim rozdysponowaniem wiadomości zajmie się RabbitMQ.

Przykład z MassTransit

Kod przykładu znajduje się tutaj. Składa się z trzech komponentów – aplikacji producenta, aplikacji konsumenta i współdzielonego projektu z definicją wiadomości.

Przykładowe wiadomości są dwie – komenda SendMail i zdarzenie UserUpdated.

public record SendMail
{
    public required string Subject { get; init; }
    public required List<string> Recipients { get; init; }
    public required string Body { get; init; }
}

public record UserAdded
{
    public required Guid Guid { get; init; }
    public required string Email { get; init; }
}

Transport via RabbitMQ

Tutaj wszystko wygląda analogicznie jak w poprzednim przykładzie. Zarówno po stronie producenta wiadomości, jak i konsumenta.

builder.Services.AddOptions<RabbitMqTransportOptions>()
    .Configure(options =>
    {
        options.Host = builder.Configuration.GetValue<string>("Rabbit:Host");
        options.Port = builder.Configuration.GetValue<ushort>("Rabbit:Port");
        options.VHost = builder.Configuration.GetValue<string>("Rabbit:VHost");
        options.User = builder.Configuration.GetValue<string>("Rabbit:User");
        options.Pass = builder.Configuration.GetValue<string>("Rabbit:Pass");
        options.UseSsl = builder.Configuration.GetValue<bool>("Rabbit:UseSsl");
    });

Konsumowanie wiadomości

Od strony odbierającej potrzebna jest implementacja interfejsu IConsumer. Przykład zawiera dwie klasy konsumentów – dla wiadomości Messages.SendMail oraz Messages.UserAdded. Obie są w zasadzie identyczne. Poniżej jedna z nich.

public class SendMailConsumer:IConsumer<Messages.SendMail>
{
    private readonly ILogger _logger;

    public SendMailConsumer(ILogger<SendMailConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<SendMail> context)
    {
        var message = context.Message;
        _logger.LogInformation("SendMail - subject:{subject}, recipients:{recipients}, body:{body}",
            message.Subject,
            string.Join(",",message.Recipients),
            message.Body);
        return Task.CompletedTask;
    }
}

Odrobinę ciekawiej się robi przy konfiguracji kolejek. W przypadku SendMail nie trzeba w sumie nic specjalnego robić, natomiast UserAdded wymaga stworzenia tymczasowych kolejek, po jednej dla każdego uruchomionego konsumenta.

var instanceId = "-"+Guid.NewGuid().ToString();
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<SendMailConsumer>();
    x.AddConsumer<UserAddedConsumer>().Endpoint(c =>
    {
        c.InstanceId = instanceId;
        c.Temporary = true;
    });;
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});

W efekcie uruchomienia programu, w RabbitMQ utworzy się konstrukcja podobna do poniższej.

Produkowanie wiadomości

Od strony producenta wiadomości, kod będzie bardzo prosty. Całość sprowadza się do odpowiedniego użycia interfejsu IBus.

private readonly IBus _bus;

[...]

var sendEndpoint = await _bus.GetPublishSendEndpoint<Messages.SendMail>();
var message = new Messages.SendMail()
{
   Subject = "new user",
   Recipients = new List<string>() { "user@example.com" },
   Body = "Hello new user."
};
await sendEndpoint.Send<Messages.SendMail>(message);

[...]

var message = new Messages.UserAdded()
{
   Email = "user@example.com",
   Guid = Guid.NewGuid()
};
await _bus.Publish<Messages.UserAdded>(message);

Przykład działania aplikacji

1 – Publikowana jest wiadomość typu UserAdded, zostaje odebrana przez obu konsumentów.

2 – Opublikowana zostaje wiadomość SendMail, odczytana zostaje tylko przez konsumenta drugiego.

3 – Publikowana jest wiadomość typu UserAdded, również zostaje odebrana przez obu konsumentów.

4 – Opublikowana zostaje wiadomość SendMail, odczytana zostaje tylko przez konsumenta pierwszego.