Publishing to Multiple Topics in Azure Service Bus

Multiple topics

There are two main approaches when it comes to topics in Azure Service Bus. Depending what's your architecture and needs you can have either:

  • Filtered topic - this means you have only one topic for all of your event types. All publishers push messages onto that single topic. Some of the message fields and properties are then used for filtering by the subscribers since in most cases a subscriber will be interested only in certain types of messages.
  • Multiple topics - you have multiple topics for different domain events or channels of your system. You can still leverage the filtering to have more granular control in your subscribers. You can also have topics based on priority, event types etc.

Our main concern in this post is the second use case and how to build a nice ITopicClient factory inside a .NET Core project. I've built my example with Azure Functions, but 99.9% of the code is completely identical to what you would do in a normal .NET Core web service.

Basic Setup

If you haven't used ITopicClient and Azure Service Bus I recommend you read the following two articles before continuing:

Now that we know what libraries we'll be using we can start building our solution. The first step is to create our TopicClient which is easy enough

var topicClient = new TopicClient("AzureServiceBusConnectionString", "TopicName");

The two things we need are the ASB connection string and the topic to which we would like to publish messages.

Let's get the boilerplate code that is going to be used onward out of the way first. You'll need to have something like that in your appsettings.json file

{
    "ServiceBus": {
        "Endpoint": "AsbFooBarEndpoint",
        "FooTopic": "foo",
        "BarTopic": "bar"
    }
}

And a simple POCO that you can map it to

public class ServiceBusOptions
{
    public string Endpoint { get; set; }

    public string FooTopic { get; set; }

    public string BarTopic { get; set; }
}

Then you register your options in the startup

private void BindConfigurationOptions(IServiceCollection services)
{
    services.Configure<ServiceBusOptions>(options =>
        Configuration.GetSection("ServiceBus")
            .Bind(options));
}

The naïve approach

If we were doing the first example (filtered topic) we are pretty much done. You can register your ITopicClient as a singleton and inject it in your DI container.

// This is the Azure Functions host builder
// In a web service you'll have the IServiceCollection
public static void Configure(IFunctionsHostBuilder builder)
{
    builder.Services.AddSingleton<ITopicClient>(x =>
    {
        var options = x.GetRequiredService<IOptions<ServiceBusOptions>>();
        return new new TopicClient(options.Value.Endpoint, options.Value.FooTopic);
    });
}

Finally you just use the injected client to send messages to the Foo topic

// Code below uses a MediatR handler as an example
public class FooHandler : IRequestHandler<Foo, bool>
{
    private readonly ITopicClient topicClient;

    public FooHandler(ITopicClient topicClient)
    {
        this.topicClient = topicClient ?? throw new ArgumentNullException(nameof(topicClient));
    }

    public async Task<bool> Handle(Foo request, CancellationToken cancellationToken)
    {
        if (request == null) throw new ArgumentNullException(nameof(request));

        try
        {
            await topicClient.SendAsync(request.Message);
            return true;
        }
        catch (Exception ex)
        {
            return false;
        }
    }
}

And that's that. We're done.

What happens however if we have to accommodate multiple topics? Do we inject multiple topics in the service/handler? Depending on your DI container are you even able to put multiple implementations of the same interface? Do we use a bunch of if statements and introduce cyclomatic complexity for deciding which topic client we want? These are all viable questions and solutions, but one elegant answer is hidden in the next section.

Introducing the ITopicClient resolver

In order to solve the problem at hand I want to minimize the complexity of dealing with multiple topics. I want the usage to be as simple as when using only one just like in the previous example. So how do we do that?

The first part of the problem is how to differentiate between the topic clients for each topic. We can do that by creating small wrapper classes around each client we have.

public class FooClient
{
    public readonly ITopicClient TopicClient;

    public FooClient(IOptions<ServiceBusOptions> options)
    {
        TopicClient = new TopicClient(options.Value.Endpoint, options.Value.FooTopic);
    }
}

public class BarClient
{
    public readonly ITopicClient TopicClient;

    public BarClient(IOptions<ServiceBusOptions> options)
    {
        TopicClient = new TopicClient(options.Value.Endpoint, options.Value.BarTopic);
    }
}

Now that we have our clients we are going to create a factory with the help of a delegate. Based on a key we'll return the exact topic client we want when we want it!

using Microsoft.Azure.ServiceBus;

namespace FooBarApp.Clients
{
    // The key can be a string, Type, enum, int - whatever you need really
    public delegate ITopicClient TopicResolver(string key);
}

The step where the magic happens is in our DI registration.

// This is the Azure Functions host builder
// In a web service you'll have the IServiceCollection
public static void Configure(IFunctionsHostBuilder builder)
{
    builder.Services.AddSingleton<FooClient>();
    builder.Services.AddSingleton<BarClient>();
    builder.Services.AddTransient<TopicResolver>(topicProvider => key =>
    {
        // The key can be a string, Type, enum, int - whatever you need really
        switch (key)
        {
            case "Foo":
                return topicProvider.GetService<FooClient>().TopicClient;
            case "Bar":
                return topicProvider.GetService<BarClient>().TopicClient;
            default:
                throw new KeyNotFoundException();
        }
    });
}

Let's me explain what's happening here. First we're registering our named clients as singletons per the explanations and reasoning given in the documentation.

Next we register our delegate as a transient service that will return one of the topic clients we already registered based on a key. For my usage I use a string key stored inside the message user properties. Your key can be anything like int, enum or a specific Type.

Last thing we have to do is to put our TopicResolver to work!

// Code below uses a MediatR handler as an example
public class MessageHandler : IRequestHandler<Message, bool>
{
    private readonly TopicResolver topicResolver;

    public MessageHandler(TopicResolver topicResolver)
    {
        this.topicResolver = topicResolver ?? throw new ArgumentNullException(nameof(topicResolver));
    }

    public async Task<bool> Handle(Message msg, CancellationToken cancellationToken)
    {
        if (msg == null) throw new ArgumentNullException(nameof(msg));

        try
        {
             // Use a field from the user properties of the message as key
             // The client we get can be either FooClient or BarClient based on the value
            var client = topicResolver(msg.UserProperties["Key"]);
            await client.SendAsync(msg);
            return true;
        }
        catch (Exception ex)
        {
            return false;
        }
    }
}

As a bonus here's how to test if the delegate is doing it's job and verifying that the right client is called.

using System;
using System.Threading.Tasks;
using AutoFixture;
using MediatR;
using Microsoft.Azure.ServiceBus;
using Moq;
using Shouldly;
using Xunit;

namespace FooBarApp.Tests
{
    public class MessageHandlerTests
    {
        private readonly Mock<TopicResolver> topicResolver;
        private readonly Fixture fixture;
        private IRequestHandler<Message, bool> sut;

        public MessageHandlerTests()
        {
            topicResolver = new Mock<TopicResolver>();
            fixture = new Fixture();
        }

        [Fact]
        public async Task Handle_ValidMessageRequest_Foo_SendsMessage()
        {
            var message = fixture.Create<Message>();
            // Best will be to create a customizer for different Messages
            message.UserProperties.Add("Key", "Foo");
            var client = new Mock<ITopicClient>();

            topicResolver.Setup(x => x.Invoke(It.IsAny<string>()))
                .Returns(client.Object);

            sut = new MessageHandler(topicResolver.Object);

            await sut.Handle(message, default);

            topicResolver.Verify(x => x("Foo"), Times.Once);
            client.Verify((x => x.SendAsync(It.IsAny<Message>())), Times.Once);
        }

        [Fact]
        public async Task Handle_ValidMessageRequest_Bar_SendsMessage()
        {
            var message = fixture.Create<Message>();
            // Best will be to create a customizer for different Messages
            message.UserProperties.Add("Key", "Bar");
            var client = new Mock<ITopicClient>();

            topicResolver.Setup(x => x.Invoke(It.IsAny<string>()))
                .Returns(client.Object);

            sut = new MessageHandler(topicResolver.Object);

            await sut.Handle(message, default);

            topicResolver.Verify(x => x("Bar"), Times.Once);
            client.Verify((x => x.SendAsync(It.IsAny<Message>())), Times.Once);
        }
    }
}

And that's it! With this setup now you can add multiple topic clients with ease! Hope you find this post useful. There are other ways to accomplish the same thing and solve this problem, but I like this approach because it's extremely clean, elegant and testable!

comments powered by Disqus