October 27, 2011 at 7:29 AM

Great news from the Azure AppFabric Service Bus team.  As of today, the Service Bus includes load balancing in the Relay Service.  It is now possible to register up to 25 listeners on the same service endpoint.  When a message is sent to that endpoint, only one of these listeners gets the service call, based on a random decision made by the relay infrastructure in the cloud.
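To make the behavior concrete, here is a minimal sketch of the dispatch rule described above. It is a plain in-memory simulation, not the Service Bus API: the RelaySimulator class, its method names and the listener names are assumptions made up for illustration; only the limit of 25 listeners and the random choice come from the announcement.

```csharp
using System;
using System.Collections.Generic;

// Illustrative stand-in for the relay; this is NOT the Service Bus API.
public class RelaySimulator
{
    public const int MaxListeners = 25; // limit quoted in the announcement

    private readonly List<string> _listeners = new List<string>();
    private readonly Random _random;

    public RelaySimulator(int seed)
    {
        _random = new Random(seed);
    }

    // Registers a listener on the shared endpoint and enforces the limit.
    public void Register(string listenerName)
    {
        if (_listeners.Count >= MaxListeners)
            throw new InvalidOperationException("Listener limit reached for this endpoint.");
        _listeners.Add(listenerName);
    }

    // Picks exactly one registered listener at random for a single service call.
    public string Dispatch()
    {
        if (_listeners.Count == 0)
            throw new InvalidOperationException("No listener is registered.");
        return _listeners[_random.Next(_listeners.Count)];
    }
}

public static class Program
{
    public static void Main()
    {
        var relay = new RelaySimulator(42);
        relay.Register("biztalk-datacenter-a"); // hypothetical listener names
        relay.Register("biztalk-datacenter-b");

        // Count how many of 1000 simulated calls each listener receives.
        var counts = new Dictionary<string, int>();
        for (int i = 0; i < 1000; i++)
        {
            string target = relay.Dispatch();
            int current;
            counts.TryGetValue(target, out current);
            counts[target] = current + 1;
        }

        foreach (var pair in counts)
            Console.WriteLine(pair.Key + ": " + pair.Value + " calls");
    }
}
```

Because the choice is random rather than round-robin, the split between listeners is only approximately even.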

  • First, we got an e-mail with the announcement through our Azure agreement
  • After that, a tweet from Valery Mizonov (from the AppFabric CAT team) appeared: #ServiceBus relay load balancing was one of your top requirements, dear community. We are done. What’s the next big thing you need?

 

How to enable this functionality.

No SDK update is needed; everything is included in the AppFabric 1.5 SDK.

The only thing you need to do is change your code to manage the number of concurrent listeners per endpoint and set the limits correctly. I expect that each of these listeners will count as a billable connection, so the limits do matter.

Scale out scenarios

Before this load balancing

Before this capability, a lot of issues were encountered when opening service bus endpoints on multi-instance computers.  In that case, the AddressAlreadyInUseException was thrown.

  • When a Web or Worker role opened a relay connection but ran more than one instance (to be highly available), the second instance that tried to open the same endpoint got an exception, because the first instance had already registered on it.
  • With BizTalk Server multi-server groups, a receive location with the Relay Binding could only be opened on one host instance.

New scenarios possible

Now it becomes possible to have real scaled out infrastructure and logic on premises too.  Customers who have geo-clustered data centers with redundant applications and logic can now expose that same level of availability to the cloud.

A concrete example

A concrete example of how we can leverage this is the following. For one of our cloud scenarios, we need to do a lot of flat file parsing.  We do this on a BizTalk Server hosted on premises.  Therefore, we created a generic web service that accepts the flat file with a message identifier and returns the parsed XML.

Until now, only one server could expose this functionality over the service bus.  In case of an outage in our on-premises data center, this capability would have been offline, leaving our entire solution unavailable for the duration of the outage.  With the new load balancing feature, we just have to expose the same service on a BizTalk Server in another data center (of our hosting partner) and we are highly available, just like that.

 

So we can conclude that with this new feature, the following things have been shown:

  • The service bus keeps on being improved to be a world class reliable hybrid connectivity bus.
  • It is great that this feature is introduced, without a new release of SDK being required.  That is the power of the cloud.
  • Most importantly: on premises services can now be scaled better to leverage cloud power on premises too.

 

Sam Vanhoutte


June 5, 2011 at 11:19 PM

In the recent CTP of Windows Azure AppFabric, we can see a lot of new rich-messaging features.  In my previous blog post, I blogged about the publish-subscribe features.  Another interesting feature is the use of sessions when sending messages to a queue.  And that’s what this post is about.

We will demonstrate how to send large messages in an atomic batch of chunks over a queue.

The new queues

The new AppFabric Service Bus queues are much richer in functionality and features than the V1-message buffers or the Windows Azure storage queues.  A list of the biggest differences:

  • Reliable & durable storage (no limit on the time to live, TTL).
  • The maximum size of a queue is 1 GB (100 MB in the CTP version).
  • A message can measure up to 256 KB.
  • Different messaging APIs are available: REST, .NET client and WCF bindings.
  • Transactional support in sending messages to a queue.
  • De-duplication of messages.
  • Deferring of messages.

As you can see, the message size limit is 256 KB, which is much more than the storage queue limit but can still be too small in certain cases.  And that is where sessions come into the picture.

Session concept

The concept of sessions allows receiving messages that belong to a certain logical group all by the same receiver.  This is done by specifying a SessionId on a message.  Receivers can listen on a specific session, or can lock the session for their usage on a first come first served basis.

Sessions can also be used to implement Request-Reply patterns over a queue.  But more on that can be expected in a future blog post.

 

Creating a session-enabled queue

By default, queues are created session-less.  Sessions can only be used on queues that have session support enabled.  The following code extract shows how to create a queue with sessions enabled.  (Notice the usage of the ServiceBusNamespaceClient object.  This is the object you’ll always use in administrative operations.)

sbClient.CreateQueue("qName", new QueueDescription { RequiresSession = true });

In this way, the queue will now be created and be able to handle sessions.

 

Sending messages to the queue

The only specific thing that is needed to send messages in a session to a queue, is to define the SessionId property on the specific message.  In this case, I also write a value to the message properties to indicate that the last message of a session is being sent.

var message = BrokeredMessage.CreateMessage(msgContent);
message.SessionId = "MySessionId"; 
message.MessageId = "MyMessageId"; 
if (isLastMessage) { message.Properties["LastMessageInSession"] = true; }
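The chunking step itself is not shown in the snippet above, so here is a minimal sketch of how a large payload could be split before each piece is wrapped in a message. The Chunk and Chunker types are hypothetical helpers, not part of the SDK, and the 192 KB chunk size is an assumption chosen to leave headroom under the 256 KB message limit for headers and properties.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical chunk descriptor; in the sample each chunk would travel as one BrokeredMessage.
public class Chunk
{
    public string SessionId;
    public int Index;
    public bool IsLast;
    public byte[] Body;
}

public static class Chunker
{
    // Splits a payload into chunks of at most maxChunkSize bytes that share one session id.
    public static List<Chunk> Split(byte[] payload, string sessionId, int maxChunkSize)
    {
        if (payload == null) throw new ArgumentNullException("payload");
        if (maxChunkSize <= 0) throw new ArgumentOutOfRangeException("maxChunkSize");

        var chunks = new List<Chunk>();
        int offset = 0;
        do
        {
            int size = Math.Min(maxChunkSize, payload.Length - offset);
            var body = new byte[size];
            Buffer.BlockCopy(payload, offset, body, 0, size);
            offset += size;

            chunks.Add(new Chunk
            {
                SessionId = sessionId,
                Index = chunks.Count,              // position of the chunk within the session
                IsLast = offset >= payload.Length, // plays the role of 'LastMessageInSession'
                Body = body
            });
        } while (offset < payload.Length);

        return chunks;
    }
}

public static class Program
{
    public static void Main()
    {
        byte[] file = new byte[600 * 1024]; // stand-in for the content of a large file
        List<Chunk> chunks = Chunker.Split(file, "invoice-batch.txt", 192 * 1024);

        foreach (Chunk chunk in chunks)
            Console.WriteLine("{0} #{1}: {2} bytes, last={3}",
                chunk.SessionId, chunk.Index, chunk.Body.Length, chunk.IsLast);
    }
}
```

An empty payload still produces one (empty) chunk flagged as last, so the receiver always sees an end-of-session marker.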

 

Receiving messages from a session

The specific thing on receiving messages in a session, is to use a SessionReceiver.  This receiver will make sure that all messages it receives will belong to the same session.  It is possible to listen on a specific session (by passing in the session name) and to specify a session timeout.

QueueClient queueClient = msgFactory.CreateQueueClient("LargeFileQueue");
SessionReceiver sessionReceiver = queueClient.AcceptSessionReceiver();
BrokeredMessage receivedMessage; // filled by TryReceive through its out parameter
while (sessionReceiver.TryReceive(TimeSpan.FromSeconds(10), out receivedMessage))
{
	Console.WriteLine("Message received in session " + receivedMessage.SessionId);
	if (receivedMessage.Properties.ContainsKey("LastMessageInSession"))
	{     
		Console.WriteLine("Last message of session received");
		break; 
	} 
}

In the above mentioned sample, I am looping until I receive the ‘last message in the session’.
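The loop above only prints what it receives. As a rough sketch of the reassembly side, the following helper buffers chunk bodies per session and hands back the complete payload when the last chunk arrives. SessionAssembler is a hypothetical class, not an SDK type, and it assumes that the chunks of one session arrive in the order in which they were sent.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical helper: collects chunk bodies per session id until the last chunk is seen.
public class SessionAssembler
{
    private readonly Dictionary<string, MemoryStream> _buffers =
        new Dictionary<string, MemoryStream>();

    // Appends one chunk; returns the full payload once the last chunk arrives, otherwise null.
    public byte[] Add(string sessionId, byte[] body, bool isLast)
    {
        MemoryStream buffer;
        if (!_buffers.TryGetValue(sessionId, out buffer))
        {
            buffer = new MemoryStream();
            _buffers[sessionId] = buffer;
        }

        buffer.Write(body, 0, body.Length);
        if (!isLast)
            return null;

        _buffers.Remove(sessionId); // the session is complete, release its buffer
        return buffer.ToArray();
    }
}

public static class Program
{
    public static void Main()
    {
        var assembler = new SessionAssembler();

        // Chunks of two sessions interleaved, as they could appear on a queue.
        assembler.Add("a.txt", Encoding.UTF8.GetBytes("Hello, "), false);
        assembler.Add("b.txt", Encoding.UTF8.GetBytes("Other file"), true);
        byte[] full = assembler.Add("a.txt", Encoding.UTF8.GetBytes("world"), true);

        Console.WriteLine(Encoding.UTF8.GetString(full)); // prints "Hello, world"
    }
}
```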

The sample: sending large messages in chunks to a session

In the sample that I uploaded, I send large messages in chunks to a session-enabled queue.  Even with multiple receivers, each file is guaranteed to be received by exactly one receiver.

 

Functionality

The sample contains a sender where you can specify a directory; all files in that directory are picked up and submitted (in parallel) to a session-enabled queue if the checkbox is checked.  Otherwise they are sent to a session-less queue (to demonstrate that messages are then received in random order).

The receiver is a console app that listens for incoming messages based on the session.  You can start up multiple receivers to see that each receiver gets the messages of its own session.


These are the most important design steps I took:

 

Sending messages

  • In the sample, I am using a Parallel.ForEach to loop over a bunch of files and send them in parallel.  This way, I am sure that messages won’t arrive in sequence on the queue.
  • Each large message is being sent in a transaction to the queue. 
  • On every message, I use the file name as the session id.
  • On the last message in the batch, I write the ‘LastMessageInSession’ property to make this visible to the receiver.

Receiving messages

  • I am using the PeekLock receive method to make sure I only remove the messages from the queue, when the full session has been received.
  • I receive the messages in a TransactionScope.  This makes sure that the session itself is rolled back in case of an exception.

The code of this sample can be found here.

Conclusions

This post was an example of another great messaging feature that comes with the AppFabric Service Bus Enhancements.  We explored sessions to group messages for the same receiver.  The scenario in this post demonstrated that we can split large messages into smaller chunks and still handle them as one atomic batch.

Sam Vanhoutte


June 3, 2011 at 9:08 AM

Richard Seroter, recognized blogger, BizTalk MVP and book author, has a series of interviews with people in the “connected technology” space. Yesterday, he published his 31st interview, with me.

If you want to know more about how we see the AppFabric Service Bus positioned and used, I definitely recommend having a look at it. And make sure to subscribe to Richard’s blog too; you won’t regret it.

Here it is: http://seroter.wordpress.com/2011/06/02/interview-series-four-questions-with-sam-vanhoutte/

Sam Vanhoutte


May 19, 2011 at 8:43 PM

Earlier this week, the first CTP of the Service Bus enhancements was released: the May CTP.  This CTP was announced at TechEd and on the AppFabric Team Blog.  One week prior to that, Codit was accepted into the Technology Adoption Program (TAP) of the Windows Azure AppFabric team.  In this program, we will work closely together with the product team to build our cloud integration offering.

Being BizTalk experts since our inception, we are highly interested in this release of the AppFabric Service Bus, since this one provides real ‘hardcore’ messaging capabilities, just like BizTalk has.  These capabilities make asynchronous, loosely coupled scenarios possible on a cloud scale service bus.

This post dives into one of the most interesting features towards flexible routing and decoupling of messages: topics & subscriptions. 

 

Terminology

Queues

AppFabric service bus queues can be compared with the storage queues or with the message buffers of AppFabric V1, but they offer much more complex and rich functionality (sessions, dead letter queues: DLQ, deferring of messages, duplicate checking, transactions…)

Some enhancements over the azure storage queues:

  • Large message sizes (currently: 256KB – storage queues: 8KB)
  • Unlimited TTL (time to live – storage queues: max 7d)
  • Correlation, request/reply
  • Grouping of messages

Topics & subscriptions

Topics & subscriptions make it possible to implement real publish/subscribe patterns on the Windows Azure AppFabric platform.  A sender/publisher submits messages to a topic, which means the sender does not need to know anything about potential subscribers/destinations.  Multiple subscriptions can be linked to a topic.
A subscription registers for messages that match its filter.  A filter is defined as a SQL92 or LINQ expression and is matched against the properties of a message (BrokeredMessage).  If the filter matches, the message is marked for delivery on that subscription.
A new concept is the use of filter actions.  These can be configured on a subscription to update the properties of a message. 

The following schema shows an overview of topics and subscriptions. 

[Diagram: overview of topics and subscriptions]

Someone familiar with BizTalk will immediately get this picture.  I would make the following analogy with BizTalk:

  • AppFabric Topic = BizTalk Receive Agent (Receive Port)
  • AppFabric Subscription = BizTalk Subscription
  • AppFabric Filter Action = A very lightweight BizTalk pipeline (only influencing message properties)
  • Message properties = Message Context

Implementing a pub/sub sample.

In this blog post, I am implementing a scenario where a client application (WPF app – the publisher block in the above diagram) is sending messages to a topic.  These messages are objects of a custom class: Material.  On these messages, the client application adds some properties.  These properties will be used to match the filters against.  The client can send multiple messages over a longer period, which allows us to test creating subscriptions on the fly.

At the other end of the Service Bus, I will have multiple receivers that each either listen on an existing subscription, or that will create a subscription on the fly and receive from it.  I can start up different receivers and messages will be delivered to them, as long as the message properties match the filter of the subscription.

The code

 

Registering the Service Bus namespace

The blog post by David Ingham of the AppFabric team shows how you can register on the AppFabric labs portal and how you can download the SDK. You will need the name of the service bus namespace, an issuer name (owner is default, but should not be used in production) and the corresponding issuer secret key.

Creating the client application

  • Create a new WPF application.
  • Add the Microsoft.ServiceBus and the Microsoft.ServiceBus.Messaging assemblies to the project.  Make sure you use v2.0.0.0, because v1.0.0.0 still references the old (?) version.
  • I designed the client form as displayed further down in this article.
  • When the submit button is clicked, I check whether an interval is required between sending messages.  If so, I send the messages in a for loop with a Thread.Sleep between each send.  If no interval is needed, I use a Parallel.For, so that all messages are sent to the topic in parallel.
  • When starting up the application, I initialize two important objects: the ServiceBusNamespaceClient (which will be used for management of the namespace) and the MessagingFactory (which will be used to send and receive messages).
             private void Initialize()        
             {
                          // Read issuer name & secret
                          string issuerName = ConfigurationManager.AppSettings["issuerName"];
                          string issuerKey = ConfigurationManager.AppSettings["issuerKey"];
                          string sbNamespace = ConfigurationManager.AppSettings["sbnamespace"];

                          // Create credentials
                          SharedSecretCredential sbCredentials = TransportClientCredentialBase.CreateSharedSecretCredential(issuerName, issuerKey);

                          // Namespace client
                          Uri sbAddress = ServiceBusEnvironment.CreateServiceUri("sb", sbNamespace, "");
                          sbClient = new ServiceBusNamespaceClient(sbAddress, sbCredentials);
                          msgFactory = MessagingFactory.Create(sbAddress, sbCredentials);
             }

 

  • The implementation for sending the message is like this.  The object that is being sent is of type Material.  It will be serialized, using the standard Serializer.  I create a TopicClient for that specific topic and in that client, I create a Sender.
            private void SubmitToTopic(Material material, string action, int sequence)
            {
                        TopicClient topicClient = null;
                        lock (msgFactory)
                        {
                                    topicClient = msgFactory.CreateTopicClient(TOPICNAME);
                        }           
                        var sender = topicClient.CreateSender();
                        BrokeredMessage message = BrokeredMessage.CreateMessage(material);
                        message.Properties["Type"] = material.MaterialType;
                        message.Properties["Number"] = material.Number;
                        message.Properties["Region"] = material.Region;
                        message.Properties["Action"] = action;
                        message.Properties["Sequence"] = sequence;
                        sender.Send(message);
            }

 

Creating the subscribers

  • In the subscribing application, I also add the initialization code as in the client app.
  • After collecting some information from the user, through the console, I start listening on the subscription, using the following code
var subscriptionClient = msgFactory.CreateSubscriptionClient(topicName, subscriptionName);
var receiver = subscriptionClient.CreateReceiver(ReceiveMode.ReceiveAndDelete);
bool dontstop = true;
while (dontstop)
{
            BrokeredMessage message = null;
            while (receiver.TryReceive(TimeSpan.FromSeconds(10), out message))
            {
                        var material = message.GetBody<Material>();
                        Console.WriteLine("Material received from subscription " + subscriptionName);
                        Console.WriteLine(material);
             }
            Console.WriteLine("If you want to continue polling, enter X");
            dontstop = Console.ReadLine().ToLower() == "x";
}
  • In the subscribing application, I also give the user the ability to create a subscription and immediately start listening on it. Therefore, I am executing the following code, prior to the above mentioned code.  I ask the user to enter a SQL expression that I am using to define the Filter for the Subscription.

Console.WriteLine("Please enter your new subscription name.");
string subscriptionName = Console.ReadLine();
Console.WriteLine("Please enter your filter expression.");
string filter = Console.ReadLine();
sbClient.GetTopic(topicName).AddSubscription(subscriptionName, new SqlFilterExpression(filter));

Creating the queues and topics

At this moment there is no out-of-the-box tooling to create queues, topics and subscriptions.  Therefore, I wrote a simple WPF application to list, create and delete these objects in an AppFabric namespace. 
[Screenshot: the explorer application]

The application at work

The following screenshots show a simple test where I am sending a message with the Region set to EU (left) and I am listening on the EUSubscription with filter (Region = ‘EU’) on the right.

[Screenshots: client application (left) and subscriber (right)]

 

Filters

The filter expressions that can be applied are very rich.  A SQL syntax is being used to create these filters.  (the underlying store of the service bus is SQL Azure).  I tried out different types of filters.
These filters have been successfully tested as SqlFilterExpressions:

  • Region = 'EU'
  • Sequence > 3 AND Sequence < 7
  • Number LIKE 'LG%'
  • Region IN ('US', 'EMEA', 'EU')
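To illustrate how these four filters decide on delivery, the sketch below re-creates them as plain C# predicates over a property dictionary. It is an in-memory illustration only: the Subscription and TopicSketch types are hypothetical, no SQL parsing is done, and the sample property values are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a subscription: a name plus a predicate over message properties.
public class Subscription
{
    public string Name;
    public Func<IDictionary<string, object>, bool> Filter;
}

public static class TopicSketch
{
    // Each predicate mirrors one of the SQL filter expressions listed above.
    public static readonly List<Subscription> Subscriptions = new List<Subscription>
    {
        new Subscription { Name = "Region = 'EU'",
            Filter = p => (string)p["Region"] == "EU" },
        new Subscription { Name = "Sequence > 3 AND Sequence < 7",
            Filter = p => (int)p["Sequence"] > 3 && (int)p["Sequence"] < 7 },
        new Subscription { Name = "Number LIKE 'LG%'",
            Filter = p => ((string)p["Number"]).StartsWith("LG", StringComparison.Ordinal) },
        new Subscription { Name = "Region IN ('US', 'EMEA', 'EU')",
            Filter = p => new[] { "US", "EMEA", "EU" }.Contains((string)p["Region"]) }
    };

    // Returns the names of all subscriptions whose filter matches the properties.
    public static List<string> Match(IDictionary<string, object> properties)
    {
        return Subscriptions.Where(s => s.Filter(properties)).Select(s => s.Name).ToList();
    }
}

public static class Program
{
    public static void Main()
    {
        var properties = new Dictionary<string, object>
        {
            { "Region", "EU" },
            { "Sequence", 5 },
            { "Number", "XY-100" }
        };

        foreach (string name in TopicSketch.Match(properties))
            Console.WriteLine("Delivered to subscription with filter: " + name);
    }
}
```

With these sample properties, three of the four filters match; only the LIKE 'LG%' filter rejects the message, and a single published message is copied to every matching subscription.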

 

Conclusions

This first release of the Service Bus Enhancements is very promising.  It will form the basis for the rest of the middleware platform of the future.  This release of the Service Bus provides truly rich messaging capabilities that make it possible to build decoupled, asynchronous connections.

Things we might (not sure yet) expect over time are:

  • Pipelines (like in BizTalk) on topics, queues and subscriptions.
  • Advanced Filter Actions
  • Content based filtering (through Filter Actions?)
  • Tooling

Sam Vanhoutte, Codit


May 15, 2011 at 11:48 AM

 

In this post I will highlight what the different pain points are when hosting a WCF Service in Windows Azure with Service Bus Endpoints (e.g. using HttpRelayBinding)

 

Including Microsoft.ServiceBus in your deployment

As many people already know, the Windows Azure AppFabric SDK is not installed on Windows Azure, which means that Microsoft.ServiceBus.dll is missing once you deploy your solution to Windows Azure. This first problem is easily fixed by setting the ‘Copy Local’ property of this reference to true.

HttpRelayBinding error

Now chances are you have a web.config to accompany your svc file that uses one of the relay bindings that come with the AppFabric SDK.

<service name="test">
          <endpoint name="PublishServiceEndpoint"
                      contract="WCFServiceWebRole1.IService1"
                      binding="basicHttpRelayBinding" />
</service>

Once you deploy and navigate to your svc you will get an error saying ….

[Screenshot: error message]

Of course you can modify your web.config to include the necessary extensions manually, as specified in this post, but it won’t work anymore on your dev machine, since installing the AppFabric SDK already updated your machine.config with all the extensions. I wanted a solution where the machine.config was updated on Windows Azure prior to executing my code.

Using Startup Tasks

Luckily the AppFabric SDK comes with a tool called RelayConfigurationInstaller.exe which installs the Machine.config settings necessary for the Service Bus bindings to be supported in App.config. Now this just screams Startup Task.

So I went ahead and created a new solution and added the RelayConfigurationInstaller.exe to the project along with a RelayConfigurationInstaller.exe.config file which looks like this:

<?xml version="1.0"?>
<configuration>
          <startup>
                    <requiredRuntime safemode="true" imageVersion="v4.0.30319"           
                              version="v4.0.30319"/>
          </startup>
</configuration>

I also added a Startup.cmd file which simply calls RelayConfigurationInstaller.exe /i. Then went into the ServiceDefinition.csdef and added the Startup Task with the call to Startup.cmd. This is what my project looks like so far:

[Screenshot: project structure]

Deploy…waiting 15 minutes… and no difference, I still get the same error, what went wrong?

Installing Microsoft.ServiceBus in the GAC

I found out that executing my startup task actually resulted in an error:

[Screenshot: startup task error]

Which means that the Microsoft.ServiceBus.dll should be installed in the GAC prior to calling the executable.

Now this presents a problem, there is no Windows SDK installed on Windows Azure, so we can’t execute a gacutil to install the assembly. However I discovered that you can use the System.EnterpriseServices library to publish a dll into the GAC. Time to include powershell in my startup task.

I created this PowerShell script that installs Microsoft.ServiceBus.dll in the GAC and added it to my project:

BEGIN {
         $ErrorActionPreference = "Stop"
 
         if ( $null -eq ([AppDomain]::CurrentDomain.GetAssemblies() |? { $_.FullName -eq "System.EnterpriseServices, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a" }) ) 
         {
                  [System.Reflection.Assembly]::Load("System.EnterpriseServices, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a") | Out-Null 
         }

         $publish = New-Object System.EnterpriseServices.Internal.Publish
}
PROCESS {
         $dir = [Environment]::CurrentDirectory=(Get-Location -PSProvider FileSystem).ProviderPath
         $assembly = Join-Path ($dir) "Microsoft.ServiceBus.dll"

         if ( -not (Test-Path $assembly -type Leaf) ) 
         {
                  throw "The assembly '$assembly' does not exist."
         }
         
         if ( [System.Reflection.Assembly]::LoadFile($assembly).GetName().GetPublicKey().Length -eq 0 ) 
         {
                  throw "The assembly '$assembly' must be strongly signed."
         }
         Write-Output "Installing: $assembly"
         $publish.GacInstall($assembly) 
}

Next, I modified the startup.cmd file to first call the PowerShell script and then call RelayConfigurationInstaller.exe to update the machine.config with the needed bindings. This is what the final cmd file looks like:

 1: powershell.exe Set-ExecutionPolicy RemoteSigned -Force
 2: powershell.exe .\Startup\gacutil.ps1
 3: Startup\RelayConfigurationInstaller.exe /i


Caveats

  1. Make sure you’ve selected osFamily="2" in your ServiceConfiguration.cscfg file, since this gives you Windows Server 2008 R2, which is needed for caveat 2.
  2. If you want to execute PowerShell scripts, you have to set the ExecutionPolicy to RemoteSigned (see line 1 of startup.cmd).

Wouter Seye, Codit


November 17, 2010 at 8:07 AM

A common requirement in many development scenarios is caching.  BizTalk implementations are no exception, mainly for performance reasons.

As a test I wrote two pipeline components that handle two common caching-related issues inside BizTalk.  These sample components perform tasks similar to two components of the Codit implementation framework, a framework we use at many of our customers.

  1. Code table mappings: limit access to the SQL database when performing code table mappings.  In our framework, this is similar to the Transco component.
  2. Duplicate messages: stop messages that come into BizTalk multiple times within a specific timeframe.  In our framework, this is similar to the Double checker component.

CacheHelper

Because both pipeline components use the AppFabric cache, I wrote a small helper class that takes care of the cache access.

public class CacheHelper : IDisposable
    {
        private string _cacheName = "default";
        private string _region;

        public CacheHelper(string region)
        {
            _region = region;
            CreateRegion(_cacheName,region);
        }

        /// <summary>
        /// Creates a Region in a specified cache.
        /// </summary>
        /// <param name="cacheName">Cache name</param>
        /// <param name="region">Region name</param>
        private void CreateRegion(string cacheName,string region)
        {
            DataCacheFactory dcf=ConnectToCache();

            if (dcf != null)
            {
                DataCache dc=dcf.GetCache(cacheName);
                dc.CreateRegion(region);
            }
        }

        /// <summary>
        /// Connect to a Cache server
        /// </summary>
        /// <returns>The Datacache</returns>
        private DataCacheFactory ConnectToCache()
        {
            //This can also be kept in a config file
            var config = new DataCacheFactoryConfiguration();
            config.SecurityProperties = new DataCacheSecurity(DataCacheSecurityMode.None, DataCacheProtectionLevel.None);
            config.Servers = new List<DataCacheServerEndpoint>
            {
                new DataCacheServerEndpoint(Environment.MachineName, 22233)
            };

            return new DataCacheFactory(config);
        }

        ~CacheHelper()
        {
            Dispose(false);
        }

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected virtual void Dispose(bool disposing)
        {
        }

        /// <summary>
        /// Gets a value from the cache in the specified region (class level)
        /// </summary>
        /// <param name="keyValue">Key linked to the data</param>
        /// <returns>The found data. If null --> not found in the cache</returns>
        public string GetLookUpCacheData(string keyValue)
        {
            DataCacheFactory dcf = ConnectToCache();

            var cache = dcf.GetCache(_cacheName);
            string data = cache.Get(keyValue,_region) as string;

            return data;
        }

        /// <summary>
        /// Store a value in the cache
        /// </summary>
        /// <param name="keyValue">Key</param>
        /// <param name="value">Data</param>
        public void StoreLookUpCacheData(string keyValue, object value)
        {
            DataCacheFactory dcf = ConnectToCache();

            var cache = dcf.GetCache(_cacheName);
            cache.Add(keyValue, value, _region);
        }

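        /// <summary>
        /// Stores a value in the cache for a specified amount of time
        /// </summary>
        /// <param name="keyValue">Key</param>
        /// <param name="value">Data</param>
        /// <param name="expires">Time to keep the data in the cache</param>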
        public void StoreLookUpCacheData(string keyValue, object value,TimeSpan expires)
        {
            DataCacheFactory dcf = ConnectToCache();

            var cache = dcf.GetCache(_cacheName);
            cache.Add(keyValue, value, expires, _region);
        }        
    }

This is a very simple implementation that will store values in the default cache and in a specified region.

CodeTable Mapper

Code table mapping is a very common scenario in BizTalk implementations. In my example we will translate country codes to country names.
The values are stored in a SQL table, but every time we retrieve a value from the database, we also save it to the AppFabric cache.
When we need the same value again, we do not go to the database but get the stored value from the AppFabric cache.

 

public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
        {
            // Set variables
            biztalkMessage = pInMsg;
            XmlReader reader = XmlReader.Create(pInMsg.BodyPart.Data);
            XPathCollection xpaths = new XPathCollection();
            //For this example we are going to use 1 xpath expression
            xpaths.Add(this.XPath);

            ValueMutator vm = new ValueMutator(handleXpathFound);
            pInMsg.BodyPart.Data = new XPathMutatorStream(reader, xpaths, vm);
            return pInMsg;
        }

        private void handleXpathFound(int matchIdx, XPathExpression matchExpr, string origVal, ref string finalVal)
        {
            CacheHelper ch = new CacheHelper("Countries");
            string data = ch.GetLookUpCacheData(origVal);
            if (data == null)
            {
                finalVal = GetCountryFromDB(origVal);
                ch.StoreLookUpCacheData(origVal, finalVal);
            }
            else
                finalVal = data; // reuse the value already read from the cache
        }

        private string GetCountryFromDB(string countryCode)
        {
            // Connect to the lookup database and retrieve the country name.
            // The using blocks dispose the connection and the command, even when an exception is thrown.
            using (var conn = new SqlConnection("Data Source=(local);Initial Catalog=CacheTest;Integrated Security=SSPI;"))
            using (var comm = new SqlCommand("SELECT Country FROM Countries WHERE CountryCode = @countryCode", conn))
            {
                // A parameter keeps message content out of the SQL text (no SQL injection).
                comm.CommandType = CommandType.Text;
                comm.Parameters.AddWithValue("@countryCode", countryCode);
                conn.Open();

                string country = comm.ExecuteScalar() as string;
                if (string.IsNullOrEmpty(country))
                    throw new Exception(string.Format("No country found for code {0}", countryCode));

                return country;
            }
        }
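Stripped of the BizTalk and AppFabric specifics, the component follows the cache-aside pattern: look in the cache first and only call the slow source on a miss. The sketch below shows that pattern in isolation. CacheAsideLookup is a hypothetical class, the in-memory dictionary merely stands in for the AppFabric region, and the country table is sample data.

```csharp
using System;
using System.Collections.Generic;

// Cache-aside lookup: check the cache first, call the slow source only on a miss.
public class CacheAsideLookup
{
    // Stand-in for the AppFabric cache region used in the pipeline component.
    private readonly Dictionary<string, string> _cache = new Dictionary<string, string>();
    private readonly Func<string, string> _loadFromSource;

    // Number of times the slow source (the database in the post) was actually hit.
    public int SourceCalls { get; private set; }

    public CacheAsideLookup(Func<string, string> loadFromSource)
    {
        _loadFromSource = loadFromSource;
    }

    public string Get(string key)
    {
        string value;
        if (_cache.TryGetValue(key, out value))
            return value;               // cache hit: no call to the source

        SourceCalls++;
        value = _loadFromSource(key);   // cache miss: load once and remember the result
        _cache[key] = value;
        return value;
    }
}

public static class Program
{
    public static void Main()
    {
        // Sample data standing in for the Countries table.
        var countries = new Dictionary<string, string> { { "BE", "Belgium" }, { "NL", "Netherlands" } };
        var lookup = new CacheAsideLookup(code => countries[code]);

        Console.WriteLine(lookup.Get("BE"));
        Console.WriteLine(lookup.Get("BE"));      // second call is served from the cache
        Console.WriteLine(lookup.SourceCalls);    // prints 1
    }
}
```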

 

Duplicate Message Checker

As a sample scenario I took one I read about a few months ago: BizTalk had to stop messages that come in multiple times within 2 minutes.
So if there are more than 2 minutes between the messages, they should continue.

Normally this would involve a SQL table to store some information and a job to clean up that table.
But for my example I use the AppFabric cache, which has the option to store an item for a certain timespan.
The item is automatically deleted after this period.

 

public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(IPipelineContext pContext, Microsoft.BizTalk.Message.Interop.IBaseMessage pInMsg)
{
    // Create a hash of the message body; identical bodies give identical hashes
    VirtualStream input = new VirtualStream(pInMsg.BodyPart.GetOriginalDataStream());
    string hashString;
    using (MD5 md5 = MD5.Create())
    {
        hashString = Convert.ToBase64String(md5.ComputeHash(input));
    }

    // Check the cache for this hash
    CacheHelper ch = new CacheHelper("DuplicateMessages");
    string date = ch.GetLookUpCacheData(hashString);
    if (string.IsNullOrEmpty(date))
    {
        // Not in the cache yet: store it with a lifetime of 2 minutes
        ch.StoreLookUpCacheData(hashString, DateTime.Now.ToString(), new TimeSpan(0, 2, 0));
    }
    else
    {
        // Same body received less than 2 minutes ago: reject the message
        throw new ApplicationException(string.Format("Duplicate Message. Already received at {0}", date));
    }

    // Put the stream back to the beginning so the next component can read it
    input.Seek(0, SeekOrigin.Begin);
    return pInMsg;
}

This makes the implementation very easy, and you do not need a SQL table or anything else to store the information.
You could say that you can do this with a custom caching solution as well. But what about HA environments with multiple BizTalk servers?
AppFabric is a distributed cache, so it doesn't matter on which server the message is processed: the entry ends up in the same cache and is accessible from all the servers.
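
The two-minute rule itself can be illustrated without any cache infrastructure. The following is a minimal in-memory sketch, not the AppFabric API: the class name DuplicateWindow and the injectable clock are assumptions made for illustration, and the dictionary only mimics an entry that expires after a fixed lifetime.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory stand-in for the cache-backed check (not the AppFabric API).
public class DuplicateWindow
{
    private readonly TimeSpan _window;
    private readonly Func<DateTime> _clock;
    private readonly Dictionary<string, DateTime> _firstSeen = new Dictionary<string, DateTime>();

    public DuplicateWindow(TimeSpan window, Func<DateTime> clock)
    {
        _window = window;
        _clock = clock;
    }

    // Returns true when the same hash was already stored less than one window ago.
    public bool IsDuplicate(string hash)
    {
        DateTime now = _clock();
        DateTime seenAt;
        if (_firstSeen.TryGetValue(hash, out seenAt) && now - seenAt < _window)
        {
            // Entry still "alive": duplicate. The lifetime is not refreshed.
            return true;
        }

        // Unknown or expired: store it again, as the pipeline component does.
        _firstSeen[hash] = now;
        return false;
    }
}
```

Unlike the distributed cache, this sketch only works inside a single process, which is exactly the limitation the AppFabric cache removes.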

Conclusion

As you see, AppFabric caching has some advantages in BizTalk as well. The API is very easy to use and I got this to work quite quickly.

Tim D'haeyer, CODit

 

Posted in: AppFabric | BizTalk



September 13, 2010 at 11:50 PM

Since I am coming from the BizTalk world, I feel very comfortable implementing workflows with the Microsoft Workflow 4.0 runtime.  The engine comes with many activities and capabilities similar to those the XLANG engine has been providing for more than 6 years already.

One of the typical patterns in long-running processes is correlation and, more specifically, convoys.  Each time different messages or calls have to be related to each other in order to achieve something, we are using a convoy. Convoy processing is a typical design pattern.

There are two types of convoys, and I will demonstrate both:

  • Sequential convoy - Items that are received in order, one after another (often messages of the same type)
  • Parallel convoy - Items that are received in any order, but that all need to be received before processing continues

The main difference between the two types of convoys is the order in which the items are received.

This blog post shows two scenarios for an online travel website where users can rate hotels and where hotels can be added, by consuming workflow services from any mash-up application.

The source code of my example can be found here.

Correlation

Correlation means that various messages or calls are linked to the same instance of a long-running workflow.  These calls are typically related based on content in the message or on the context of the call (the endpoint, for example).

An example would be a workflow that creates a user task on SharePoint and waits until the task is completed.  Multiple instances of that workflow can run at the same time, and tasks can be completed in a different order than they were created.  Therefore, we need to make sure that the task completion event is sent to the instance of the workflow that created the task.  This is called correlation.

Setting up correlation in a workflow is done by defining a variable of type System.ServiceModel.Activities.CorrelationHandle.  This variable needs to be scoped so that it is available to all receive and send activities that participate in the correlation.

Correlations are initialized or followed on the messaging activities.

Sequential convoy

The first scenario is a workflow service that processes the user ratings given for a certain hotel.  A requirement for the web site is that the rating for a hotel should only be published when at least 5 user ratings have been received for that specific hotel.  This prevents a single rating from having too much influence.

This is why we implement a sequential convoy.

 

Receive web service operation and correlation

For this, we have a sequential workflow that exposes a web operation: SubmitHotelReview.  This operation accepts a message of type HotelReview (a datacontract class with some basic parameters).   All calls to this workflow service will be correlated to the same instance, based on the hotel ID. 

The most important settings of the Receive activity are shown in the following table.

Correlations
  CorrelatesOn             XPath query, key 1: sm:body()/xg0:HotelReview/xg0:HotelId
  CorrelatesWith           hotelCorrelationHandle
  CorrelationInitializers  No specific changes (only the request/response handle is initialized here)
Misc
  OperationName            SubmitHotelReview
  ServiceContractName      {http://blog.codit.eu/workflow/convoys/}IHotelReviewService
  CanCreateInstance        Checked (True)

A common mistake is to also add an entry to the CorrelationInitializers property to initialize the content-based correlation handle (hotel ID).  Doing this results in the following exception: An instance key of value '{Guid}' already exists. This could be because there are multiple MessageQuerySets defined that evaluate to the same CorrelationKey.  The reason is that the correlation key would be initialized twice.

Looping condition for sequential convoy

To implement the sequential convoy, we need to correlate all receives together, so we add a loop using the DoWhile activity.  We put the Receive activity in the body of the DoWhile and define a condition that keeps the loop receiving until five reviews have been received; only then does processing continue (publishing the hotel rating).
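
The behavior of this loop can be mimicked in plain C#. This is only a sketch of the pattern, not the workflow itself: the class HotelReviewConvoy, the integer hotel ID and rating, and publishing the average of the five ratings are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical plain C# model of the sequential convoy; the real implementation is the workflow.
public class HotelReviewConvoy
{
    private const int RequiredReviews = 5;

    // One entry per running "workflow instance", keyed by the correlation value (hotel ID).
    private readonly Dictionary<int, List<int>> _instances = new Dictionary<int, List<int>>();

    // Returns the average rating once the fifth review for a hotel arrives, otherwise null.
    public double? SubmitHotelReview(int hotelId, int rating)
    {
        List<int> ratings;
        if (!_instances.TryGetValue(hotelId, out ratings))
        {
            // Comparable to CanCreateInstance: the first review starts a new instance.
            ratings = new List<int>();
            _instances[hotelId] = ratings;
        }

        ratings.Add(rating);
        if (ratings.Count < RequiredReviews)
        {
            return null; // loop condition still holds: keep receiving
        }

        _instances.Remove(hotelId); // the instance completes after publishing
        return ratings.Average();
    }
}
```

Every call with the same hotel ID lands in the same list, which is the role the correlation handle plays in the workflow.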

The entire workflow service is displayed in the following screenshot.

[Screenshot: sequential convoy workflow]

 

Parallel convoy

The second scenario is a workflow service that adds new hotel information to the web site.  Data for these hotels comes from two different sources.  One consumer of our workflow service will pass in the general hotel information, while another consumer application will provide pricing information for a hotel.  The sequence of these events is unknown and can differ from one hotel to the other.

This is why we implement a parallel convoy.

 

Receive web service operations and correlation

For this, we have a sequential workflow that exposes two web operations: SubmitGeneralInformation and SubmitPricingInformation.  Both receive activities correlate on the same correlation handle that is correlating on the HotelId.

The settings of the receive activities are similar to the settings listed in the sequential convoy sample.

Parallel activity for multiple receives

To make sure we can accept both pricing and general information, we add the receives to a Parallel activity, which only continues the flow when all branches have completed successfully (and thus all messages have been received).
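
The same idea can be sketched in plain C#. This is a simplified model under stated assumptions, not the workflow: the class HotelOnboardingConvoy, the integer hotel ID and the string payloads are hypothetical, and only the two operation names come from the scenario above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical plain C# model of the parallel convoy; the real implementation is the workflow.
public class HotelOnboardingConvoy
{
    private class Pending
    {
        public string General;
        public string Pricing;
    }

    // One entry per running "workflow instance", keyed by the correlation value (hotel ID).
    private readonly Dictionary<int, Pending> _instances = new Dictionary<int, Pending>();

    // Both operations return true when the hotel is complete and can be added.
    public bool SubmitGeneralInformation(int hotelId, string general)
    {
        Pending pending = GetOrCreate(hotelId);
        pending.General = general;
        return TryComplete(hotelId, pending);
    }

    public bool SubmitPricingInformation(int hotelId, string pricing)
    {
        Pending pending = GetOrCreate(hotelId);
        pending.Pricing = pricing;
        return TryComplete(hotelId, pending);
    }

    private Pending GetOrCreate(int hotelId)
    {
        Pending pending;
        if (!_instances.TryGetValue(hotelId, out pending))
        {
            pending = new Pending(); // either message may start the instance
            _instances[hotelId] = pending;
        }
        return pending;
    }

    private bool TryComplete(int hotelId, Pending pending)
    {
        // Like the Parallel activity: continue only when every branch has received its message.
        if (pending.General == null || pending.Pricing == null)
        {
            return false;
        }
        _instances.Remove(hotelId);
        return true;
    }
}
```

The order of the two calls does not matter; the second one to arrive for a given hotel completes the instance.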

The entire workflow service is displayed in the following screenshot.

[Screenshot: parallel convoy workflow]

 

Conclusion

Implementing convoys in Workflow 4.0 is rather straightforward and does not require very complex tricks or configuration.  The magic is all in the correlation handling.

In comparison with BizTalk, I really like the sequential convoy implementation, since we now only need one single Receive activity, whereas in BizTalk the initializing receive and the following receives need to be two different shapes.

Something I am curious about is whether the concept of zombie messages, typical in convoy scenarios, exists here as well (when a message is received at the exact same time as the looping condition of the sequential convoy completes).

Sam Vanhoutte, CODit


By sam
September 10, 2010 at 9:17 PM

I was building a test solution for my next blog post and I started to create a new workflow service in Visual Studio.  But apparently the designer came up with an exception for me:

Workflow Designer encountered problems with your document
Please check the document for invalid content, namespaces, references or reference loops.

'\' is an unexpected token. The expected token is ';'. Line 3, position 99.

After opening the XAML view, I noticed that the full path of the xamlx file was added to the XML, in the sad:XamlDebuggerXmlReader.FileName attribute of the sequence (main element).  The file name was not properly encoded, resulting in invalid XML because of the '&' sign.  (I had put my blog tests in a folder named R&D …)

The solution to this issue was to manually escape R&D as R&amp;D.
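
The same escaping can be done in code. A minimal sketch, assuming a hypothetical helper name and path: SecurityElement.Escape is a standard .NET method that replaces the characters that are not allowed in XML attribute values (&, <, >, " and ').

```csharp
using System;
using System.Security;

// Hypothetical helper; only SecurityElement.Escape is part of the .NET Framework.
public static class XamlPathEscaper
{
    // Escapes a file path so it can be written into an XML attribute value.
    public static string EscapeForAttribute(string path)
    {
        return SecurityElement.Escape(path);
    }
}
```

For example, XamlPathEscaper.EscapeForAttribute(@"C:\Blog\R&D\Calc.xamlx") (an invented path) returns C:\Blog\R&amp;D\Calc.xamlx.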

Of course, one could ask why the ‘source code’ or ‘model’ of a Workflow service needs to maintain a reference to a physical file on a developer's machine (in SourceSafe, everyone on the same project can check out to a different location…)

Just wanted to share this one.

Posted in: AppFabric | Workflow



June 21, 2010 at 2:26 PM

If you're interested in all the content that was presented at TechEd US a couple of weeks ago, the place to go is http://www.msteched.com/2010

Certainly one to watch is Integrating LoB Systems (SAP, Mainframe) with the Cloud Using Microsoft BizTalk Server and the Windows Azure AppFabric http://www.msteched.com/2010/NorthAmerica/ASI305

And since Jon Flanders gave an AppFabric training here at CODit in Brussels last week, we can verify whether AppFabric is what he promised us a few years ago :-) in Framework and Microsoft BizTalk Best Practices with an Eye Toward Oslo: http://www.msteched.com/2008/NorthAmerica/techtalk_41

Posted in: AppFabric | Azure | BizTalk



By sam
June 17, 2010 at 9:37 AM

Since the latest release of the Windows Azure Development Kit (June), Azure supports .NET 4 applications, which I believe is a really important step for adoption.

 

The first thing we wanted to try was to host a XAMLX Workflow Service in a web role on Azure.

Example workflow service

I created a standard Workflow Service that accepts two parameters on one operation (Multiply) and returns the result of the multiplication to the client.  This service was called Calc.xamlx.

 

These are the steps I followed to make my service available.  I added the exceptions, because that’s typically what users will search for :-)

 

Enable visualization of errors

 

The standard behavior of web roles is to hide exception details from users browsing to a web page.  Therefore, it is advised to add the following to the system.web section of the web.config:

 

<customErrors mode="Off"/>

 

Configure handler for Workflow Services

 

The typical error one would get, when just adding the Workflow Service to your web role and trying to browse it, is the following:

 

The requested url '/calc.xamlx' hosts a XAML document with root element type 'System.ServiceModel.Activities.WorkflowService'; but no http handler is configured for this root element type. Please check the configuration file and make sure that the 'system.xaml.hosting/httpHandlers' section exists and a http handler is configured for root element type 'System.ServiceModel.Activities.WorkflowService'.

 

We need to specify the correct HTTP handler that needs to be used for XAMLX files.  Therefore, we link the workflow services and activities to the correct Workflow Service ServiceModel handler.

 

To solve this, the following needs to be added to the web.config.

 

<configuration>
       <configSections>
             <sectionGroup name="system.xaml.hosting" type="System.Xaml.Hosting.Configuration.XamlHostingSectionGroup, System.Xaml.Hosting, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35">
                   <section name="httpHandlers" type="System.Xaml.Hosting.Configuration.XamlHostingSection, System.Xaml.Hosting, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
             </sectionGroup>
       </configSections>
       <!-- Removed other sections for clarity -->
       <system.xaml.hosting>
             <httpHandlers>
                    <add xamlRootElementType="System.ServiceModel.Activities.WorkflowService, System.ServiceModel.Activities, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" httpHandlerType="System.ServiceModel.Activities.Activation.ServiceModelActivitiesActivationHandlerAsync, System.ServiceModel.Activation, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
                    <add xamlRootElementType="System.Activities.Activity, System.Activities, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35" httpHandlerType="System.ServiceModel.Activities.Activation.ServiceModelActivitiesActivationHandlerAsync, System.ServiceModel.Activation, Version=4.0.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35"/>
             </httpHandlers>
       </system.xaml.hosting>
</configuration>

 


Enabling metadata exposure of the workflow service

 

To enable consumers of our workflow service to generate their proxy classes, we want to expose the WSDL metadata of the service, by providing the following configuration section in the web.config.

 

Notice the simplicity of configuration, compared to WCF3.5.  Making use of the default behaviors, allows us to only specify what we want to override.

 

<system.serviceModel>
       <serviceHostingEnvironment multipleSiteBindingsEnabled="true" >
             <serviceActivations>
                    <add relativeAddress="~/Calc.xamlx" service="Calc.xamlx" factory="System.ServiceModel.Activities.Activation.WorkflowServiceHostFactory"/>
             </serviceActivations>
       </serviceHostingEnvironment>
       <behaviors>
             <serviceBehaviors>
                    <behavior>
                           <serviceMetadata httpGetEnabled="true"/>
                    </behavior>
             </serviceBehaviors>
       </behaviors>
</system.serviceModel>

 

Testing and publishing

After successfully testing locally in the Azure Development Fabric, I uploaded and deployed the package to my Azure test account, using the new tools in Visual Studio.

Sam Vanhoutte, CODit

Posted in: AppFabric | Azure | WCF | Workflow
