Using AppFabric topics & subscriptions to implement cloud-scale publish-subscribe

May 19, 2011 at 8:43 PM

Earlier this week, the first CTP of the Service Bus enhancements, the May CTP, was released.  This CTP was announced at TechEd and on the AppFabric Team Blog.  One week prior to that, Codit was accepted into the Technology Adoption Program (TAP) of the Windows Azure AppFabric team.  In this program, we will work closely with the product team to build our cloud integration offering.

Having been BizTalk experts since our inception, we are highly interested in this release of the AppFabric Service Bus, since it provides real ‘hardcore’ messaging capabilities, just like BizTalk does.  These capabilities make asynchronous, loosely coupled scenarios possible on a cloud-scale service bus.

This post dives into one of the most interesting features for flexible routing and decoupling of messages: topics & subscriptions.




AppFabric Service Bus queues can be compared with the Azure storage queues or with the message buffers of AppFabric V1, but they offer much richer functionality: sessions, dead letter queues (DLQ), deferring of messages, duplicate checking, transactions, and more.

Some enhancements over the Azure storage queues:

  • Larger message sizes (currently 256 KB, versus 8 KB for storage queues)
  • Unlimited TTL (time to live; storage queues allow a maximum of 7 days)
  • Correlation, request/reply
  • Grouping of messages

Topics & subscriptions

Topics & subscriptions allow you to implement real publish/subscribe patterns on the Windows Azure AppFabric platform.  A sender/publisher submits messages to a topic, which means the publisher does not need to know anything about potential subscribers/destinations.  Multiple subscriptions can be linked to a topic.
A subscription registers for the messages that match its filter.  A filter is defined as a SQL92 or LINQ expression and is matched against the properties of a message (BrokeredMessage).  If the filter matches, the message is marked for delivery on that subscription.
A new concept is the use of filter actions.  These can be configured on a subscription to update the properties of a message.
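
To make the relationship between these concepts concrete, here is a minimal in-memory sketch.  It is written in Python as a language-neutral model and does not use the Service Bus SDK: the Topic, Subscription and Message classes, the subscription names and the sample properties are all made up for illustration, filters are modelled as plain predicates rather than SQL92 expressions, and giving every subscription its own copy of the message is a modelling assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional

Properties = Dict[str, Any]


@dataclass
class Message:
    body: Any
    properties: Properties = field(default_factory=dict)


@dataclass
class Subscription:
    name: str
    matches: Callable[[Properties], bool]                  # the filter
    action: Optional[Callable[[Properties], None]] = None  # optional filter action
    delivered: List[Message] = field(default_factory=list)


class Topic:
    def __init__(self) -> None:
        self.subscriptions: List[Subscription] = []

    def add_subscription(self, subscription: Subscription) -> None:
        self.subscriptions.append(subscription)

    def publish(self, message: Message) -> None:
        # The publisher only talks to the topic; it never sees the subscribers.
        for sub in self.subscriptions:
            if sub.matches(message.properties):
                # Assumption of this model: each subscription gets its own copy,
                # so an action on one subscription does not leak into another.
                copy = Message(message.body, dict(message.properties))
                if sub.action is not None:
                    sub.action(copy.properties)
                sub.delivered.append(copy)


topic = Topic()
eu = Subscription("EUSubscription", lambda p: p.get("Region") == "EU")
audit = Subscription(
    "AuditSubscription",
    lambda p: True,                           # matches every message
    action=lambda p: p.update(Audited=True),  # filter action: add a property
)
topic.add_subscription(eu)
topic.add_subscription(audit)

topic.publish(Message("steel", {"Region": "EU"}))
topic.publish(Message("copper", {"Region": "US"}))
```

In this model the EU subscription ends up holding only the first message, while the audit subscription holds both, each with the extra property added by its action.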

The following diagram shows an overview of topics and subscriptions.


Someone familiar with BizTalk will immediately get this picture.  I would make the following analogy with BizTalk:

  • AppFabric Topic = BizTalk Receive Agent (Receive Port)
  • AppFabric Subscription = BizTalk Subscription
  • AppFabric Filter Action = A very lightweight BizTalk pipeline (only influencing message properties)
  • Message properties = Message Context

Implementing a pub/sub sample

In this blog post, I am implementing a scenario where a client application (a WPF app, the publisher block in the above diagram) sends messages to a topic.  These messages are objects of a custom class: Material.  The client application adds some properties to these messages, and the subscription filters are matched against those properties.  The client can send multiple messages over a long period, which allows us to test creating subscriptions on the fly.

At the other end of the Service Bus, I will have multiple receivers.  Each receiver either listens on an existing subscription, or creates a subscription on the fly and receives from it.  I can start up different receivers and messages will be delivered to them, as long as the message properties match the filter of the subscription.

The code


Registering the Service Bus namespace

The blog post by David Ingham of the AppFabric team shows how you can register on the AppFabric labs portal and how you can download the SDK.  You will need the name of the Service Bus namespace, an issuer name (owner is the default, but should not be used in production) and the corresponding issuer secret key.

Creating the client application

  • Create a new WPF application.
  • Add the Microsoft.ServiceBus and the Microsoft.ServiceBus.Messaging assemblies to the project.  Make sure you use v2.0.0.0, because v1.0.0.0 still references the old (?) version.
  • I designed the client form as shown further down in this article.
  • When the submit button is clicked, I check if an interval is required between sending messages.  If that is the case, I send messages in a for loop with a Thread.Sleep between each send.  If no interval is needed, I do a Parallel.For, so that all messages are sent to the topic in parallel.
  • When starting up the application, I initialize two important objects: the ServiceBusNamespaceClient (which will be used for management of the namespace) and the MessagingFactory (which will be used to send and receive messages).
      private void Initialize()
      {
            // Read issuer name, secret and namespace from the application settings
            string issuerName = ConfigurationManager.AppSettings["issuerName"];
            string issuerKey = ConfigurationManager.AppSettings["issuerKey"];
            string sbNamespace = ConfigurationManager.AppSettings["sbnamespace"];

            // Create credentials
            SharedSecretCredential sbCredentials = TransportClientCredentialBase.CreateSharedSecretCredential(issuerName, issuerKey);

            // Namespace client (management) and messaging factory (send and receive)
            Uri sbAddress = ServiceBusEnvironment.CreateServiceUri("sb", sbNamespace, "");
            sbClient = new ServiceBusNamespaceClient(sbAddress, sbCredentials);
            msgFactory = MessagingFactory.Create(sbAddress, sbCredentials);
      }


  • The implementation for sending a message looks like this.  The object being sent is of type Material and is serialized using the standard serializer.  I create a TopicClient for the specific topic and, from that client, a sender.
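      private void SubmitToTopic(Material material, string action, int sequence)
      {
            TopicClient topicClient = null;
            // The factory is shared by all sends, so guard the creation of the client
            lock (msgFactory)
            {
                  topicClient = msgFactory.CreateTopicClient(TOPICNAME);
            }
            var sender = topicClient.CreateSender();

            // Serialize the Material object into the message body
            BrokeredMessage message = BrokeredMessage.CreateMessage(material);

            // Properties that the subscription filters are matched against
            message.Properties["Type"] = material.MaterialType;
            message.Properties["Number"] = material.Number;
            message.Properties["Region"] = material.Region;
            message.Properties["Action"] = action;
            message.Properties["Sequence"] = sequence;

            // Submit the message to the topic
            sender.Send(message);
      }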
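
The submit-button logic described in the list above (a loop with a pause when an interval is requested, a parallel loop otherwise) can be sketched independently of the Service Bus.  The following Python sketch is only a model under stated assumptions: send_all and its parameters are hypothetical names, the send callback stands in for SubmitToTopic, and a thread pool plays the role of Parallel.For.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List


def send_all(send: Callable[[int], None], count: int, interval_seconds: float) -> None:
    if interval_seconds > 0:
        # An interval was requested: send one message at a time, pausing in between.
        for sequence in range(count):
            send(sequence)
            time.sleep(interval_seconds)
    else:
        # No interval: hand all sends to a thread pool so they can run concurrently.
        with ThreadPoolExecutor() as pool:
            # list() waits for completion and re-raises any exception from send.
            list(pool.map(send, range(count)))


parallel_sent: List[int] = []
send_all(parallel_sent.append, 5, 0)        # parallel: arrival order is not guaranteed

ordered_sent: List[int] = []
send_all(ordered_sent.append, 3, 0.01)      # sequential: sequence numbers stay in order
```

The practical difference is ordering: with an interval the sequence numbers arrive in order, while the parallel variant only guarantees that every message is sent.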


Creating the subscribers

  • In the subscribing application, I use the same initialization code as in the client app.
  • After collecting some information from the user through the console, I start listening on the subscription, using the following code:
var subscriptionClient = msgFactory.CreateSubscriptionClient(topicName, subscriptionName);
var receiver = subscriptionClient.CreateReceiver(ReceiveMode.ReceiveAndDelete);
bool dontstop = true;
while (dontstop)
{
      BrokeredMessage message = null;
      // Keep receiving until no message arrives within 10 seconds
      while (receiver.TryReceive(TimeSpan.FromSeconds(10), out message))
      {
            var material = message.GetBody<Material>();
            Console.WriteLine("Material received from subscription " + subscriptionName);
      }
      Console.WriteLine("If you want to continue polling, enter X");
      dontstop = Console.ReadLine().ToLower() == "x";
}
  • In the subscribing application, I also give the user the ability to create a subscription and immediately start listening on it.  To do so, I execute the following code before the code shown above.  I ask the user to enter a SQL expression, which I use to define the filter for the subscription.

Console.WriteLine("Please enter your new subscription name.");
string subscriptionName = Console.ReadLine();
Console.WriteLine("Please enter your filter expression.");
string filter = Console.ReadLine();
sbClient.GetTopic(topicName).AddSubscription(subscriptionName, new SqlFilterExpression(filter));
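
The shape of the polling loop above (receive until nothing arrives within a timeout, then ask whether to continue) can be illustrated without the Service Bus.  This is a minimal Python sketch in which a standard in-process queue stands in for a subscription; the drain function and the sample message names are hypothetical.

```python
import queue
from typing import Any, List


def drain(subscription: "queue.Queue[Any]", timeout_seconds: float) -> List[Any]:
    """Receive until no message arrives within the timeout."""
    received: List[Any] = []
    while True:
        try:
            # get() removes the message as it is returned, similar in spirit to
            # a receive-and-delete mode: there is no separate completion step.
            received.append(subscription.get(timeout=timeout_seconds))
        except queue.Empty:
            return received


eu_subscription: "queue.Queue[Any]" = queue.Queue()
for name in ("material-1", "material-2", "material-3"):
    eu_subscription.put(name)

first_poll = drain(eu_subscription, 0.05)    # all three waiting messages
second_poll = drain(eu_subscription, 0.05)   # nothing left, returns after the timeout
```

Because a message is gone as soon as it is received, a receiver that fails while processing it cannot get it back in this mode.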

Creating the queues and topics

At this moment there is no out-of-the-box tooling to create queues, topics and subscriptions.  Therefore, I wrote a simple WPF application to list, create and delete these objects in an AppFabric namespace.

The application at work

The following screenshots show a simple test where I am sending a message with the Region set to EU (left) and I am listening on the EUSubscription with filter (Region = 'EU') on the right.




The filter expressions that can be applied are very rich.  They use a SQL syntax (the underlying store of the Service Bus is SQL Azure).  I tried out different types of filters.
The following filters have been tested successfully as SqlFilterExpressions:

  • Region = 'EU'
  • Sequence > 3 AND Sequence < 7
  • Number LIKE 'LG%'
  • Region IN ('US', 'EMEA', 'EU')
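
To get a feel for how such expressions select messages, the sketch below evaluates the same four filters against one set of message properties.  It is only an approximation: it uses SQLite from the Python standard library as a stand-in evaluator, whose SQL dialect is not the Service Bus filter grammar, and the matches function and the sample property values are made up for illustration.

```python
import sqlite3
from typing import Any, Dict


def matches(filter_expression: str, properties: Dict[str, Any]) -> bool:
    """Evaluate a SQL-style filter against one message's properties using SQLite."""
    connection = sqlite3.connect(":memory:")
    try:
        # One table with one row: each message property becomes a column.
        # (Assumes at least one property; an empty column list is not valid SQL.)
        columns = ", ".join(f'"{name}"' for name in properties)
        placeholders = ", ".join("?" for _ in properties)
        connection.execute(f"CREATE TABLE message ({columns})")
        connection.execute(
            f"INSERT INTO message VALUES ({placeholders})", list(properties.values())
        )
        # The filter is pasted in as the WHERE clause, so only use trusted input here.
        row = connection.execute(
            f"SELECT COUNT(*) FROM message WHERE {filter_expression}"
        ).fetchone()
        return row[0] == 1
    finally:
        connection.close()


# Hypothetical property values for a single Material message.
properties = {"Type": "Raw", "Number": "LG-1001", "Region": "EU", "Action": "Create", "Sequence": 5}

results = {
    expression: matches(expression, properties)
    for expression in (
        "Region = 'EU'",
        "Sequence > 3 AND Sequence < 7",
        "Number LIKE 'LG%'",
        "Region IN ('US', 'EMEA', 'EU')",
        "Region = 'US'",
    )
}
```

With these sample values the first four filters match and the last one does not, so a subscription filtered on Region = 'US' would not receive this message.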



This first release of the Service Bus enhancements is very promising.  It will form the basis for the rest of the middleware platform of the future.  This release of the Service Bus provides rich messaging capabilities that allow you to build decoupled, asynchronous connections.

Things we might (not sure yet) expect over time are:

  • Pipelines (like in BizTalk) on topics, queues and subscriptions.
  • Advanced Filter Actions
  • Content based filtering (through Filter Actions?)
  • Tooling

Sam Vanhoutte, Codit
