Skip to content

Connecting Microservices with Integration Events and the Azure Service Bus

An integration event is a type of communication between microservices that is dependent on the occurrence of events.

When a significant event happens, such as a change to vital data within a microservice data store, a microservice publishes an event to the world. Instead of publishing this event directly to another microservice, which would result in a reliance between our microservice and the other microservice, we publish the event to a service bus queue. Any receiving microservice that subscribes to the event from the queue would then be notified of the occurrence of the event.

An example of how integration events are commonly used is to synchronize data between two data sources that are located in two distinct microservices. The following diagram depicts the architecture of this scenario:

The steps taken to update data in the initial data store and to replicate into the destination data store are:

  1. Client application updates book data.
  2. Book Catalog API method UpdateBook() is called.
  3. Book Catalog API updates the source database table.
  4. Book Catalog API method UpdateBook() publishes change event to Service Bus Queue.
  5. Azure Function is triggered by the Service Bus.
  6. Azure Function calls Loan API UpdateBook() method.
  7. Loan API UpdateBook() method updates the destination database table.

Below is the definition of an integration change events for new and changed records. These will be used to notify any event receivers of a change in price of a product:

				
					namespace BookLoan.Catalog.API.Events
{
    [Serializable]
    public class BookInventoryChangedIntegrationEvent: IntegrationEvent
    {
        public int BookId { get; private set; }
        public string NewEdition { get; private set; }
        public string OldEdition { get; private set; }
 
        public BookInventoryChangedIntegrationEvent(int bookId, string newEdition,
            string oldEdition)
        {
            BookId = bookId;
            NewEdition = newEdition;
            OldEdition = oldEdition;
        }
    }
 
    [Serializable]
    public class BookInventoryNewIntegrationEvent: IntegrationEvent
    {
        public int BookId { get; private set; }
        public BookViewModel newBook { get; private set; }
 
        public BookInventoryNewIntegrationEvent(int bookId, BookViewModel newBookModel)
        {
            BookId = bookId;
            newBook = newBookModel;
        }
    }
}

//The base integration event type is defined as shown: 

namespace BookLoan.Catalog.API.Events
{
    [Serializable]
    public class IntegrationEvent
    {
        public IntegrationEvent()
        {
            Id = Guid.NewGuid();
            CreationDate = DateTime.UtcNow;
        }
 
        [JsonConstructor]
        public IntegrationEvent(Guid id, DateTime createDate)
        {
            Id = id;
            CreationDate = createDate;
        }
 
        [JsonProperty]
        public Guid Id { get; private set; }
 
        [JsonProperty]
        public DateTime CreationDate { get; private set; }
    }
}
				
			

Before we progress to look at the implementation, we will review our approach.

Relating microservice data consistency and the CAP theorem to our approach

The CAP theorem says that you cannot build a distributed database or a microservice that owns its model that is continually available, strongly consistent, and tolerant to any partition. You must choose two of these three properties.

With microservice architectures, availability and tolerance are the recommended properties with eventual consistency. What this means is that we can use a queue-based Publisher-Subscriber pattern with integration events as one method to get our distributed data stores back to a consistent level.

Another approach is the Event Sourcing pattern which requires its own data source to store domain events and apply these transactions to target datastores to achieve consistency.

Publishing integration events to the Azure Service Bus from a microservice API

When an event is to be propagated through to other microservices, the source microservice will publish an integration event (such as the one described above) to a service bus queue.

An example of setting up and publishing an integration event is when we update a book. If there are any changes these are published to the service bus queue as shown below:

				
					public class BookService: IBookService
 {
        ApplicationDbContext _db;
        private readonly ILogger _logger;
 
        private IEventBus _eventBus;
 
        public BookService(ApplicationDbContext db, 
            ILogger<BookService> logger,
            IEventBus eventBus)
        {
            _db = db;
            _logger = logger;
            _eventBus = eventBus;
        }
 
    ..
    ..
        public async Task<BookViewModel> UpdateBook(int Id, BookViewModel vm)
        {
            BookViewModel book = await _db.Books.Where(a => a.ID == Id).SingleOrDefaultAsync();
            if (book != null)
            {
                string originalEdition = book.Edition;
                book.Title = vm.Title;
                book.Author = vm.Author;
                book.Edition = vm.Edition;
                book.Genre = vm.Genre;
                book.ISBN = vm.ISBN;
                book.Location = vm.Location;
                book.YearPublished = vm.YearPublished;
                book.DateUpdated = DateTime.Now;
                _db.Update(book);
                await _db.SaveChangesAsync();
 
                // Detect a changed field.
                if (vm.Edition != originalEdition)
                {
                    BookInventoryChangedIntegrationEvent bookInventoryChangedIntegrationEvent
                        = new BookInventoryChangedIntegrationEvent(Id, book.Edition, originalEdition);                  
                    await _eventBus.Publish(bookInventoryChangedIntegrationEvent);
                    await Task.Delay(2000);
                }
            }
            return book;
        }
    ..
}
				
			

A basic event bus implementation is shown below:

				
					namespace BookLoan.Catalog.API.Events
{
    public class AzureEventBus : IEventBus
    {
        private MessageBusQueueHelper messageBusHelper;
        public Func<Message, CancellationToken, Task> ReceiveMessageHandler;
 
        public AzureEventBus() 
        {
            messageBusHelper = new MessageBusQueueHelper();
        }
 
        public async Task Publish(IntegrationEvent @event)
        {
            await messageBusHelper.InitSendMessages(@event);
        }
     ..
    }
}
				
			
				
					//In our service bus helper, we can send out service bus messages using JSON follows:

public async Task SendMessagesAsync(object newmessage)
{
  try
  {
    if (_messageBusFormat == MessageBusFormat.Binary)
    {
      // Create a new message to send to the queue
      byte[] messagebytes = SerializationHelpers.SerializeToByteArray(newmessage);
      var message = new Message(messagebytes);
      await _queueClient.SendAsync(message);
      IsMessageSent = true;
    }
    else
    if (_messageBusFormat == MessageBusFormat.JSON)
    {
      var message = new Message(
        Encoding.UTF8.GetBytes(
          JsonConvert.SerializeObject(newmessage)));
      message.ContentType = "text/plain";
      await _queueClient.SendAsync(message);
      IsMessageSent = true;
    }
  }
  catch (Exception exception)
  {
  Console.WriteLine($"{DateTime.Now} :: Exception{exception.Message}");
  }
}
				
			

Where newmessage is our instantiated object.

Using Azure Functions to synchronize data across microservices

The most obvious solution to synchronizing data across microservices is to subscribe to the event bus directly from the receiving API. However, this solution would not achieve the desired level of decoupling as the receiving API would need to be running as a background service and not as a transient API service. When the service is transient, there is no guarantee the data will be synchronized with the desired level of frequency.

An alternative to running the service bus receiver within a windows service or background task is to utilize an Azure serverless function to obtain the integration event from the service bus queue and process the data.

An example of an Azure Service Bus trigger function that will process our integration event from our source API is shown below:

				
					using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.InteropExtensions;
using Newtonsoft.Json;
using System.Text;
 
using BookLoan.Catalog.API.Events;
using BookLoanMicroservices.Helpers;
 
namespace BookLoanServiceBusQueueTriggerFunctionApp
{
    public static class ServiceBusQueueTriggerFunction
    {
        [FunctionName("ServiceBusQueueTriggerFunction")]
        public static void Run([ServiceBusTrigger("xxxxxxxqueue",
            Connection = "AzureWebJobsServiceBusQueue")]
Message myQueueItem, ILogger log)
        {
            // Deserialize the body of the message..
            var body = Encoding.UTF8.GetString(myQueueItem.Body);
            BookInventoryChangedIntegrationEvent bookInventoryChangedIntegrationEvent = JsonConvert.DeserializeObject<BookInventoryChangedIntegrationEvent>(body);
            log.LogInformation($"C# ServiceBus queue trigger function processed message: { bookInventoryChangedIntegrationEvent.BookId}");
            log.LogInformation($"C# ServiceBus queue trigger function processed message old edition: {bookInventoryChangedIntegrationEvent.OldEdition}");
            log.LogInformation($"C# ServiceBus queue trigger function processed message new edition: {bookInventoryChangedIntegrationEvent.NewEdition}");
        }
    }
}
				
			

Testing a trigger function for Azure Service Bus can be run as an emulated function host console within Visual Studio as shown:

First set a break point within the Azure trigger method in Visual Studio.

Now using POSTMAN or a client application we submit a record to our API for update.

After a successful update we will see the updated data in the request pane:

With our message being successfully posted into the queue, the break point within the azure trigger method will be hit and the message parameter can be inspected as shown:

And the Body property of the message will show the JSON content of our posted record:

The message, if it shows as JSON text, can then be deserialized to the destination object we had submitted from the client application or API:

The trigger function console log will show the method log messages:

				
					[8/04/2020 5:15:49 AM] Executing 'ServiceBusQueueTriggerFunction' (Reason='New ServiceBus message detected on 'xxxxxxxxqueue'.', Id=582e98b4-3d49-4fed-9e97-630f6b76d6d9)
[8/04/2020 5:15:49 AM] Trigger Details: MessageId: 0d0e112ccca649b39dc8e10c9bcfeb38, DeliveryCount: 2, EnqueuedTime: 8/04/2020 5:11:38 AM, LockedUntil: 8/04/2020 5:12:38 AM
[8/04/2020 5:15:50 AM] C# ServiceBus queue trigger function processed message: 2
[8/04/2020 5:15:53 AM] C# ServiceBus queue trigger function processed message old edition: 5
[8/04/2020 5:16:05 AM] C# ServiceBus queue trigger function processed message: 2
[8/04/2020 5:16:06 AM] C# ServiceBus queue trigger function processed message new edition: 6
[8/04/2020 5:16:06 AM] C# ServiceBus queue trigger function processed message old edition: 5
[8/04/2020 5:17:15 AM] C# ServiceBus queue trigger function processed message new edition: 6
[8/04/2020 5:17:15 AM] Executed 'ServiceBusQueueTriggerFunction' (Succeeded, Id=582e98b4-3d49-4fed-9e97-630f6b76d6d9)
				
			

Additionally, the procedure may be expanded to do additional data updates in order to synchronize our data and to perform other housekeeping or business processes. The most fundamental example of service bus messaging send and receive uses a simple string, as seen in this example.

As seen here, we can use the Azure service bus queue to push more complicated types across the network. This technique would be relevant in a more realistic business microservice application context.

That concludes today’s blog article.

I really hope you found this post to be useful and educational.

Leave a Reply

Your email address will not be published.