August 16, 2012 at 5:13 PM

With the introduction of LINQ in .NET 3.5, querying data became a lot simpler. In BizTalk, we often work with XML messages, and LINQ to XML lets us query XML documents through the XDocument class.

In a pipeline component, we can pass the incoming message stream into an XDocument. This gives us all the flexibility of LINQ, but with the consequence of having to load the complete message in-memory. When working with large messages, this is probably not the best option, so we will need an alternative solution. 

 

In this blog post we will focus on reading incoming messages with LINQ. If you want to compose a new XML message with LINQ in a streaming manner, there is a straightforward way to do that: instead of creating XElement objects, work with XStreamingElement. More information about this topic can be found on the XStreamingElement MSDN page.
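As a small illustration of that composing side, here is a minimal sketch. The CustomerIds method and the element names are made up for the example; the point is only that the query inside the XStreamingElement is evaluated while the document is being saved, not when the element is constructed.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Xml.Linq;

static class StreamingComposeSketch
{
    // Hypothetical data source; in practice this could be a database reader or another stream.
    static IEnumerable<int> CustomerIds(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            yield return i;
        }
    }

    // Writes a Customers document with one Customer element per id to the given writer.
    public static void WriteCustomers(TextWriter output, int count)
    {
        // The query is not evaluated when the XStreamingElement is constructed ...
        XStreamingElement root = new XStreamingElement("Customers",
            from id in CustomerIds(count)
            select new XElement("Customer", new XAttribute("CustomerID", id)));

        // ... but only while Save enumerates it and writes each element out.
        root.Save(output);
    }

    static void Main()
    {
        WriteCustomers(Console.Out, 3);
    }
}
```

Because each Customer element is created, written and released in turn, the complete tree never has to exist in memory at once.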

For illustration purposes, we need a large XML file. I’ve used the AdventureWorks database, selected all available customers, and converted the result set to XML via FOR XML AUTO. This resulted in a fairly large XML file containing around 18k customers. To make the effect clearly visible, I’ve duplicated the dataset a few times, so I have an XML file of 130k customers with a file size of 134MB.

A small excerpt: 

<Customers>
 <Customer CustomerID="11377" Title="Mr." FirstName="David" MiddleName="R." LastName="Robinett" Phone="238-555-0100" EmailAddress="david22@adventure-works.com" EmailPromotion="1" AddressType="Home" AddressLine1="Pappelallee 6667" City="Solingen" StateProvinceName="Nordrhein-Westfalen" PostalCode="42651" CountryRegionName="Germany"> 
  <Demographics> 
    <IndividualSurvey xmlns="http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey"> 
      <TotalPurchaseYTD>83.97</TotalPurchaseYTD> 
      <DateFirstPurchase>2003-09-01Z</DateFirstPurchase> 
      <BirthDate>1961-02-23Z</BirthDate> 
      <MaritalStatus>M</MaritalStatus> 
      <YearlyIncome>25001-50000</YearlyIncome> 
      <Gender>M</Gender> 
      <TotalChildren>4</TotalChildren> 
      <NumberChildrenAtHome>0</NumberChildrenAtHome> 
      <Education>Graduate Degree</Education> 
      <Occupation>Clerical</Occupation> 
      <HomeOwnerFlag>1</HomeOwnerFlag> 
      <NumberCarsOwned>0</NumberCarsOwned> 
      <CommuteDistance>0-1 Miles</CommuteDistance> 
    </IndividualSurvey> 
  </Demographics> 
</Customer> 
<Customer CustomerID="11913" Title="Ms." FirstName="Rebecca" MiddleName="A." LastName="Robinson" Phone="648-555-0100" EmailAddress="rebecca3@adventure-works.com" EmailPromotion="0" AddressType="Home" AddressLine1="1861 Chinquapin Ct" City="Seaford" StateProvinceName="Victoria" PostalCode="3198" CountryRegionName="Australia">
...

 

The pipeline component 

In our simple scenario, we want to promote the average purchase amount of all customers residing in one of a configurable list of countries. We want to do this in a simple pipeline component, using LINQ.

The list of countries is configurable as a semicolon-separated property:

private string _countries; 
public string Countries 
{ 
    get 
    { 
        return _countries; 
    } 
    set 
    { 
        _countries = value; 
    } 
}
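The Execute method shown further on splits this value on ';' and compares the names exactly. A slightly more forgiving way to read the setting could look like the sketch below. The CountryListSketch class and its Parse method are hypothetical helpers, not part of the original component; they trim whitespace, drop empty entries and compare names case-insensitively.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class CountryListSketch
{
    // Hypothetical helper: turns "Germany; Australia" into a case-insensitive set.
    public static HashSet<string> Parse(string countries)
    {
        if (string.IsNullOrEmpty(countries))
        {
            return new HashSet<string>(StringComparer.OrdinalIgnoreCase);
        }

        IEnumerable<string> names = countries
            .Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)
            .Select(name => name.Trim())
            .Where(name => name.Length > 0);

        return new HashSet<string>(names, StringComparer.OrdinalIgnoreCase);
    }

    static void Main()
    {
        HashSet<string> selected = Parse("Germany; Australia;;france");
        Console.WriteLine(selected.Contains("France"));    // True
        Console.WriteLine(selected.Contains("Belgium"));   // False
        Console.WriteLine(selected.Count);                 // 3
    }
}
```

A set also makes the lookup inside the query cheaper than scanning an array for each customer, which could matter with 130k customers.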

Other than that, it’s a basic pipeline component, with most of its logic located in the Execute method. 

 

Parsing using XDocument

The easiest way to use LINQ functionality is to just use the XDocument class. We could implement it like this.

public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(IPipelineContext pContext, Microsoft.BizTalk.Message.Interop.IBaseMessage pInMsg) 

{ 
    // Get the message stream   
    Stream bodyStream = pInMsg.BodyPart.GetOriginalDataStream();

    // If the stream can't seek, wrap it in a seekable stream 
    // so we can reset the position to 0 again 
    if (!bodyStream.CanSeek) 
    { 
        bodyStream = new ReadOnlySeekableStream(bodyStream); 
        pContext.ResourceTracker.AddResource(bodyStream); 
    } 
    // Read stream into an XDocument 
    XmlReader reader = XmlReader.Create(bodyStream); 
    XDocument document = XDocument.Load(reader);

    XNamespace surveyNS = "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey";

    // Get the average purchase amount for the selected countries 
    string[] selectedCountries = Countries.Split(';'); 
    decimal averagePurchaseAmount = document.Root.Elements("Customer")
        .Where(el => selectedCountries.Contains((string)el.Attribute("CountryRegionName")))
        // The explicit cast parses the value culture-independently, unlike Convert.ToDecimal
        .Average(el => (decimal)el.Element("Demographics")
            .Element(surveyNS + "IndividualSurvey")
            .Element(surveyNS + "TotalPurchaseYTD"));

    // Promote the result 
    pInMsg.Context.Promote("AveragePurchaseAmount", "http://schemas.codit.eu/blog/2012/06", averagePurchaseAmount);

    bodyStream.Position = 0; 
    pInMsg.BodyPart.Data = bodyStream;

    return pInMsg; 
}

 

The LINQ query selects all customers, checks whether the country is one of the specified countries, and then calculates the average based on the Demographics/IndividualSurvey/TotalPurchaseYTD element. At the end, we promote the resulting average and reset the stream position to 0, so other pipeline components can use the stream again.

When testing, we can see the promotion is done correctly, and the pipeline component works. However, if we launch perfmon to analyze the memory usage, we can see the issue:

[Image: perfmon graph of memory usage while the XDocument-based pipeline component processes the file]

 

The memory usage goes up dramatically each time our pipeline is called (3 times). We peak at 385MB consumed, and this is for a single message going through at a time. Once multiple or larger files come in, we might run into memory problems.

 

Parsing using a custom axis method

An alternative solution is to use a so-called custom axis method. This method is responsible for returning a collection of elements (typically XElements) by using yield return. This provides deferred execution, which enables us to stream over the elements.
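To make the deferred execution part concrete, here is a minimal sketch that is independent of XML. The Numbers method is a hypothetical stand-in for an axis method, and the Produced counter exists only to show how many items were actually generated.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class DeferredExecutionSketch
{
    // Counts how many items the iterator has actually produced.
    public static int Produced;

    // Hypothetical stand-in for an axis method: yields numbers instead of XElements.
    public static IEnumerable<int> Numbers(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            Produced++;
            yield return i;
        }
    }

    static void Main()
    {
        IEnumerable<int> query = Numbers(1000000).Where(n => n % 2 == 0);
        Console.WriteLine(Produced);    // 0: building the query runs nothing

        int firstEven = query.First();
        Console.WriteLine(firstEven);   // 2
        Console.WriteLine(Produced);    // 2: only two items were pulled
    }
}
```

The iterator body only runs when a consumer asks for the next item, and that is the property an axis method relies on to keep just one element in memory at a time.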

In the previous example we used this to load the message:

XmlReader reader = XmlReader.Create(bodyStream); 

XDocument document = XDocument.Load(reader);

and used our query like

document.Root.Elements("Customer").Where( … )

We will need to create an axis method for getting all the customers. We can then use LINQ queries on this method. Here’s the implementation of our custom axis method:

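static IEnumerable<XElement> StreamElements(Stream stream, string nodeName)
{
    using (XmlReader reader = XmlReader.Create(stream))
    {
        while (!reader.EOF)
        {
            if (reader.NodeType == XmlNodeType.Element && reader.Name == nodeName)
            {
                // ReadFrom consumes the whole element and leaves the reader on the
                // node that follows it, so we must not call Read() again here
                XElement element = (XElement)XElement.ReadFrom(reader);
                yield return element;
            }
            else
            {
                reader.Read();
            }
        }
    }
}

While reading, we check whether the name of the current node equals the name passed to the method. If it does, we read the full node and yield return it. Note that XElement.ReadFrom already moves the reader to the node after the element, which is why Read() is only called when the current node is not a match; calling it unconditionally would skip every other element in a document that has no whitespace between its elements.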

Besides adding this axis method, we need to modify our pipeline component to use the method instead of an XDocument:

public Microsoft.BizTalk.Message.Interop.IBaseMessage Execute(IPipelineContext pContext, Microsoft.BizTalk.Message.Interop.IBaseMessage pInMsg) 

{ 
    // Get the message stream   
    Stream bodyStream = pInMsg.BodyPart.GetOriginalDataStream();

    // If the stream can't seek, wrap it in a seekable stream 
    // so we can reset the position to 0 again 
    if (!bodyStream.CanSeek) 
    { 
        bodyStream = new ReadOnlySeekableStream(bodyStream); 
        pContext.ResourceTracker.AddResource(bodyStream); 
    } 
    XNamespace surveyNS = "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey";

    // Get the average purchase amount for the selected countries 
    string[] selectedCountries = Countries.Split(';'); 
    decimal averagePurchaseAmount = StreamElements(bodyStream, "Customer")
        .Where(el => selectedCountries.Contains((string)el.Attribute("CountryRegionName")))
        // The explicit cast parses the value culture-independently, unlike Convert.ToDecimal
        .Average(el => (decimal)el.Element("Demographics")
            .Element(surveyNS + "IndividualSurvey")
            .Element(surveyNS + "TotalPurchaseYTD"));

    // Promote the result 
    pInMsg.Context.Promote("AveragePurchaseAmount", "http://schemas.codit.eu/blog/2012/06", averagePurchaseAmount);

    bodyStream.Position = 0; 
    pInMsg.BodyPart.Data = bodyStream;

    return pInMsg; 
}

We are no longer using an XDocument to load the stream. Instead, we pass the stream to our custom axis method, so it can read and return XElements in a streaming way.
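To try the streaming query outside BizTalk, a small console harness along these lines could be used. It is a sketch under assumptions: the sample data is made up in the shape of the excerpt, and the AxisMethodHarness class, AveragePurchase and Sample are hypothetical names. The sample is deliberately written without whitespace between the elements, which is the case where an extra Read() after ReadFrom would skip elements.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Xml;
using System.Xml.Linq;

static class AxisMethodHarness
{
    static readonly XNamespace SurveyNs =
        "http://schemas.microsoft.com/sqlserver/2004/07/adventure-works/IndividualSurvey";

    // Axis method variant that only calls Read() when no element was consumed,
    // so adjacent elements without whitespace between them are not skipped.
    static IEnumerable<XElement> StreamElements(Stream stream, string nodeName)
    {
        using (XmlReader reader = XmlReader.Create(stream))
        {
            while (!reader.EOF)
            {
                if (reader.NodeType == XmlNodeType.Element && reader.Name == nodeName)
                {
                    yield return (XElement)XNode.ReadFrom(reader);
                }
                else
                {
                    reader.Read();
                }
            }
        }
    }

    // Same query as in the pipeline component, without the BizTalk plumbing.
    public static decimal AveragePurchase(Stream xml, string[] countries)
    {
        return StreamElements(xml, "Customer")
            .Where(el => countries.Contains((string)el.Attribute("CountryRegionName")))
            .Average(el => (decimal)el
                .Element("Demographics")
                .Element(SurveyNs + "IndividualSurvey")
                .Element(SurveyNs + "TotalPurchaseYTD"));
    }

    // Made-up sample data, written without whitespace between the elements.
    public static Stream Sample()
    {
        string xml = "<Customers>"
            + Customer("Germany", "10.00")
            + Customer("Australia", "20.00")
            + Customer("Canada", "90.00")
            + "</Customers>";
        return new MemoryStream(Encoding.UTF8.GetBytes(xml));
    }

    static string Customer(string country, string amount)
    {
        return "<Customer CountryRegionName=\"" + country + "\"><Demographics>"
            + "<IndividualSurvey xmlns=\"" + SurveyNs.NamespaceName + "\"><TotalPurchaseYTD>"
            + amount + "</TotalPurchaseYTD></IndividualSurvey></Demographics></Customer>";
    }

    static void Main()
    {
        // Averages the purchases of the German and Australian sample customers.
        decimal average = AveragePurchase(Sample(), new[] { "Germany", "Australia" });
        Console.WriteLine(average);
    }
}
```

Keep in mind that Average throws an InvalidOperationException when no customer matches the selected countries, so a real component would probably want to handle that case before promoting a value.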

If we redeploy our pipeline, we can see the result in perfmon again (same scale as previous graph):

 

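[Image: perfmon graph of memory usage with the streaming pipeline component, same scale as the previous graph]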

I have dropped the same file 3 times again, using the new pipeline component. It’s hard to notice when the files are dropped, but we can see the maximum memory used is 3MB (down from 385MB).

 

Remarks & Conclusion

The difference between using our custom axis method and just using XDocument is clear: the streaming component does its job with a significantly smaller memory footprint.

But that doesn’t mean we can always use an axis method. For a large database table dump, a log file, or another file with a relatively flat structure, it’s a good match, since we can iterate over each node individually and each node is small. Once the XML structure gets more complex, it can be very difficult or impossible to create an efficient axis method.

Next to that, some LINQ operators need all elements in memory to do their job. For example, OrderBy() has to buffer the entire sequence before it can return the first element, so memory usage jumps up again.
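The difference between streaming and buffering operators can be made visible with a small sketch. The Amounts method and the Pulled counter are hypothetical and only there to count how many items each query consumes from the source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class BufferingSketch
{
    public static int Pulled;

    // Hypothetical source of four purchase amounts that counts every item it hands out.
    public static IEnumerable<decimal> Amounts()
    {
        decimal[] data = { 83.97m, 12.50m, 140.00m, 7.25m };
        foreach (decimal amount in data)
        {
            Pulled++;
            yield return amount;
        }
    }

    // Runs a query against a fresh source and reports how many items it consumed.
    public static int CountPulls(Func<IEnumerable<decimal>, decimal> query)
    {
        Pulled = 0;
        query(Amounts());
        return Pulled;
    }

    static void Main()
    {
        // First() on the raw sequence stops after one item.
        Console.WriteLine(CountPulls(s => s.First()));                    // 1

        // OrderBy() must consume and hold all items before it can yield the first.
        Console.WriteLine(CountPulls(s => s.OrderBy(a => a).First()));    // 4

        // Min() also visits every item, but only keeps the running minimum.
        Console.WriteLine(CountPulls(s => s.Min()));                      // 4
    }
}
```

So when only a minimum, maximum, sum or average is needed, an aggregate such as Min() or Average() keeps the memory profile flat, where sorting first would not.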

As a final note, we read the complete message in both scenarios and then reset the position to 0. As a consequence, the message is read at least twice (once by our pipeline component, once by the messaging engine). To implement the component in a truly streaming manner, we would need to create our own custom stream class. The question then becomes how and when to run our LINQ queries, since we no longer control when reads happen. If you're interested in this scenario, it might be a good idea to look into the IObservable<T> and IObserver<T> interfaces introduced in .NET 4. Together with Reactive Extensions a solution might be available, but that is outside the scope of this blog entry.
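To give an idea of what such a custom stream class could look like, here is a minimal sketch of a pass-through stream. ObservingStream and its callback are hypothetical names, and the sketch only shows the wrapper shape: it reports each chunk as the consumer reads it, and it does not solve the question of how to run a LINQ query on top of those chunks.

```csharp
using System;
using System.IO;

// Hypothetical pass-through stream: forwards reads to the inner stream and reports each chunk.
sealed class ObservingStream : Stream
{
    private readonly Stream _inner;
    private readonly Action<byte[], int, int> _onRead;

    public ObservingStream(Stream inner, Action<byte[], int, int> onRead)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        if (onRead == null) throw new ArgumentNullException("onRead");
        _inner = inner;
        _onRead = onRead;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int read = _inner.Read(buffer, offset, count);
        if (read > 0)
        {
            // The consumer (for example the messaging engine) decides when this happens.
            _onRead(buffer, offset, read);
        }
        return read;
    }

    // Forward-only and read-only: the message is read once and never rewound.
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}

static class ObservingStreamDemo
{
    // Reads the source through the wrapper and totals the bytes reported to the callback.
    public static long CountBytes(Stream source)
    {
        long total = 0;
        Stream observed = new ObservingStream(source, (buffer, offset, count) => total += count);
        byte[] chunk = new byte[4096];
        while (observed.Read(chunk, 0, chunk.Length) > 0) { }
        return total;
    }

    static void Main()
    {
        Console.WriteLine(CountBytes(new MemoryStream(new byte[10000])));   // 10000
    }
}
```

In a pipeline component, a wrapper like this would be assigned to the body part instead of the rewound original stream, so the message is read only once, at the pace of whoever consumes it.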
