MassTransit i RabbitMQ

Do tej pory używałem bibliotek dostarczanych wraz z RabbitMQ, jednak jakiś czas temu, przy okazji jednego z projektów, zaproponowano mi użycie MassTransit do obsługi wymiany wiadomości miedzy mikroserwisami. Okazało się, że jest to całkiem ciekawe rozwiązanie.

MassTransit to otwarty framework wprowadzający warstwę abstrakcji nad systemem wymiany komunikatów. Oficjalnie wspierane są RabbitMQ, Azure Service Bus i Amazon SQS.

Poniżej przykład użycia w aplikacji producenta i konsumenta wiadomości.

Odrobina wstępu

Przykład składa się z dwóch aplikacji – Publisher, Consumer. Komunikują się one między sobą za pomocą RabbitMQ. Jeden wysyła do drugiego wiadomość (rekord Message). Kod z przykładu znajduje się tutaj.

Wiadomości są dwoma różnymi rekordami z punktu widzenia obu aplikacji, choć strukturę mają podobną. Zdefiniowane są jak niżej.

Publisher
using MassTransit;

namespace Publisher.Infrastructure.Messages;

[MessageUrn("message")]
public record Message
{
    public Guid Guid { get; } 
        = Guid.NewGuid();
}
Consumer
using MassTransit;

namespace Consumer.Infrastructure.Messages;

[MessageUrn("message")]
public record Message
{
    public Guid Guid { get; init; }
}

Ważny w tym wszystkim jest atrybut MessageUrn. MassTransit rozpoznaje typ wiadomości na podstawie ich URN. Jest on domyślnie tworzony na podstawie nazwy klasy/rekordu/interfejsu wraz z uwzględnieniem przestrzeni nazw. Dla dwóch różnych programów nie będą to te same przestrzenie nazw więc i typ wiadomości nie zostanie rozpoznany. Można by też użyć współdzielonego projektu definiującego typy przekazywanych komunikatów. Wtedy, dla obu aplikacji, będzie to ten sam rekord.

Transport via RabbitMQ

MassTransit może używać różnych serwisów do przekazywania wiadomości. Ja zdecydowałem się na RabbitMQ. Konfiguracja dla obu stron jest w zasadzie identyczna.

builder.Services.AddOptions<RabbitMqTransportOptions>()
   .Configure(options =>
   {
      options.Host = builder.Configuration.GetValue<string>("Rabbit:Host");
      options.Port = builder.Configuration.GetValue<ushort>("Rabbit:Port");
      options.VHost = builder.Configuration.GetValue<string>("Rabbit:VHost");
      options.User = builder.Configuration.GetValue<string>("Rabbit:User");
      options.Pass = builder.Configuration.GetValue<string>("Rabbit:Pass");
      options.UseSsl = builder.Configuration.GetValue<bool>("Rabbit:UseSsl");
   });

Publisher – strona transmitująca wiadomość

Samo publikowanie wiadomości jest bardzo proste. Ogranicza się do użycia interfejsu IPublishEndpoints.

private readonly IPublishEndpoint _endpoints;
[...]
var message = new Message();
[...]
_endpoints.Publish<Message>(message);

Ciekawsze jest to, jak przymusić całość, żeby wiadomości trafiały do odpowiednich exchange w brokerze (o odpowiedniech nazwach i o odpowiednich typach).

builder.Services.AddMassTransit(x =>
{
   x.UsingRabbitMq((context, cfg) =>
   {
      cfg.Message<Message>(x =>
      {
         x.SetEntityName(builder.Configuration
                .GetValue<string>("Rabbit:MessageExchangeName",
                "message-exchange")!);
        });
      cfg.Publish<Message>(x => { x.ExchangeType = "topic"; });
    });
});

Można też od razu zdefiniować całą topologię przekazywania wiadomości między exchange a queue, jednak wolę to zrobić już w samym RabbitMQ niż z poziomu aplikacji.

Consumer – odbieranie wiadomości

Od strony odbierającej potrzebna jest implementacja interfejsu IConsumer<Message>. Tutaj trywialny przykład.

using Consumer.Infrastructure.Messages;
using MassTransit;

namespace Consumer.Infrastructure;

public class MessageConsumer : IConsumer<Message>
{
   private readonly ILogger _logger;

   public MessageConsumer(ILogger<MessageConsumer> logger)
   {
      _logger = logger;
   }

   public Task Consume(ConsumeContext<Message> context)
   {
      var m = context.Message;
      _logger.LogInformation("Message {guid}",m.Guid);
      return Task.CompletedTask;
   }
}

Mając klasę, która jest w stanie zareagować na przychodzącą wiadomość, pozostaje wskazać ją MassTransit-owi i powiązać z odpowiednią kolejką w RabbitMQ.

builder.Services.AddMassTransit(x =>
{
   x.AddConsumer<MessageConsumer>().Endpoint(e =>
   {
      e.Name = builder.Configuration
            .GetValue<string>("Rabbit:MessageQueueName","messages")!;
      e.ConfigureConsumeTopology = false;
   });
   x.UsingRabbitMq((context, cfg) =>
   {
      cfg.ConfigureEndpoints(context);
   });
});

Na koniec przykład, jak wygląda taka wiadomość z perspektywy RabbitMQ.