Building an Event Driven .NET Application: Setting Up MassTransit and RabbitMQ
Distributed Systems
- event driven architecture
- dotnet
- net5
- masstransit
- rabbitmq
- Published on
- Authors
- Name
- Paul DeVito
- @pdevito3
- Building an Event Driven .NET Application: The Fundamentals
- Building an Event Driven .NET Application: Setting Up MassTransit and RabbitMQ
- Building an Event Driven .NET Application: Integration Testing Your MassTransit Message Bus
Introduction
The purpose of this series is to go through and in-depth walkthrough of setting up an event driven architecture in a distributed .NET application.
This is the second post in the Building an Event Driven .NET Application series. In this post, we’ll see how we can set up our MassTransit bus to send message to and receive messages from RabbitMQ. In the last post, we covered the core concepts and now we get to look at some code.
⚠ Unless you're already very familiar with the EDA flow, I would strongly recommend going through the core concepts post. It won't take very long 🙂
Setting Up RabbitMQ 🐰
We're going to start out by setting up our message broker (this is our post office where we send all of our messages for distribution). In this example, we're going to use RabbitMQ.
Running RabbitMQ Locally Using Docker
While RabbitMQ has a great docker page, we're actually going to use the MassTransit Image instead to make our lives a bit easier (MassTransit is the message bus we're going to be using).
docker run -p 15672:15672 -p 5672:5672 masstransit/rabbitmq
Once you have the container up and running, you can go to localhost:15672
and log in with the user guest
and password guest
.
Using a Deployed Instance of RabbitMQ
There are obviously a myriad of options to use when deploying RabbitMQ, but I want to give a special shout out to CloudAMQP. They offer RMQ as a Service and focus entirely on doing this very well. They also have great docs and even have a free-tier for devs to work in and get a feel for things, so it's perfect for this use case.
Understanding the RabbitMQ User Interface
The MassTransit docker image we used for RabbitMQ leverages the management
plugin to give you a use interface (this what you hit if you went to localhost:15672
or your CloudAMQP deployment). It isn't the best UI you've ever seen, but it has some pretty good features for managing your RMQ setup.
CloudAMQP has a great write up on the different aspects of the UI and I'd definitely recommend checking it out.
Publishing and Subscribing to Messages Using MassTransit in .NET Web APIs
So now that we have our message broker set up, let's send it some messages! We could roll our own code for this, but there are several really good libraries out there (MassTransit, NServiceBus, Brighter, Rebus) that handle publishing and subscribing to messages for use already. No need to reinvent the wheel.
In this example, we're going to use MassTransit. It's a battle tested OSS library in the .NET space with a great community, helpful maintainer, and thorough documentation.
We're going to go over a few different use cases to cover what the different exchanges would look like.
Use Case 1: Setting Up a Direct Exchange
For this example, lets say that we have a reporting application and we want to send out a report over either email or fax. To do this, we'll send a message to either the email queue or the fax queue depending on which routing key is sent along with the message. Once the message reaches the queue, an email or fax service can get the message from the queue and send the report.
Project Setup
I'm using a simple web api generated from Craftsman. I'm using a build in the middle of v0.10.0 so there isn't a great way to build it on demand at the moment, but you can jump tot he example repo below.
You can build something similar using this template in 0.9.4+ and should have even more flexibility with 0.10.x+ to do a lot of this for you.
DomainName: WeSendReportsCompany
BoundedContexts:
- SolutionName: Reporting
Port: 1200
DbContext:
ContextName: ReportingDbContext
DatabaseName: ReportingDbContext
Provider: SqlServer
Entities:
- Name: ReportRequest
Properties:
- Name: Provider
Type: string
CanFilter: true
CanSort: true
- Name: Target
Type: string
CanFilter: true
CanSort: true
SwaggerConfig:
Title: Reporting Swagger Doc
Description: Swagger doc for reporting
SwaggerEndpointName: "v1"
AddSwaggerComments: true
If you want to look at the repo directly, you can see:
Summary
So if you wanted to visualize what we're going to build, you'd see something like the below. And it probably isn't quite what you'd expect at first.
Let's see what this looks like.
- A user submits a report in their application, which makes an API call to our .NET Core backend. Our backend will create a message marked with a routing key of
email
and an exchange type ofdirect
to be sent to our message broker. - Our message gets sent to the reporting exchange called
send-report
. This reporting exchange has adirect
binding to two other intermediary exchanges, one for email and one for fax (more on the intermediary exchange in a minute). - When the reporting exchange receives the message, it will check the binding keys between each of the intermediary exchanges to see where it should route the message. In this case, the message has a routing key of
email
and will be routed to the exchange that is bound with a matchingemail
key. - The intermediary exchange is set as a
fanout
with a single binding to a queue with the same name; in this case,send-email
. Once this exchange gets the message from the primary exchange, it will route that message to the email queue to be consumed by the email service.
Now you might be asking, what the heck is up with these intermediary exchanges? Up to this point, we've only ever discussed having one exchange that connects directly to a queue with some kind of binding. In this case, we've added the extra exchange to facilitate a pattern called wire tapping to allow us to more easily debug our process in production. This pattern is built into MassTransit so you don't have to do anything special, but you do need to be aware of how it's setting things up for you. For more info on the wire tap pattern and how it can be used, check out this video by Chris Patterson, the creator of MassTransit.
Creating the Message
This is the branch for message setup
First, lets add a class library to our solution (if you have a project built with Craftsman 0.10.x+, this project will get added with your bus automatically). I'd recommend creating a separate class library project to use as your source of truth for your messages. This is because MassTransit uses the full type name, including the namespace, for message contracts. When creating the same message type in two separate projects, the namespaces must match or the message will not be consumed. By adding the message to a single class library, it should make your life a bit easier, especially when working with distributed projects.
Okay, so what do these messages look like? Well, they're just simple plain old class objects (POCO), though it is strongly recommended to use interfaces for message contracts. In this example, I'm going to make an ISendReportRequest
object that will capture the ReportId
, the Provider
that we want to use to send the message (email or fax), and the Target
where we want to send the message. This would probably be a bit more involved (e.g. enum or something of that nature for Provider), but i'm going to leave it as a string for simplicity sake. Generally you want to keep your messages as small as you can, but the amount of messages going back and forth can actually affect performance more than the message size, so don't be afraid to put a bit more in your messages if it will reduce the amount of calls you are making.
public interface ISendReportRequest
{
Guid ReportId { get; set; }
string Provider { get; set; } // "email" or "fax"
string Target { get; set; }
}
That's it! Let's publish our message.
Registering MassTransit in Our Producer
This is the branch for producer setup
Let's start by adding MassTransit to the project we're going to publish from. In this case, I'm going to add it to a .NET 6 web api, but this process would be the same for any .NET Core app.
To start, let's go to the csproj
of our publishing project and add the following Nuget packages so we can use MassTransit:
<PackageReference Include="MassTransit" Version="7.1.8" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.1.8" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.1.8" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.1.8" />
Then, let's register MassTransit in our Startup.cs
services and use RabbitMQ:
using MassTransit;
using Messages;
using RabbitMQ.Client;
///
services.AddMassTransit(mt =>
{
mt.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.Message<ISendReportRequest>(e => e.SetEntityName("report-requests")); // name of the primary exchange
cfg.Publish<ISendReportRequest>(e => e.ExchangeType = ExchangeType.Direct); // primary exchange type
cfg.Send<ISendReportRequest>(e =>
{
e.UseRoutingKeyFormatter(context => context.Message.Provider.ToString()); // route by provider (email or fax)
});
});
});
services.AddMassTransitHostedService();
AddMassTransit
is the main service registration for MassTransitUsingRabbitMq
will let us configure our RabbitMQ connection- The
cfg.Host
portion is actually optional for local development if you are running RMQ locally using the MassTransit image, but you can add whatever server information is appropriate if you're using something like CloudAMQP or a non dev environment. Obviously if you are using this in production, please be mindful of how you are storing this information.
- The
Message<ISendReportRequest>(e => e.SetEntityName("report-requests"))
is telling MassTransit that we want to send messages of typeISendReportRequest
to an exchange calledreport-requests
.Publish<ISendReportRequest>(e => e.ExchangeType = ExchangeType.Direct)
is telling MassTransit that we want messages of typeISendReportRequest
to be published using adirect
exchange type.Send<ISendReportRequest>
is setting up the routing configuration we want to use when we sent a message of typeISendReportRequest
. In this case, we are usingUseRoutingKeyFormatter
to tell MassTransit that we want to use the provider as the routing key (e.g.email
orfax
)AddMassTransitHostedService
will add the service bus and some additional bells and whistles like health check integration that we won't worry about for now.
Publishing Our Message
Next, we can publish our message! The exact location that you put this depends on the way you've organized your project, but the principle is the same. In this example, I have a MediatR RequestHandler (technically an in-memory message bus dealing with commands instead of events), but you could put it in a repository or other method if that's how your project is set up. Regardless of where you're adding it, it's pretty straight forward.
As our starting point, lets say that I have a SubmitReportRequestCommand
command that adds an order request to the database:
public class Handler : IRequestHandler<SubmitReportRequestCommand, ReportRequestDto>
{
private readonly ReportingDbContext _db;
private readonly IMapper _mapper;
public Handler(ReportingDbContext db, IMapper mapper)
{
_mapper = mapper;
_db = db;
}
public async Task<ReportRequestDto> Handle(SubmitReportRequestCommand request, CancellationToken cancellationToken)
{
if (await _db.ReportRequests.AnyAsync(r => r.ReportId == request.RequestToAdd.ReportId))
{
throw new ConflictException("Reporting Request already exists with this primary key.");
}
var reportRequest = _mapper.Map<ReportRequest>(request.ReportRequestToAdd);
_db.ReportRequests.Add(reportRequest);
var saveSuccessful = await _db.SaveChangesAsync() > 0;
if (saveSuccessful)
{
return await _db.ReportRequests
.ProjectTo<ReportRequestDto>(_mapper.ConfigurationProvider)
.FirstOrDefaultAsync(r => r.ReportId == reportRequest.ReportId);
}
else
{
throw new Exception("Unable to save the new record. Please check the logs for more information.");
}
}
}
First, inject an IPublishEndpoint
to your command, repository, etc. This is what MassTransit will use to transport the message to the message broker.
private readonly ReportingDbContext _db;
private readonly IMapper _mapper;
private readonly IPublishEndpoint _publishEndpoint;
public Handler(ReportingDbContext db, IMapper mapper, IPublishEndpoint publishEndpoint)
{
_mapper = mapper;
_db = db;
_publishEndpoint = publishEndpoint;
}
Then we can create our message and publish it using the publishing endpoint. Here, I am just using an anonymous object, but you could use automapper or whatever makes sense for you here as well.
var message = new
{
ReportId = reportRequest.ReportRequestId,
Provider = reportRequest.Provider.ToString(),
Target = reportRequest.Target.ToString()
};
await _publishEndpoint.Publish<ISendReportRequest>(message);
So bringing that all together, you have something like this.
public class Handler : IRequestHandler<SubmitReportRequestCommand, ReportRequestDto>
{
private readonly ReportingDbContext _db;
private readonly IMapper _mapper;
private readonly IPublishEndpoint _publishEndpoint;
public Handler(ReportingDbContext db, IMapper mapper, IPublishEndpoint publishEndpoint)
{
_mapper = mapper;
_db = db;
_publishEndpoint = publishEndpoint;
}
public async Task<ReportRequestDto> Handle(SubmitReportRequestCommand request, CancellationToken cancellationToken)
{
if (await _db.ReportRequests.AnyAsync(r => r.ReportId == request.RequestToAdd.ReportId))
{
throw new ConflictException("Reporting Request already exists with this primary key.");
}
var reportRequest = _mapper.Map<ReportRequest>(request.ReportRequestToAdd);
_db.ReportRequests.Add(reportRequest);
var saveSuccessful = await _db.SaveChangesAsync() > 0;
if (saveSuccessful)
{
var message = new
{
ReportId = reportRequest.ReportRequestId,
Provider = reportRequest.Provider.ToString(),
Target = reportRequest.Target.ToString()
};
await _publishEndpoint.Publish<ISendReportRequest>(message);
return await _db.ReportRequests
.ProjectTo<ReportRequestDto>(_mapper.ConfigurationProvider)
.FirstOrDefaultAsync(r => r.ReportId == reportRequest.ReportId);
}
else
{
throw new Exception("Unable to save the new record. Please check the logs for more information.");
}
}
}
And that's it! If we were to run our API and call our endpoint that creates new order requests, we should see a new request in RabbitMQ!
Registering MassTransit in our Consumer
This is the branch for consumer setup
Now lets set up our consumer. This is going to be a worker service that doesn't actually implement any logic, but can consume the messages for us.
Just like before, let's start by going to the csproj
of our worker project and add the following Nuget packages so we can use MassTransit:
<PackageReference Include="MassTransit" Version="7.1.8" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.1.8" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="7.1.8" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.1.8" />
Then, let's register MassTransit in our Program.cs
services (no startup because we're using a worker, but could do that if you were in a web api and it would work the same) and use RabbitMQ:
using MassTransit;
using Messages;
using RabbitMQ.Client;
///
services.AddMassTransit(mt =>
{
mt.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("email-reports", re =>
{
// turns off default fanout settings
re.ConfigureConsumeTopology = false;
// a replicated queue to provide high availability and data safety. available in RMQ 3.8+
re.SetQuorumQueue();
// enables a lazy queue for more stable cluster with better predictive performance.
// Please note that you should disable lazy queues if you require really high performance, if the queues are always short, or if you have set a max-length policy.
re.SetQueueArgument("declare", "lazy");
re.Consumer<EmailConsumer>();
re.Bind("report-requests", e =>
{
e.RoutingKey = "email";
e.ExchangeType = ExchangeType.Direct;
});
});
cfg.ReceiveEndpoint("fax-reports", re =>
{
re.ConfigureConsumeTopology = false;
re.Consumer<FaxConsumer>();
re.Bind("report-requests", e =>
{
e.RoutingKey = "fax";
e.ExchangeType = ExchangeType.Direct;
});
});
});
});
services.AddMassTransitHostedService();
AddMassTransit
,UsingRabbitMq
,cfg.Host
, andAddMassTransitHostedService
all behave the same as in the previous service registrationcfg.ReceiveEndpoint("email-reports", re =>
is creating an intermediary fanout exchange namedemail-reports
as well as a queue of the same name that will be bound to it.re.ConfigureConsumeTopology = false;
turns off the default fanout settingsre.SetQuorumQueue();
sets the queue to be a quorum queue, a best practice made available starting in RMQ 3.8. For more details see this writeup by CloudAMQP.re.SetQueueArgument("declare", "lazy");
sets the queue to be lazy per CloudAMQP best practices. If you need really high performance, always have short queues, or have a max length policy you should remove this setting.re.Consumer<EmailConsumer>();
subscribes theEmailConsumer
to theemail-reports
queue.re.Bind("report-requests", e =>
binds thereport-requests
exchange to the intermediary exchange.e.RoutingKey = "email";
sets the binding between the primary and intermediary exchange to a particular routing keye.ExchangeType = ExchangeType.Direct;
sets the primary exchange type. In this case,direct
so messages have to have a routing key that matches the binding key to be routed through.
Consuming the message
Finally, let's make the actual consumers! These are just going to be classes that inherit an IConsumer
of whatever the message is, in our case IConsumer<ISendReportRequest>
.
So the email consumer might look like this:
public class EmailConsumer : IConsumer<ISendReportRequest>
{
public Task Consume(ConsumeContext<ISendReportRequest> context)
{
// do work to send email here
return Task.CompletedTask;
}
}
And the fax consumer might look like this:
public class FaxConsumer : IConsumer<ISendReportRequest>
{
public Task Consume(ConsumeContext<ISendReportRequest> context)
{
// do work to send fax here
return Task.CompletedTask;
}
}
Now if you ran the solution with multiple startup projects (web api and worker), you should be able to create a new request and messages with a provider of email
or fax
to hit a breakpoint on either of the consumers!
Use Case 2: Setting Up a Topic Exchange
Next, let's see how we can set up a topic
exchange. As a reminder, a topic
exchange type is similar to the direct
type, but lets us check for a partial match between the routing key and the binding key. You can use a *
(asterisk) as a wildcard for a single word and a #
(hash) can substitute for zero or more words.
Source Code
We'll be modifying our direct exchange project. Here is the branch with the topic exchange updates.
Summary
This example will be similar to the direct example above, only now, we will be routing our message based on whether it is a public or private message. Again, let's start by visualizing the process.
- A user submits a report in their application, which makes an API call to our .NET Core backend. Our backend will create a message marked with a routing key of
private.email
and an exchange type oftopic
to be sent to our message broker. - Our message gets sent to the reporting exchange called
send-report
. This reporting exchange now has atopic
binding to two other intermediary exchanges, one for private messages and one for public messages. This time, it doesn't matter what the provider is because we have a*
in our binding; only thepublic
orprivate
designation will affect the routing of these messages. - When the reporting exchange receives the message, it will check the binding keys between each of the intermediary exchanges to see where it should route the message. In this case, the message has a routing key of
private.email
and will be routed to the exchange that is bound with the partial match ofprivate.*
- The intermediary exchange is set as a
fanout
with a single binding to a queue with the same name; in this case,private-reports
. Once this exchange gets the message from the primary exchange, it will route that message to the email queue to be consumed by the email service.
Updating Our Message
First, I want to update our message (and associated DTO and entity) to include a new IsPublic
property. This will determine whether or not a request is for public or private consumption and, in turn, how we should route the message.
public interface ISendReportRequest
{
Guid ReportId { get; set; }
string Provider { get; set; } // "email" or "fax" or "cloud"
string Target { get; set; }
public bool IsPublic { get; set; }
}
Setting Up Our Producer
Then we're going to make some slight tweaks to our service registration.
services.AddMassTransit(mt =>
{
mt.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.Message<ISendReportRequest>(e => e.SetEntityName("send-report")); // name of the primary exchange
cfg.Publish<ISendReportRequest>(e => e.ExchangeType = ExchangeType.Topic); // primary exchange type
cfg.Send<ISendReportRequest>(e =>
{
e.UseRoutingKeyFormatter(context =>
{
var sharedState = context.Message.IsPublic ? "public" : "private";
return $"{sharedState}.{context.Message.Provider}";
});
});
});
});
services.AddMassTransitHostedService();
Let's break this down. The big change here is that we're creating a chained routing key.
- Just like before,
cfg.Message<ISendReportRequest>(e => e.SetEntityName("send-report"));
will create our primary exchange with a name ofsend-report
cfg.Publish<ISendReportRequest>(e => e.ExchangeType = ExchangeType.Topic);
is setting our exchange type just like before, only now we're using a type oftopic
e.UseRoutingKeyFormatter(context =>
is still here to set build our message's routing key, but now we're creating asharedState
variable to figure out if the message public or private, and then appending the provider (where we want to send the report) on the end like so$"{sharedState}.{context.Message.Provider}"
. The provider isn't coming in to play with the routing at all as our binding will have wildcard for the provider, but we can still pass the info along to be used however is appropriate.
Updating Our Consumer
Next, let's update our consumer to create modified endpoints for this new need. We're going to make one private endpoint (private-reports
) and a public endpoint (public-reports
).
services.AddMassTransit(mt =>
{
mt.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint("private-reports", re =>
{
// turns off default fanout settings
re.ConfigureConsumeTopology = false;
// a replicated queue to provide high availability and data safety. available in RMQ 3.8+
re.SetQuorumQueue();
// enables a lazy queue for more stable cluster with better predictive performance.
// Please note that you should disable lazy queues if you require really high performance, if the queues are always short, or if you have set a max-length policy.
re.SetQueueArgument("declare", "lazy");
re.Consumer<EmailConsumer>();
re.Consumer<FaxConsumer>();
re.Bind("send-report", e =>
{
e.RoutingKey = "private.*";
e.ExchangeType = ExchangeType.Topic;
});
});
cfg.ReceiveEndpoint("public-reports", re =>
{
// turns off default fanout settings
re.ConfigureConsumeTopology = false;
// a replicated queue to provide high availability and data safety. available in RMQ 3.8+
re.SetQuorumQueue();
// enables a lazy queue for more stable cluster with better predictive performance.
// Please note that you should disable lazy queues if you require really high performance, if the queues are always short, or if you have set a max-length policy.
re.SetQueueArgument("declare", "lazy");
re.Consumer<CloudConsumer>();
re.Bind("send-report", e =>
{
e.RoutingKey = "public.*";
e.ExchangeType = ExchangeType.Topic;
});
});
});
});
services.AddMassTransitHostedService();
Let's break it down:
- Like before,
cfg.ReceiveEndpoint("private-reports", re =>
is creating our intermediary exchange and queue calledprivate-reports
. re.ConfigureConsumeTopology = false;
turns off the default fanout settingsre.SetQuorumQueue();
sets the queue to be a quorum queue, a best practice made available starting in RMQ 3.8. For more details see this writeup by CloudAMQP.re.SetQueueArgument("declare", "lazy");
sets the queue to be lazy per CloudAMQP best practices. If you need really high performance, always have short queues, or have a max length policy. You should remove this setting.- Then we're using
re.Consumer<EmailConsumer>();
andre.Consumer<FaxConsumer>();
to bind both the email and the fax consumers to this endpoint. So private messages will be sent to both email and fax. This doesn't really make sense in this case as we're only sending one target and provider so the other couldn't be used, but you get the point 🙂 - Same as before
re.Bind("send-report", e =>
will create a binding from our intermediary exchange to our primary exchange with a binding ofprivate.*
, so any incoming message that starts withprivate
will be sent here. - And again, we set the primary exchange type to topic with
e.ExchangeType = ExchangeType.Topic;
The cloud consumer that was added here is just to show another consumer hitting the private queue, but it looks the same as the other two:
public class CloudConsumer : IConsumer<ISendReportRequest>
{
public Task Consume(ConsumeContext<ISendReportRequest> context)
{
// do work to send cloud here
return Task.CompletedTask;
}
}
RabbitMQ Best Practices
If you'd like to read on some of the best practices to follow around RabbitMQ, check out this post by CloudAMQP.
Using Craftsman to Add MassTransit Into Your Projects in Minutes
In the upcoming v0.10.x release of Craftsman, you will have several commands available to you like add:bus
, add:message
, register:consumer
, and register:producer
to scaffold out your
events in minutes. This eventing functionality will continue to be improved in future releases to include scaffolded tests, logging, open telemetry tracing, and more.
Coming in the Next Post
In the next post of the series, we’ll look at how we can implement testing for our events.
Regardless, I hope this was helpful! I'd love to hear your thoughts and insights on Twitter @pdevito3.