{"id":82709,"date":"2018-12-31T16:13:21","date_gmt":"2018-12-31T16:13:21","guid":{"rendered":"https:\/\/www.red-gate.com\/simple-talk\/?p=82709"},"modified":"2021-07-29T19:44:09","modified_gmt":"2021-07-29T19:44:09","slug":"getting-started-with-cqrs-part-2","status":"publish","type":"post","link":"https:\/\/www.red-gate.com\/simple-talk\/development\/dotnet-development\/getting-started-with-cqrs-part-2\/","title":{"rendered":"Getting Started with CQRS \u2013 Part 2"},"content":{"rendered":"<p><strong>The series so far:<\/strong><\/p>\n<ol>\n<li><a href=\"https:\/\/www.red-gate.com\/simple-talk\/dotnet\/c-programming\/getting-started-with-cqrs-part-1\/\">Getting Started with CQRS \u2013 Part 1<\/a><\/li>\n<li><a href=\"https:\/\/www.red-gate.com\/simple-talk\/dotnet\/c-programming\/getting-started-with-cqrs-part-2\/\">Getting Started with CQRS \u2013 Part 2<\/a><\/li>\n<li><a href=\"https:\/\/www.red-gate.com\/simple-talk\/dotnet\/c-programming\/getting-started-with-cqrs-part-3\/\">Getting Started with CQRS \u2013 Part 3<\/a><\/li>\n<\/ol>\n\n<p>The first article of the series dove into the main concepts of CQRS along with the project configuration and setup. You learned how to create a basic CRUD-like application with ASP.NET and integrate it with a simple data source structure that relies on the Entity Framework and SQLite.<\/p>\n<p>It\u2019s important to notice, until here, that no CQRS concept was applied yet, since the whole implementation is about CRUD operations. This is intentional, for you to better understand how this pattern can apply in existing REST applications as well as web and even desktop applications that want to take advantage of it.<\/p>\n<p>For the second part of the tutorial, you\u2019ll finish the CQRS configurations by creating the MongoDB code, which is going to be responsible for the <em>query<\/em> side of the pattern. 
Also, you are going to create the commands that handle the creation and update of the data through normal relational database operations; finish the customer controller, which, in turn, will use the commands and queries to serve each endpoint; and, finally, set up the event-driven architecture with RabbitMQ for publishing and subscribing to all the events the application has to deal with. The source code is available at the end of the article.<\/p>\n<h2>Creating the Query Side<\/h2>\n<p>Start by creating the MongoDB configuration needed for the Query side of the CQRS model. Open the <em>CustomerAPI<\/em> application you created in the first article in Visual Studio and open your <em>Startup.cs<\/em> class. (You can find the code for this article <a href=\"https:\/\/drive.google.com\/open?id=1wLJy8hnoNMQ78_wUa0e4PsU2Ip9vhxvF\">here<\/a>.) Add another transient repository registration, right below the SQLite one:<\/p>\n<pre class=\"lang:c# theme:vs2012\">services.AddTransient&lt;CustomerMongoRepository&gt;();<\/pre>\n<p>Don\u2019t forget the proper using statement at the beginning of the file:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Models.Mongo;<\/pre>\n<p>Add a new folder to the project: <em>\\Models\\Mongo<\/em>. Create a new class, <code>CustomerMongoRepository<\/code>, in the new folder. This new class will provide the same CRUD operations you\u2019ve created in the SQLite repository, this time using the Mongo driver library imported previously. 
Replace the generated code with the following:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using System.Collections.Generic;\r\nusing MongoDB.Driver;\r\nnamespace CustomerApi.Models.Mongo\r\n{\r\n    public class CustomerMongoRepository\r\n\t{\r\n\t\tprivate const string _customerDB = \"CustomerDB\";\r\n\t\tprivate const string _customerCollection = \"Customers\";\r\n\t\tprivate IMongoDatabase _db;\r\n\t\tpublic CustomerMongoRepository()\r\n\t\t{\r\n\t\t\tMongoClient _client = new MongoClient(\"mongodb:\/\/localhost:27017\");\r\n\t\t\t_db = _client.GetDatabase(_customerDB);\r\n\t\t}\r\n\t\tpublic List&lt;CustomerEntity&gt; GetCustomers()\r\n\t\t{\r\n\t\t\treturn _db.GetCollection&lt;CustomerEntity&gt;(_customerCollection).Find(_ =&gt; true).ToList();\r\n\t\t}\r\n\t\tpublic CustomerEntity GetCustomer(long id)\r\n\t\t{\r\n\t\t\treturn _db.GetCollection&lt;CustomerEntity&gt;(_customerCollection).Find(customer =&gt; customer.Id == id).SingleOrDefault();\r\n\t\t}\r\n\t\tpublic CustomerEntity GetCustomerByEmail(string email)\r\n\t\t{\r\n\t\t\t\/\/ SingleOrDefault returns null when no customer matches, so callers can check for null\r\n\t\t\treturn _db.GetCollection&lt;CustomerEntity&gt;(_customerCollection).Find(customer =&gt; customer.Email == email).SingleOrDefault();\r\n\t\t}\r\n\t\tpublic void Create(CustomerEntity customer)\r\n\t\t{\r\n\t\t\t_db.GetCollection&lt;CustomerEntity&gt;(_customerCollection).InsertOne(customer);\r\n\t\t}\r\n\t\tpublic void Update(CustomerEntity customer)\r\n\t\t{\r\n\t\t\tvar filter = Builders&lt;CustomerEntity&gt;.Filter.Where(_ =&gt; _.Id == customer.Id);\r\n\t\t\t_db.GetCollection&lt;CustomerEntity&gt;(_customerCollection).ReplaceOne(filter, customer);\r\n\t\t}\r\n\t\tpublic void Remove(long id)\r\n\t\t{\r\n\t\t\tvar filter = Builders&lt;CustomerEntity&gt;.Filter.Where(_ =&gt; _.Id == id);\r\n\t\t\t_db.GetCollection&lt;CustomerEntity&gt;(_customerCollection).DeleteOne(filter);\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p>Notice that, this time, you have more operations for retrieving\/searching elements from the database than in the SQLite repository. 
This is because the Query side of the model needs some flexibility, such as searching by id or by email, to accommodate the different types of searches a client may perform.<\/p>\n<p>The other operations (creating, updating and removing items) will help with tests and with checking the contents of the database.<\/p>\n<p>Also, pay attention to the names of the database and the collection (<em>_customerDB<\/em> and <em>_customerCollection<\/em>, respectively). They must be exactly the same as the ones you created in MongoDB Compass in the previous article. Thanks to the lambda expressions C# provides, you can pass concise inline functions as arguments to most of the MongoDB driver methods.<\/p>\n<p>Next, it\u2019s time to create the Mongo entities, which expose to the clients the same values that are saved in the SQLite entities. Inside the <em>\\Mongo<\/em> folder, create a new class called <em>CustomerEntity.cs<\/em> and add the following code:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using MongoDB.Bson;\r\nusing MongoDB.Bson.Serialization.Attributes;\r\nusing System.Collections.Generic;\r\nnamespace CustomerApi.Models.Mongo\r\n{\r\n\tpublic class CustomerEntity\r\n    {\r\n\t\t[BsonElement(\"Id\")]\r\n\t\tpublic long Id { get; set; }\r\n\t\t[BsonElement(\"Email\")]\r\n\t\tpublic string Email { get; set; }\r\n\t\t[BsonElement(\"Name\")]\r\n\t\tpublic string Name { get; set; }\r\n\t\t[BsonElement(\"Age\")]\r\n\t\tpublic int Age { get; set; }\r\n\t\t[BsonElement(\"Phones\")]\r\n\t\tpublic List&lt;PhoneEntity&gt; Phones { get; set; }\r\n\t}\r\n}<\/pre>\n<p>The class is basically composed of the same properties found in the corresponding SQLite code. The attribute <code>BsonElement<\/code> is provided by the MongoDB library and specifies the element name of each field of the entity. 
This is important, because this is how Mongo will identify each field when saving the document-based values to the database through a key-value system. Add another class to the folder, <code>PhoneEntity<\/code>.<\/p>\n<p>Here is the code for <em>PhoneEntity.cs<\/em>:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using MongoDB.Bson.Serialization.Attributes;\r\nnamespace CustomerApi.Models.Mongo\r\n{\r\n\tpublic partial class PhoneEntity\r\n    {\r\n\t\t[BsonElement(\"Type\")]\r\n\t\tpublic PhoneType Type { get; set; }\r\n\t\t[BsonElement(\"AreaCode\")]\r\n\t\tpublic int AreaCode { get; set; }\r\n\t\t[BsonElement(\"Number\")]\r\n\t\tpublic int Number { get; set; }\r\n\t}\r\n}<\/pre>\n<p>Both SQLite and MongoDB use the same <em>PhoneType.cs<\/em> class from the previous article. Make sure that it is in the <em>\\Models<\/em> folder.<\/p>\n<h2>Setting up the Event Handler<\/h2>\n<p>In order for RabbitMQ to work in an ASP.NET project, besides having a proper RabbitMQ client dependency configured on the project, it\u2019s also necessary to register the publisher and the subscriber services in the <em>Startup.cs<\/em> class (<code>ConfigureServices()<\/code> method):<\/p>\n<pre class=\"lang:c# theme:vs2012\">services.AddTransient&lt;AMQPEventPublisher&gt;();\r\nservices.AddSingleton&lt;CustomerMessageListener&gt;();<\/pre>\n<p>Don\u2019t forget to add the proper using statement at the beginning of the file. Both classes live in the <em>CustomerApi.Events<\/em> namespace:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Events;<\/pre>\n<p>Start with the publisher by creating a new <em>\\Events<\/em> folder in the project root. 
Then, create a class called <em>AMQPEventPublisher.cs<\/em> and add the following code:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using Microsoft.AspNetCore.Hosting;\r\nusing Microsoft.Extensions.Configuration;\r\nusing Newtonsoft.Json;\r\nusing RabbitMQ.Client;\r\nusing System.Text;\r\nnamespace CustomerApi.Events\r\n{\r\n\tpublic class AMQPEventPublisher\r\n    {\r\n\t\tprivate readonly ConnectionFactory connectionFactory;\r\n\t\tpublic AMQPEventPublisher(IHostingEnvironment env)\r\n\t\t{\r\n\t\t\tconnectionFactory = new ConnectionFactory();\r\n\t\t\tvar builder = new ConfigurationBuilder()\r\n\t\t\t\t.SetBasePath(env.ContentRootPath)\r\n\t\t\t\t.AddJsonFile(\"appsettings.json\", optional: false, reloadOnChange: false)\r\n\t\t\t\t.AddEnvironmentVariables();\r\n\t\t\t\r\n\t\t\tbuilder.Build().GetSection(\"amqp\").Bind(connectionFactory);\r\n\t\t}\r\n\t\tpublic void PublishEvent&lt;T&gt;(T @event) where T : IEvent\r\n\t\t{\r\n\t\t\tusing (IConnection conn = connectionFactory.CreateConnection())\r\n\t\t\t{\r\n\t\t\t\tusing (IModel channel = conn.CreateModel())\r\n\t\t\t\t{\r\n\t\t\t\t\tvar queue = @event is CustomerCreatedEvent ? \r\n\t\t\t\t\t\tConstants.QUEUE_CUSTOMER_CREATED : @event is CustomerUpdatedEvent ? \r\n\t\t\t\t\t\t\tConstants.QUEUE_CUSTOMER_UPDATED : Constants.QUEUE_CUSTOMER_DELETED;\r\n\t\t\t\t\tchannel.QueueDeclare(\r\n\t\t\t\t\t\tqueue: queue,\r\n\t\t\t\t\t\tdurable: false,\r\n\t\t\t\t\t\texclusive: false,\r\n\t\t\t\t\t\tautoDelete: false,\r\n\t\t\t\t\t\targuments: null\r\n\t\t\t\t\t);\r\n\t\t\t\t\tvar body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event));\r\n\t\t\t\t\tchannel.BasicPublish(\r\n\t\t\t\t\t\texchange: \"\",\r\n\t\t\t\t\t\troutingKey: queue,\r\n\t\t\t\t\t\tbasicProperties: null,\r\n\t\t\t\t\t\tbody: body\r\n\t\t\t\t\t);\r\n\t\t\t\t}\r\n\t\t\t}\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p>A few important points to notice here include the <code>ConnectionFactory<\/code> object. 
It is part of the RabbitMQ client library and is responsible for creating and managing the connections and the channels (models) used to publish the events. The <code>ConfigurationBuilder<\/code> helps with importing the proper configurations from the <em>appsettings.json<\/em> file (see its code below). And, finally, the <code>PublishEvent()<\/code> method was built upon a generic implementation; in other words, the generic <em>T<\/em> lets this method accept any type of event to be published, which also makes it reusable for other, more abstract constructions you may want.<\/p>\n<p>Once the connection is opened and the model is created, you can declare the queue and its settings, and perform a basic publish through the <code>BasicPublish<\/code> method of the model. Notice that three types of events will be published: when a customer is created, updated or deleted. Don\u2019t worry about the compilation errors for now; you\u2019ll soon create those classes as well.<\/p>\n<p>Add the <code>Constants<\/code> class to the <em>\\Events<\/em> folder:<\/p>\n<pre class=\"lang:c# theme:vs2012\">namespace CustomerApi.Events\r\n{\r\n\tpublic class Constants\r\n    {\r\n\t\tpublic const string QUEUE_CUSTOMER_CREATED = \"customer_created\";\r\n\t\tpublic const string QUEUE_CUSTOMER_UPDATED = \"customer_updated\";\r\n\t\tpublic const string QUEUE_CUSTOMER_DELETED = \"customer_deleted\";\r\n\t}\r\n}<\/pre>\n<p>The generic <em>T<\/em> you\u2019ve seen before is also constrained to an interface called <code>IEvent<\/code>. Its only purpose is to identify what is an event in the project. Add the <code>IEvent<\/code> interface. The following code represents it (yes, just a marker interface):<\/p>\n<pre class=\"lang:c# theme:vs2012\">namespace CustomerApi.Events\r\n{\r\n\tpublic interface IEvent\r\n    {\r\n    }\r\n}<\/pre>\n<p>Now, create the events themselves starting with the <code>CustomerCreatedEvent<\/code>. 
The structure is very similar to the entities you created before, mainly because no big changes will be made to avoid adding complexity:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Models.Mongo;\r\nusing System.Collections.Generic;\r\nusing System.Linq;\r\nnamespace CustomerApi.Events\r\n{\r\n\tpublic class CustomerCreatedEvent : IEvent\r\n\t{\r\n\t\tpublic long Id { get; set; }\r\n\t\tpublic string Email { get; set; }\r\n\t\tpublic string Name { get; set; }\r\n\t\tpublic int Age { get; set; }\r\n\t\tpublic List&lt;PhoneCreatedEvent&gt; Phones { get; set; }\r\n\t\tpublic CustomerEntity ToCustomerEntity()\r\n\t\t{\r\n\t\t\treturn new CustomerEntity\r\n\t\t\t{\r\n\t\t\t\tId = this.Id,\r\n\t\t\t\tEmail = this.Email,\r\n\t\t\t\tName = this.Name,\r\n\t\t\t\tAge = this.Age,\r\n\t\t\t\tPhones = this.Phones.Select(phone =&gt; new PhoneEntity {\r\n\t\t\t\t\tType = phone.Type,\r\n\t\t\t\t\tAreaCode = phone.AreaCode,\r\n\t\t\t\t\tNumber = phone.Number\r\n\t\t\t\t}).ToList()\r\n\t\t\t};\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p><em>NOTE: Architecturally, it\u2019s common to give event class names of a past nature, meaning that the event has already happened and you\u2019re dealing with something in the past.<\/em><\/p>\n<p>Notice that a method to convert each event to an entity will be useful since they mostly must be persisted after being received and processed (<em>Linq<\/em> will be used as a good option to iterate and perform operations over the lists of your event models).<\/p>\n<p>The phone created event class (it can be just a single value object if you desire):<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Models;\r\nnamespace CustomerApi.Events\r\n{\r\n\tpublic class PhoneCreatedEvent : IEvent\r\n\t{\r\n\t\tpublic PhoneType Type { get; set; }\r\n\t\tpublic int AreaCode { get; set; }\r\n\t\tpublic int Number { get; set; }\r\n\t}\r\n}<\/pre>\n<p>Next, the updated event class:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using 
CustomerApi.Models.Mongo;\r\nusing System.Collections.Generic;\r\nusing System.Linq;\r\nnamespace CustomerApi.Events\r\n{\r\n\tpublic class CustomerUpdatedEvent : IEvent\r\n\t{\r\n\t\tpublic long Id { get; set; }\r\n\t\tpublic string Name { get; set; }\r\n\t\tpublic int Age { get; set; }\r\n\t\tpublic List&lt;PhoneCreatedEvent&gt; Phones { get; set; }\r\n\t\tpublic CustomerEntity ToCustomerEntity(CustomerEntity entity)\r\n\t\t{\r\n\t\t\treturn new CustomerEntity\r\n\t\t\t{\r\n\t\t\t\tId = this.Id,\r\n\t\t\t\tEmail = entity.Email,\r\n\t\t\t\tName = entity.Name.Equals(this.Name) ? entity.Name : this.Name,\r\n\t\t\t\tAge = entity.Age.Equals(this.Age) ? entity.Age : this.Age,\r\n\t\t\t\tPhones = GetNewOnes(entity.Phones).Select(phone =&gt; new PhoneEntity { AreaCode = phone.AreaCode, Number = phone.Number }).ToList()\r\n\t\t\t};\r\n\t\t}\r\n\t\tprivate List&lt;PhoneEntity&gt; GetNewOnes(List&lt;PhoneEntity&gt; Phones)\r\n\t\t{\r\n\t\t\treturn Phones.Where(a =&gt; !this.Phones.Any(x =&gt; x.Type == a.Type\r\n\t\t\t\t&amp;&amp; x.AreaCode == a.AreaCode\r\n\t\t\t\t&amp;&amp; x.Number == a.Number)).ToList&lt;PhoneEntity&gt;();\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p>This class has a structure similar to that of the previous one, except for a bolder approach to updating the phones. You\u2019ll only update phones that have, in fact, changed, whether in the area code or the number. So, for the update in the database, they\u2019ll be overwritten every time. 
Plus, don\u2019t forget to always check whether each attribute inside the event has really changed compared to the received entity object.<\/p>\n<p>The deleted event class:<\/p>\n<pre class=\"lang:c# theme:vs2012\">namespace CustomerApi.Events\r\n{\r\n\tpublic class CustomerDeletedEvent : IEvent\r\n\t{\r\n\t\tpublic long Id { get; set; }\r\n\t}\r\n}<\/pre>\n<p>The only thing that\u2019s needed to delete an object from the database is the customer id.<\/p>\n<p>Be aware that, in real-world applications, updating or deleting data in any situation goes against the event-storing patterns. Events must be preserved in their original order as much as possible, to reproduce exactly what happened. Here, the implementation initially follows the old-fashioned design with which most applications used to be built.<\/p>\n<p>And finally, create the consumer class, <code>CustomerMessageListener<\/code>, responsible for listening to the events and processing them:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Models.Mongo;\r\nusing Microsoft.Extensions.Configuration;\r\nusing Newtonsoft.Json;\r\nusing RabbitMQ.Client;\r\nusing RabbitMQ.Client.Events;\r\nusing RabbitMQ.Client.MessagePatterns;\r\nusing System;\r\nusing System.Text;\r\nusing System.Threading;\r\nnamespace CustomerApi.Events\r\n{\r\n\tpublic class CustomerMessageListener\r\n\t{\r\n\t\tprivate readonly CustomerMongoRepository _repository;\r\n\t\tpublic CustomerMessageListener(CustomerMongoRepository repository)\r\n\t\t{\r\n\t\t\t_repository = repository;\r\n\t\t}\r\n\t\tpublic void Start(string contentRootPath)\r\n\t\t{\r\n\t\t\tConnectionFactory connectionFactory = new ConnectionFactory();\r\n\t\t\tvar builder = new ConfigurationBuilder()\r\n\t\t\t\t.SetBasePath(contentRootPath)\r\n\t\t\t\t.AddJsonFile(\"appsettings.json\", optional: false, reloadOnChange: 
false)\r\n\t\t\t\t.AddEnvironmentVariables();\r\n\t\t\tbuilder.Build().GetSection(\"amqp\").Bind(connectionFactory);\r\n\t\t\tconnectionFactory.AutomaticRecoveryEnabled = true;\r\n\t\t\tconnectionFactory.NetworkRecoveryInterval = TimeSpan.FromSeconds(15);\r\n\t\t\tusing (IConnection conn = connectionFactory.CreateConnection())\r\n\t\t\t{\r\n\t\t\t\tusing (IModel channel = conn.CreateModel())\r\n\t\t\t\t{\r\n\t\t\t\t\tDeclareQueues(channel);\r\n\t\t\t\t\tvar subscriptionCreated = new Subscription(channel, Constants.QUEUE_CUSTOMER_CREATED, false);\r\n\t\t\t\t\tvar subscriptionUpdated = new Subscription(channel, Constants.QUEUE_CUSTOMER_UPDATED, false);\r\n\t\t\t\t\tvar subscriptionDeleted = new Subscription(channel, Constants.QUEUE_CUSTOMER_DELETED, false);\r\n\t\t\t\t\twhile (true)\r\n\t\t\t\t\t{\r\n\t\t\t\t\t\t\/\/ Sleeps for 5 sec before trying again\r\n\t\t\t\t\t\tThread.Sleep(5000);\r\n\t\t\t\t\t\tnew Thread(() =&gt;\r\n\t\t\t\t\t\t{\r\n\t\t\t\t\t\t\tListenCreated(subscriptionCreated);\r\n\t\t\t\t\t\t}).Start();\r\n\t\t\t\t\t\tnew Thread(() =&gt;\r\n\t\t\t\t\t\t{\r\n\t\t\t\t\t\t\tListenUpdated(subscriptionUpdated);\r\n\t\t\t\t\t\t}).Start();\r\n\t\t\t\t\t\tnew Thread(() =&gt;\r\n\t\t\t\t\t\t{\r\n\t\t\t\t\t\t\tListenDeleted(subscriptionDeleted);\r\n\t\t\t\t\t\t}).Start();\r\n\t\t\t\t\t}\r\n\t\t\t\t}\r\n\t\t\t}\r\n\t\t}\r\n\t\tprivate void ListenDeleted(Subscription subscriptionDeleted)\r\n\t\t{\r\n\t\t\tBasicDeliverEventArgs eventArgsDeleted = subscriptionDeleted.Next();\r\n\t\t\tif (eventArgsDeleted != null)\r\n\t\t\t{\r\n\t\t\t\tstring messageContent = Encoding.UTF8.GetString(eventArgsDeleted.Body);\r\n\t\t\t\tCustomerDeletedEvent _deleted = JsonConvert.DeserializeObject&lt;CustomerDeletedEvent&gt;(messageContent);\r\n\t\t\t\t_repository.Remove(_deleted.Id);\r\n\t\t\t\tsubscriptionDeleted.Ack(eventArgsDeleted);\r\n\t\t\t}\r\n\t\t}\r\n\t\tprivate void ListenUpdated(Subscription subscriptionUpdated)\r\n\t\t{\r\n\t\t\tBasicDeliverEventArgs 
eventArgsUpdated = subscriptionUpdated.Next();\r\n\t\t\tif (eventArgsUpdated != null)\r\n\t\t\t{\r\n\t\t\t\tstring messageContent = Encoding.UTF8.GetString(eventArgsUpdated.Body);\r\n\t\t\t\tCustomerUpdatedEvent _updated = JsonConvert.DeserializeObject&lt;CustomerUpdatedEvent&gt;(messageContent);\r\n\t\t\t\t_repository.Update(_updated.ToCustomerEntity(_repository.GetCustomer(_updated.Id)));\r\n\t\t\t\tsubscriptionUpdated.Ack(eventArgsUpdated);\r\n\t\t\t}\r\n\t\t}\r\n\t\tprivate void ListenCreated(Subscription subscriptionCreated)\r\n\t\t{\r\n\t\t\tBasicDeliverEventArgs eventArgsCreated = subscriptionCreated.Next();\r\n\t\t\tif (eventArgsCreated != null)\r\n\t\t\t{\r\n\t\t\t\tstring messageContent = Encoding.UTF8.GetString(eventArgsCreated.Body);\r\n\t\t\t\tCustomerCreatedEvent _created = JsonConvert.DeserializeObject&lt;CustomerCreatedEvent&gt;(messageContent);\r\n\t\t\t\t_repository.Create(_created.ToCustomerEntity());\r\n\t\t\t\tsubscriptionCreated.Ack(eventArgsCreated);\r\n\t\t\t}\r\n\t\t}\r\n\t\tprivate static void DeclareQueues(IModel channel)\r\n\t\t{\r\n\t\t\tchannel.QueueDeclare(\r\n\t\t\t\tqueue: Constants.QUEUE_CUSTOMER_CREATED,\r\n\t\t\t\tdurable: false,\r\n\t\t\t\texclusive: false,\r\n\t\t\t\tautoDelete: false,\r\n\t\t\t\targuments: null\r\n\t\t\t);\r\n\t\t\tchannel.QueueDeclare(\r\n\t\t\t\tqueue: Constants.QUEUE_CUSTOMER_UPDATED,\r\n\t\t\t\tdurable: false,\r\n\t\t\t\texclusive: false,\r\n\t\t\t\tautoDelete: false,\r\n\t\t\t\targuments: null\r\n\t\t\t);\r\n\t\t\tchannel.QueueDeclare(\r\n\t\t\t\tqueue: Constants.QUEUE_CUSTOMER_DELETED,\r\n\t\t\t\tdurable: false,\r\n\t\t\t\texclusive: false,\r\n\t\t\t\tautoDelete: false,\r\n\t\t\t\targuments: null\r\n\t\t\t);\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p>Please keep in mind that, for most real CQRS applications, the event subscriber objects\/components are usually built outside of the publisher application for resiliency purposes. 
Here, they\u2019ll be together just for simplification.<\/p>\n<p>The class starts with the same RabbitMQ connection factory configs, connections and models. However, two new configs are used:<\/p>\n<ul>\n<li><code>AutomaticRecoveryEnabled<\/code>: its value is already true by default, but you\u2019ll set it here to make the auto recovery feature explicit. It instructs the RabbitMQ client to retry the current connection once it is lost or is dealing with connectivity issues;<\/li>\n<li><code>NetworkRecoveryInterval<\/code>: defines how long the client will wait before retrying the connection when facing network issues.<\/li>\n<\/ul>\n<p>There are several approaches, including external frameworks, to consume events from a queue in RabbitMQ. The focus here is on simplicity, so the queues are simply declared (if they don\u2019t exist yet in the RabbitMQ server, they\u2019ll be created automatically), along with the subscription objects.<\/p>\n<p>Subscription objects control a model channel over a specific queue, as well as the acknowledgement mode used for the delivery. In the example, each queue has its own subscription object with the <code>autoAck<\/code> mode set to <em>false<\/em>, so you\u2019re the one who must manually acknowledge each message once the event has been consumed.<\/p>\n<p>Then, an infinite loop runs (with a 5-second delay from one iteration to the next), opening three different threads, one for each event type that can be received over the subscription objects. The loop is needed because new events can arrive at any moment, so you have to guarantee they will be consumed. 
And the threads are just there to make sure that no event delays or alters the state of the others.<\/p>\n<p>In the end, the only action to be performed will be the respective operation in the repository object.<\/p>\n<h2>Setting up the Commands<\/h2>\n<p>The <em>Command<\/em> pattern is well known to most OO developers, mainly because it helps to build structures that process, separately, objects that act in similar ways. In this case, start with the abstract class that\u2019ll represent it. Create a new folder, <em>\\Commands<\/em>, and add the <code>Command<\/code> class.<\/p>\n<pre class=\"lang:c# theme:vs2012\">namespace CustomerApi.Commands\r\n{\r\n    public abstract class Command\r\n\t{\r\n\t\tpublic long Id { get; set; }\r\n\t}\r\n}<\/pre>\n<p>The <code>Id<\/code> will be useful since it is used everywhere to identify a customer. Next comes the command for creating a customer, the <code>CreateCustomerCommand<\/code> class, which carries the data:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Events;\r\nusing CustomerApi.Models.SQLite;\r\nusing System.Collections.Generic;\r\nusing System.Linq;\r\nnamespace CustomerApi.Commands\r\n{\r\n\tpublic class CreateCustomerCommand : Command\r\n\t{\r\n\t\tpublic string Name { get; set; }\r\n\t\tpublic string Email { get; set; }\r\n\t\tpublic int Age { get; set; }\r\n\t\tpublic List&lt;CreatePhoneCommand&gt; Phones { get; set; }\r\n\t\tpublic CustomerCreatedEvent ToCustomerEvent(long id)\r\n\t\t{\r\n\t\t\treturn new CustomerCreatedEvent\r\n\t\t\t{\r\n\t\t\t\tId = id,\r\n\t\t\t\tName = this.Name,\r\n\t\t\t\tEmail = this.Email,\r\n\t\t\t\tAge = this.Age,\r\n\t\t\t\tPhones = this.Phones.Select(phone =&gt; new PhoneCreatedEvent { Type = phone.Type, AreaCode = phone.AreaCode, Number = phone.Number }).ToList()\r\n\t\t\t};\r\n\t\t}\r\n\t\tpublic CustomerRecord ToCustomerRecord()\r\n\t\t{\r\n\t\t\treturn new CustomerRecord\r\n\t\t\t{\r\n\t\t\t\tName = this.Name,\r\n\t\t\t\tEmail = this.Email,\r\n\t\t\t\tAge = this.Age,\r\n\t\t\t\tPhones = 
this.Phones.Select(phone =&gt; new PhoneRecord { Type = phone.Type, AreaCode = phone.AreaCode, Number = phone.Number }).ToList()\r\n\t\t\t};\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p>The command objects are very similar to the event objects regarding the attributes but carry auxiliary methods to construct SQLite records and event objects, translating from one type of model schema to another.<\/p>\n<p>Notice that, here, you\u2019re using a second command for phone data, <code>CreatePhoneCommand<\/code>. Create the class and add the following code:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Models;\r\nnamespace CustomerApi.Commands\r\n{\r\n\tpublic class CreatePhoneCommand : Command\r\n\t{\r\n\t\tpublic PhoneType Type { get; set; }\r\n\t\tpublic int AreaCode { get; set; }\r\n\t\tpublic int Number { get; set; }\r\n\t}\r\n}<\/pre>\n<p>See the code for the update and delete commands, as well:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Events;\r\nusing CustomerApi.Models.SQLite;\r\nusing System.Collections.Generic;\r\nusing System.Linq;\r\nnamespace CustomerApi.Commands\r\n{\r\n\tpublic class UpdateCustomerCommand : Command\r\n\t{\r\n\t\tpublic string Name { get; set; }\r\n\t\tpublic int Age { get; set; }\r\n\t\tpublic List&lt;CreatePhoneCommand&gt; Phones { get; set; }\r\n\t\tpublic CustomerUpdatedEvent ToCustomerEvent()\r\n\t\t{\r\n\t\t\treturn new CustomerUpdatedEvent\r\n\t\t\t{\r\n\t\t\t\tId = this.Id,\r\n\t\t\t\tName = this.Name,\r\n\t\t\t\tAge = this.Age,\r\n\t\t\t\tPhones = this.Phones.Select(phone =&gt; new PhoneCreatedEvent {\r\n\t\t\t\t\tType = phone.Type,\r\n\t\t\t\t\tAreaCode = phone.AreaCode,\r\n\t\t\t\t\tNumber = phone.Number\r\n\t\t\t\t}).ToList()\r\n\t\t\t};\r\n\t\t}\r\n\t\tpublic CustomerRecord ToCustomerRecord(CustomerRecord record)\r\n\t\t{\r\n\t\t\trecord.Name = this.Name;\r\n\t\t\trecord.Age = this.Age;\r\n\t\t\trecord.Phones = this.Phones.Select(phone =&gt; new PhoneRecord\r\n\t\t\t\t{\r\n\t\t\t\t\tType = phone.Type,\r\n\t\t\t\t\tAreaCode = 
phone.AreaCode,\r\n\t\t\t\t\tNumber = phone.Number\r\n\t\t\t\t}).ToList()\r\n\t\t\t\t;\r\n\t\t\treturn record;\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p>For the deleted one, remember that only the id is important to be passed on:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Events;\r\nnamespace CustomerApi.Commands\r\n{\r\n\tpublic class DeleteCustomerCommand : Command\r\n\t{\r\n\t\tinternal CustomerDeletedEvent ToCustomerEvent()\r\n\t\t{\r\n\t\t\treturn new CustomerDeletedEvent\r\n\t\t\t{\r\n\t\t\t\tId = this.Id\r\n\t\t\t};\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p>All the command objects need a handler object, that will take care of receiving and distributing the operations through the different service\/repository layers. Start with the command handler interface code:<\/p>\n<pre class=\"lang:c# theme:vs2012\">namespace CustomerApi.Commands\r\n{\r\n\tpublic interface ICommandHandler&lt;T&gt; where T : Command\r\n\t{\r\n\t\tvoid Execute(T command);\r\n\t}\r\n}<\/pre>\n<p>The method <code>Execute()<\/code> receives the specific command represented by its interface declaration and calls the proper operations for each one. 
See the following code for the implementation:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Events;\r\nusing CustomerApi.Models.SQLite;\r\nusing System;\r\nnamespace CustomerApi.Commands\r\n{\r\n\tpublic class CustomerCommandHandler : ICommandHandler&lt;Command&gt;\r\n\t{\r\n\t\tprivate CustomerSQLiteRepository _repository;\r\n\t\tprivate AMQPEventPublisher _eventPublisher;\r\n\t\tpublic CustomerCommandHandler(AMQPEventPublisher eventPublisher, CustomerSQLiteRepository repository)\r\n\t\t{\r\n\t\t\t_eventPublisher = eventPublisher;\r\n\t\t\t_repository = repository;\r\n\t\t}\r\n\t\tpublic void Execute(Command command)\r\n\t\t{\r\n\t\t\tif (command == null)\r\n\t\t\t{\r\n\t\t\t\t\/\/ The single-string constructor expects the parameter name, not a message\r\n\t\t\t\tthrow new ArgumentNullException(nameof(command));\r\n\t\t\t}\r\n\t\t\tif (command is CreateCustomerCommand createCommand)\r\n\t\t\t{\r\n\t\t\t\tCustomerRecord created = _repository.Create(createCommand.ToCustomerRecord());\r\n\t\t\t\t_eventPublisher.PublishEvent(createCommand.ToCustomerEvent(created.Id));\r\n\t\t\t}\r\n\t\t\telse if (command is UpdateCustomerCommand updateCommand)\r\n\t\t\t{\r\n\t\t\t\tCustomerRecord record = _repository.GetById(updateCommand.Id);\r\n\t\t\t\t_repository.Update(updateCommand.ToCustomerRecord(record));\r\n\t\t\t\t_eventPublisher.PublishEvent(updateCommand.ToCustomerEvent());\r\n\t\t\t}\r\n\t\t\telse if (command is DeleteCustomerCommand deleteCommand)\r\n\t\t\t{\r\n\t\t\t\t_repository.Remove(deleteCommand.Id);\r\n\t\t\t\t_eventPublisher.PublishEvent(deleteCommand.ToCustomerEvent());\r\n\t\t\t}\r\n\t\t}\r\n\t}\r\n}<\/pre>\n<p>Here, you can already see, roughly, the command running the code that deals with the relational database operations, and how it is segregated from the event publishing, which always happens at the end of each command\u2019s processing.<\/p>\n<p>Finally, you need to add the handler as a scoped service. 
So, open the <em>Startup.cs<\/em> file again and add the following <em>using<\/em> directive to the beginning of the file and the <code>AddScoped()<\/code> call to the <code>ConfigureServices()<\/code> method:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Commands;\r\n\/\/ \u2026\r\nservices.AddScoped&lt;ICommandHandler&lt;Command&gt;, CustomerCommandHandler&gt;();<\/pre>\n<h2>Testing the New Controller<\/h2>\n<p>To make sure the new structures are fully working together, change the current customer controller to support the new commands, along with the proper REST results. This is the final <code>CustomersController<\/code> class:<\/p>\n<pre class=\"lang:c# theme:vs2012\">using CustomerApi.Commands;\r\nusing CustomerApi.Models;\r\nusing CustomerApi.Models.Mongo;\r\nusing CustomerApi.Models.SQLite;\r\nusing Microsoft.AspNetCore.Mvc;\r\nusing System.Collections.Generic;\r\nnamespace CustomerApi.Controllers\r\n{\r\n    [Route(\"api\/[controller]\")]\r\n    public class CustomersController : Controller\r\n    {\r\n        private readonly ICommandHandler&lt;Command&gt; _commandHandler;\r\n        private readonly CustomerMongoRepository _mongoRepository;\r\n        private readonly CustomerSQLiteRepository _sqliteRepository;\r\n        public CustomersController(ICommandHandler&lt;Command&gt; commandHandler,\r\n            CustomerSQLiteRepository sqliteRepository,\r\n            CustomerMongoRepository repository)\r\n        {\r\n            _commandHandler = commandHandler;\r\n            _sqliteRepository = sqliteRepository;\r\n            _mongoRepository = repository;\r\n            if (_mongoRepository.GetCustomers().Count == 0)\r\n            {\r\n                var customerCmd = new CreateCustomerCommand\r\n                {\r\n                    Name = \"George Michaels\",\r\n                    Email = \"george@email.com\",\r\n                    Age = 23,\r\n                    Phones = new List&lt;CreatePhoneCommand&gt;\r\n                    
{\r\n                        new CreatePhoneCommand { Type = PhoneType.CELLPHONE, AreaCode = 123, Number = 7543010 }\r\n                    }\r\n                };\r\n                _commandHandler.Execute(customerCmd);\r\n            }\r\n        }\r\n        [HttpGet]\r\n        public List&lt;CustomerEntity&gt; Get()\r\n        {\r\n            return _mongoRepository.GetCustomers();\r\n        }\r\n        \/\/ The :long constraint lets the router tell this route apart from the {email} route below.\r\n        [HttpGet(\"{id:long}\", Name = \"GetCustomer\")]\r\n        public IActionResult GetById(long id)\r\n        {\r\n            var customer = _mongoRepository.GetCustomer(id);\r\n            if (customer == null)\r\n            {\r\n                return NotFound();\r\n            }\r\n            return new ObjectResult(customer);\r\n        }\r\n        [HttpGet(\"{email}\")]\r\n        public IActionResult GetByEmail(string email)\r\n        {\r\n            var customer = _mongoRepository.GetCustomerByEmail(email);\r\n            if (customer == null)\r\n            {\r\n                return NotFound();\r\n            }\r\n            return new ObjectResult(customer);\r\n        }\r\n        [HttpPost]\r\n        public IActionResult Post([FromBody] CreateCustomerCommand customer)\r\n        {\r\n            _commandHandler.Execute(customer);\r\n            return CreatedAtRoute(\"GetCustomer\", new { id = customer.Id }, customer);\r\n        }\r\n        [HttpPut(\"{id}\")]\r\n        public IActionResult Put(long id, [FromBody] UpdateCustomerCommand customer)\r\n        {\r\n            var record = _sqliteRepository.GetById(id);\r\n            if (record == null)\r\n            {\r\n                return NotFound();\r\n            }\r\n            customer.Id = id;\r\n            _commandHandler.Execute(customer);\r\n            return NoContent();\r\n        }\r\n        [HttpDelete(\"{id}\")]\r\n        public IActionResult Delete(long id)\r\n        {\r\n            var record = _sqliteRepository.GetById(id);\r\n            if (record == null)\r\n         
   {\r\n                return NotFound();\r\n            }\r\n            _commandHandler.Execute(new DeleteCustomerCommand()\r\n            {\r\n                Id = id\r\n            });\r\n            return NoContent();\r\n        }\r\n    }\r\n}<\/pre>\n<p>Two things changed: the command objects are now built directly from the request bodies by the framework\u2019s automatic model binding, and the constructor adds a sample customer whenever the MongoDB collection is empty, so that you\u2019ll have some data in the database as soon as the application starts.<\/p>\n<p>Before you run the application, make sure the MongoDB and RabbitMQ servers are up and running. Once Visual Studio finishes starting the application, you should see the data stored in the SQLite database (Figure 1).<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"553\" height=\"132\" class=\"wp-image-82710\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2018\/12\/word-image-223.png\" \/><\/p>\n<p class=\"caption\">Figure 1. SQLite data after the app has started up.<\/p>\n<p>Then, after the data is persisted to the SQLite database, the <code>customer_created<\/code> event is published. RabbitMQ receives the event in its queue (Figure 2) and delivers it to the consumer that acts as the message listener.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone size-full wp-image-85109\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2018\/12\/Figure02.png\" alt=\"\" width=\"1097\" height=\"656\" \/><\/p>\n<p class=\"caption\">Figure 2. 
RabbitMQ event consumption.<\/p>\n<p>Finally, when the listener receives the event, it persists the data to MongoDB so that the query side holds a copy as well, all in an asynchronous way.<\/p>\n<p><img loading=\"lazy\" decoding=\"async\" width=\"1322\" height=\"194\" class=\"wp-image-82711\" src=\"https:\/\/www.red-gate.com\/simple-talk\/wp-content\/uploads\/2018\/12\/word-image-224.png\" \/><\/p>\n<p class=\"caption\">Figure 3. MongoDB data after the event is consumed.<\/p>\n<h2>Conclusion<\/h2>\n<p>It is now time to create more tests. Try each of your REST API operations with Postman (such as the POST to create a new customer), and check that each customer is stored in the SQLite database, then published to RabbitMQ and, subsequently, saved to the MongoDB database asynchronously.<\/p>\n<p>Some other parts of the architecture could be improved as well. For example, the application itself (or one of the nodes hosting it in a clustered environment) could crash at the very moment the event is about to be published, leaving the two databases out of sync. A couple of options are available for this, such as a broader transactional design that treats the processing as a whole and considers it complete only when every important step is done. Or you could pick a different messaging tool that guarantees, with more consistency, the delivery of the events (like Apache Kafka, for example, a very popular player at the moment). Just be sure to play with different scenarios, considering that they can happen in the real world.<\/p>\n<p>The next part of this series will explore a totally new architecture built on the paradigm of Event Sourcing and Processing to map the events the application receives, reduce them when necessary, and make the application more scalable.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this second article of the series, Diogo Souza walks you through creating the MongoDB code, the query side of the pattern, to complete the CQRS configuration. 
&hellip;<\/p>\n","protected":false},"author":320401,"featured_media":0,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"_acf_changed":false,"footnotes":""},"categories":[143538,53],"tags":[],"coauthors":[60461],"class_list":["post-82709","post","type-post","status-publish","format-standard","hentry","category-dotnet-development","category-featured"],"acf":[],"_links":{"self":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/82709","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/users\/320401"}],"replies":[{"embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/comments?post=82709"}],"version-history":[{"count":4,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/82709\/revisions"}],"predecessor-version":[{"id":85110,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/posts\/82709\/revisions\/85110"}],"wp:attachment":[{"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/media?parent=82709"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/categories?post=82709"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/tags?post=82709"},{"taxonomy":"author","embeddable":true,"href":"https:\/\/www.red-gate.com\/simple-talk\/wp-json\/wp\/v2\/coauthors?post=82709"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}