BizTalk: Executing Inline Send Pipelines in an Orchestration

Hi Folks,

No, I have not forgotten to post the blog about web services part 2 and MSBuild part 2. I need to find some time to do it, and I hate to rush it, you know…

Introduction

After toiling with Inline pipelines to achieve low latency objectives, I thought it might be a good idea to share what I have learnt about them.

I have two Send Pipelines.

One of them is used to prepare a dynamic Windows SharePoint Services Send Port.

The other is a very complicated pipeline that transforms flat files to a very specific XML schema. It has its own rule engine and built-in auditing components, as well as a host of other class libraries that it uses for flat file conversion to XML. You might ask me, why the Fu… did you write your own component when there is a BizTalk Mapper? You want me to be honest? I think the BizTalk Mapper is a load of S%$^ for enterprise applications. It is cool for flight itinerary examples and very small transformations. Secondly, it is slow. Thirdly, it consumes huge amounts of memory.

So I decided to develop a flat file mapping tool that is called from within a pipeline. This allows me to use the streaming model within the pipeline and process one record at a time from a flat file, which I think is much more flexible. Secondly, I can use a custom rule engine to apply manipulation, and lastly, I can use XSLT 2.0! You heard me right: BizTalk does not support XSLT 2.0, so there go a lot of mapping features.

How it works, in a nutshell: I have a database that is used to dynamically detect file feeds and apply the appropriate transformation. The XSLT 2.0 templates are stored in a configuration database that the FileManagement web service interfaces with, and the pipeline component library uses the handlers to these libraries for managing all the metadata. It is extremely fast.

Also, the actual mapping of the data is stored in a Serializable class, and a UI is used to deserialize and serialize this mapping data for on-the-fly changes. This means that if the flat file schema changes or the mapping needs to change, I DO NOT need to REDEPLOY half of my bloody BizTalk assemblies. You know the score: the schemas need to be redeployed, the mappings, and so the list goes on.
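To make the "one record at a time" idea concrete, here is a minimal sketch in plain C#. It is not the UFT engine described in this post: the class name FlatFileStreaming, the Records/Record/FieldN element names and the single-character delimiter are all assumptions made up for illustration. The point is only that each line is read, written to the XmlWriter and then forgotten, so memory use does not grow with the size of the file.

```csharp
using System;
using System.IO;
using System.Xml;

// Hypothetical helper, not part of BizTalk or of the author's libraries.
public static class FlatFileStreaming
{
    // Reads one delimited record at a time and writes it straight to the
    // XML writer, so only the current line is held in memory.
    // Returns the number of records written.
    public static int WriteRecords(TextReader input, XmlWriter output, char delimiter)
    {
        int count = 0;
        output.WriteStartElement("Records");
        string line;
        while ((line = input.ReadLine()) != null)
        {
            if (line.Length == 0)
            {
                continue; // skip blank lines
            }
            string[] fields = line.Split(delimiter);
            output.WriteStartElement("Record");
            for (int i = 0; i < fields.Length; i++)
            {
                // Element names are invented: Field1, Field2, ...
                output.WriteElementString("Field" + (i + 1), fields[i]);
            }
            output.WriteEndElement();
            count++;
        }
        output.WriteEndElement();
        output.Flush();
        return count;
    }

    // Convenience wrapper for small inputs and for trying the sketch out.
    public static string ConvertToString(string flatFile, char delimiter)
    {
        XmlWriterSettings settings = new XmlWriterSettings();
        settings.OmitXmlDeclaration = true;
        StringWriter result = new StringWriter();
        using (XmlWriter writer = XmlWriter.Create(result, settings))
        {
            WriteRecords(new StringReader(flatFile), writer, delimiter);
        }
        return result.ToString();
    }
}
```

Calling FlatFileStreaming.ConvertToString("a|b\nc|d", '|') produces one Record element per input line. In a real pipeline component the TextReader would wrap the message body stream rather than a string.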

This is not what this post is about, but I thought it worth mentioning, since the orchestration I show you is a FileManagement orchestration and is responsible for this part. The orchestration processes a flat file, sends it to SharePoint for archiving, transforms it to XML, and then sends it to a central database for workflow processing.

Inline Send Pipeline

Here is an overview of the orchestration. I kept it simple, so big warning: this orchestration is not yet optimised to reduce persistence points. However, using inline pipelines can improve latency if done correctly, so if you use this orchestration, please optimise it. For example, I have no Suspend shapes, and I should have them; I don’t like to lose my data!

SharePoint Send Pipeline

The task of this pipeline is to set some context properties that the dynamic send port will use, e.g.

pInMsg.Context.Write("ConfigPropertiesXml", "http://schemas.microsoft.com/BizTalk/2006/WindowsSharePointServices-properties", configxml);

You can read my previous blog about this property at:

developing-a-dynamic-biztalk-windows-sharepoint-adapter

Another task that it does is generate custom metadata by interfacing with a Business Object Layer to detect all metadata.

So in a nutshell this pipeline is more of a property manager.

SendWSSProperties.btp

image 

Here is the Execute method of the WSSFilesEncoder component.

public IBaseMessage Execute(IPipelineContext pContext, IBaseMessage pInMsg)
{
    IBaseMessage outMsg;
    string receivedFileNamePath = pInMsg.Context.Read("ReceivedFileName", "http://schemas.microsoft.com/BizTalk/2003/file-properties").ToString();
    string originalFileName = Path.GetFileName(receivedFileNamePath);
    string receiveFolderName = Path.GetDirectoryName(receivedFileNamePath);
    AuditFileFeedRequest request = new AuditFileFeedRequest();
    try
    {
        // Link the context of the output message to the input message.
        // outMsg.Context now references pInMsg's context, so properties written
        // through either one end up on pInMsg, which is the message returned below.
        outMsg = pContext.GetMessageFactory().CreateMessage();
        outMsg.Context = pInMsg.Context;
        outMsg.AddPart(pInMsg.BodyPartName, pContext.GetMessageFactory().CreateMessagePart(), true);

        // Initialize the file metadata from the received file name and the interchange ID.
        string messageID = pInMsg.Context.Read("InterchangeID", "http://schemas.microsoft.com/BizTalk/2003/system-properties").ToString();
        SharePointMetaData smd = new SharePointMetaData(originalFileName, receiveFolderName, messageID);
        ConfigPropertiesXml spp = smd.GetCustomSharePointColumns();
        // Audit request used by both the success and the failure path.
        request = AuditManager.GenerateAuditRequest(spp.SPSIDValue, spp.FileFeedSourceValue, spp.OriginalFileNameValue, spp.NameValue);
        // Set the target file name for the WSS adapter (written, then promoted).
        outMsg.Context.Write("Filename", "http://schemas.microsoft.com/BizTalk/2006/WindowsSharePointServices-properties", smd.NewFileName);
        outMsg.Context.Promote("Filename", "http://schemas.microsoft.com/BizTalk/2006/WindowsSharePointServices-properties", smd.NewFileName);
        // Serialize the custom column values into the ConfigPropertiesXml context property.
        string configxml = spp.Serialize().InnerXml;
        pInMsg.Context.Write("ConfigPropertiesXml", "http://schemas.microsoft.com/BizTalk/2006/WindowsSharePointServices-properties", configxml);
    }
    catch (Exception e)
    {
        LogManager.LogGeneral("Exception Occurred " + e.Message);
        // incomingArchiving is not declared in this snippet; presumably it is a member of the component class.
        AuditManager.Audit(request, null, "SharePoint SendPort: " + incomingArchiving, "Failure", e.Message);
        // Pass the original exception as the inner exception so its stack trace is not lost.
        throw new Exception("Exception Occurred: " + e.Message, e);
    }
    AuditManager.Audit(request, null, "SharePoint SendPort: " + incomingArchiving, "Success", "SharePoint Import Ended");
    return pInMsg;
}

So above, a flat file goes in and a flat file comes out with some new properties!

For the SharePoint gurus: I developed a custom class that can be used to manage the properties being sent to a document library.

So if you look at my Document Library, you can see that when I send a flat file the metadata is populated. This is all done with the class I show below.

image

It is a really cool class. If you are using a dynamic send port to SharePoint, you can use this class to generate the config properties in the message context at runtime; just customise it for your own columns! Compare it to the Document Library columns I have above. Then, in a pipeline, you can instantiate this class and fill in the properties from an XML file or a database!

using System;
using System.CodeDom.Compiler;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Xml;
using System.Xml.Schema;
using System.Xml.Serialization;

namespace MMIT.FileManagement.BOL.SharePoint
{
    /// <remarks/>
    [GeneratedCode("xsd", "2.0.50727.42")]
    [Serializable]
    [DebuggerStepThrough]
    [DesignerCategory("code")]
    [XmlType(AnonymousType = true)]
    [XmlRootAttribute(Namespace = "", IsNullable = false)]
    public class ConfigPropertiesXml
    {
        private string SPSIDField = "SPSID";

        private string propertySource1Field ="";

        private string FileFeedSourceField = "FileFeedSource";

        private string propertySource2Field = "";

        private string OriginalFileNameField = "OriginalFileName";

        private string propertySource3Field = "";

        private string BizTalkMessageIDField = "BizTalkMessageID";

        private string propertySource4Field = "";

        private string RegionField = "Region";

        private string propertySource5Field = "";

        private string CountryField = "Country";

        private string propertySource6Field = "";

        private string BrandField = "Brand";

        private string propertySource7Field = "";

        private string NameField = "Name";

        private string propertySource8Field = "";     

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertyName1")]
        public string SPSID
        {
            get { return SPSIDField;}
            set { SPSIDField = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertySource1")]
        public string SPSIDValue
        {
            get { return propertySource1Field; }
            set { propertySource1Field = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertyName2")]
        public string FileFeedSource
        {
            get { return FileFeedSourceField; }
            set { FileFeedSourceField = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertySource2")]
        public string FileFeedSourceValue
        {
            get { return propertySource2Field; }
            set { propertySource2Field = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertyName3")]
        public string OriginalFileName
        {
            get { return OriginalFileNameField; }
            set { OriginalFileNameField = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertySource3")]
        public string OriginalFileNameValue
        {
            get { return propertySource3Field; }
            set { propertySource3Field = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertyName4")]
        public string BizTalkMessageID
        {
            get { return BizTalkMessageIDField; }
            set { BizTalkMessageIDField = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertySource4")]
        public string BizTalkMessageIDValue
        {
            get { return propertySource4Field; }
            set { propertySource4Field = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertyName5")]
        public string Region
        {
            get { return RegionField; }
            set { RegionField = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertySource5")]
        public string RegionValue
        {
            get { return propertySource5Field; }
            set { propertySource5Field = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertyName6")]
        public string Country
        {
            get { return CountryField; }
            set { CountryField = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertySource6")]
        public string CountryValue
        {
            get { return propertySource6Field; }
            set { propertySource6Field = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertyName7")]
        public string Brand
        {
            get { return BrandField; }
            set { BrandField = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertySource7")]
        public string BrandValue
        {
            get { return propertySource7Field; }
            set { propertySource7Field = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertyName8")]
        public string Name
        {
            get { return NameField; }
            set { NameField = value; }
        }

        /// <remarks/>
        [XmlElement(Form = XmlSchemaForm.Unqualified, ElementName = "PropertySource8")]
        public string NameValue
        {
            get { return propertySource8Field; }
            set { propertySource8Field = value; }
        }

        /// <summary>
        /// From XML to Object
        /// </summary>
        /// <param name="doc"></param>
        /// <returns></returns>
        public static ConfigPropertiesXml BuildConfigPropertiesXml(XmlNode doc)
        {
            if (doc == null)
                return null;
            XmlSerializer X = new XmlSerializer(typeof(ConfigPropertiesXml));
            return (ConfigPropertiesXml)X.Deserialize(new XmlNodeReader(doc));
        }

        /// <summary>
        /// From Object to XML
        /// </summary>
        /// <returns></returns>
        public XmlDocument Serialize()
        {
            // An empty prefix/namespace pair suppresses the default xsi/xsd namespace declarations.
            XmlSerializerNamespaces ns = new XmlSerializerNamespaces();
            ns.Add("", "");
            XmlDocument doc = new XmlDocument();
            XmlWriterSettings writerSettings = new XmlWriterSettings();
            writerSettings.OmitXmlDeclaration = true;
            StringWriter stringWriter = new StringWriter();
            using (XmlWriter xmlWriter = XmlWriter.Create(stringWriter, writerSettings))
            {
                XmlSerializer X = new XmlSerializer(typeof(ConfigPropertiesXml));
                X.Serialize(xmlWriter, this, ns);
            }
            // Load only after the writer has been disposed, so everything is flushed to stringWriter.
            doc.LoadXml(stringWriter.ToString());
            return doc;
        }

        /// <summary>
        /// Returns a populated ConfigPropertiesXml object
        /// </summary>
        /// <returns></returns>
        public static ConfigPropertiesXml SetSharePointColumns(int SharePointID, string FileFeedSource, string OriginalFileName, string BizTalkMessageID, string Region, string Country, string Brand, string NewFileName)
        {
            ConfigPropertiesXml cp = new ConfigPropertiesXml();

            cp.SPSIDValue = SharePointID.ToString();
            cp.FileFeedSourceValue = FileFeedSource;
            cp.OriginalFileNameValue = OriginalFileName;
            cp.BizTalkMessageIDValue = BizTalkMessageID;
            cp.RegionValue = Region;
            cp.CountryValue = Country;
            cp.BrandValue = Brand;
            cp.NameValue = NewFileName;
            return cp;
        }

    }
}
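If you want to try the serialize/deserialize round trip without a BizTalk project, here is a cut-down sketch. ConfigPropertiesSketch is a hypothetical stand-in for the class above with only two name/value pairs, and ToXml/FromXml are names invented for this example. It uses the same PropertyNameN/PropertySourceN element names and the same empty-namespace trick as the Serialize method above.

```csharp
using System;
using System.IO;
using System.Xml;
using System.Xml.Serialization;

// Hypothetical, trimmed-down stand-in for ConfigPropertiesXml (two columns only).
[XmlRoot("ConfigPropertiesXml", Namespace = "", IsNullable = false)]
public class ConfigPropertiesSketch
{
    [XmlElement("PropertyName1")]
    public string SPSID = "SPSID";

    [XmlElement("PropertySource1")]
    public string SPSIDValue = "";

    [XmlElement("PropertyName2")]
    public string FileFeedSource = "FileFeedSource";

    [XmlElement("PropertySource2")]
    public string FileFeedSourceValue = "";

    // Object to XML string, without XML declaration or xsi/xsd namespaces.
    public string ToXml()
    {
        XmlSerializerNamespaces ns = new XmlSerializerNamespaces();
        ns.Add("", "");
        XmlWriterSettings settings = new XmlWriterSettings();
        settings.OmitXmlDeclaration = true;
        StringWriter result = new StringWriter();
        using (XmlWriter writer = XmlWriter.Create(result, settings))
        {
            new XmlSerializer(typeof(ConfigPropertiesSketch)).Serialize(writer, this, ns);
        }
        return result.ToString();
    }

    // XML string back to object.
    public static ConfigPropertiesSketch FromXml(string xml)
    {
        using (StringReader reader = new StringReader(xml))
        {
            XmlSerializer serializer = new XmlSerializer(typeof(ConfigPropertiesSketch));
            return (ConfigPropertiesSketch)serializer.Deserialize(reader);
        }
    }
}
```

Set SPSIDValue and FileFeedSourceValue, call ToXml() to get the string you would write to the ConfigPropertiesXml context property, and call FromXml() on that string to get the values back, which is what the second pipeline later in this post does with the real class.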

File Translator Pipeline

This pipeline actually modifies the message! A flat file goes in and an XML document comes out. I apologise for the name UFT; it means Universal File Translator. I did not make up this name! It is rather funny.

SendUFT.btp

image

Here is the Execute method of the FileTranslator pipeline component.

public IBaseMessage Execute(IPipelineContext pc, IBaseMessage pInMsg)
{
    RecordCount recordCount = new RecordCount();
    // Read back the ConfigPropertiesXml context property (written earlier by the
    // SharePoint send pipeline) and rebuild the object from it.
    string configPropertiesXml = pInMsg.Context.Read("ConfigPropertiesXml", "http://schemas.microsoft.com/BizTalk/2006/WindowsSharePointServices-properties").ToString();
    XmlDocument doc = new XmlDocument();
    doc.LoadXml(configPropertiesXml);
    XmlNode node = doc.SelectSingleNode("ConfigPropertiesXml");
    ConfigPropertiesXml spp = ConfigPropertiesXml.BuildConfigPropertiesXml(node);
    AuditFileFeedRequest request = AuditManager.GenerateAuditRequest(spp.SPSIDValue, spp.FileFeedSourceValue, spp.OriginalFileNameValue, spp.NameValue);
    LogManager.LogGeneral("Started, UFT Import from SharePoint: " + spp.NameValue);

    try
    {
        UFTMappingAgent vFileConfig = new UFTMappingAgent(spp.FileFeedSourceValue, spp.SPSIDValue);
        // Read the flat file using the encoding configured for this file feed.
        StreamReader inputStream = new StreamReader(pInMsg.BodyPart.GetOriginalDataStream(), vFileConfig.UftMapping.Encoding);
        MemoryStream outputMemoryStream = UFTEngine.StartUFT(inputStream, recordCount, Simulation.Flag.Disabled, vFileConfig);
        // Rewind so that whoever reads the body next starts at the beginning of the XML.
        outputMemoryStream.Position = 0;
        pInMsg.BodyPart.Data = outputMemoryStream;
        LogManager.LogGeneral("Completed UFT PipeLine, File Import: " + spp.NameValue);
        // DebugData is a helper that is not shown in this post.
        //#if(DEBUG)
        DebugData(outputMemoryStream);
        //#endif
    }
    catch (Exception e)
    {
        LogManager.LogGeneral("File Import Failed: " + spp.NameValue + Environment.NewLine + e.Message);
        AuditManager.Audit(request, recordCount, "UFT", "Failure", e.Message);
        // Rethrow as-is so the original stack trace is preserved.
        throw;
    }
    AuditManager.Audit(request, recordCount, "UFT", "Success", "UFT Ended");
    return pInMsg;
}
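A small aside on why the output stream's Position is reset to 0 before it becomes the message body. After you write to a MemoryStream its position sits at the end, so the next reader sees nothing unless you rewind first. The sketch below shows this in plain C#; StreamRewind, BuildBody and ReadAll are hypothetical names used only for this illustration and have nothing to do with the UFT engine.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper to demonstrate stream positions; not BizTalk code.
public static class StreamRewind
{
    // Writes the text into a new MemoryStream and returns it WITHOUT rewinding,
    // so the position is left at the end of the data.
    public static MemoryStream BuildBody(string text)
    {
        MemoryStream stream = new MemoryStream();
        byte[] bytes = Encoding.UTF8.GetBytes(text);
        stream.Write(bytes, 0, bytes.Length);
        return stream;
    }

    // Reads from the stream's CURRENT position to the end.
    // The reader is deliberately not disposed, because disposing it would close the stream.
    public static string ReadAll(Stream stream)
    {
        StreamReader reader = new StreamReader(stream, Encoding.UTF8);
        return reader.ReadToEnd();
    }
}
```

With MemoryStream body = StreamRewind.BuildBody("<a/>"), a first call to StreamRewind.ReadAll(body) returns an empty string; after body.Position = 0 the same call returns the full text.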

Orchestration

Ok, now the exciting bit for those of you that already have pipelines and just want to know how to execute them. Here is my orchestration.

image

I highlighted the shapes that are used for preparing Inline Pipelines.

image

In this shape I take the input message that came into the orchestration and add it to a pipeline collection of type:

Microsoft.XLANGs.Pipeline.SendPipelineInputMessages

Basically, what you need to realise is that when you call a Send Pipeline it needs to know three things:

1. The type of pipeline to call

2. The SendPipelineInputMessages

3. The output variable that the pipeline uses to assign its output message

So before we execute a pipeline, the first thing we do is prepare the SendPipelineInputMessages. In my case I am not batching; I just add one message to the collection. Remember that messages are immutable in an orchestration, so when a pipeline spits out a message you MUST assign it to a NEW message. OK, here are the variable properties for my collection.

image

 

Here is the code for the expression shape above:

image

The next shape is specific to my business process and does not concern us much, but I include it here. It is a detection mechanism that detects a file feed and grabs its configuration from a database.

image

image

I then have an If statement to check whether the file was successfully detected. If it was, we can execute the pipeline. Since we already added the input message to the collection, we are ready to go. But before we do so, I have some good news!! Whenever you add messages to a collection of type Microsoft.XLANGs.Pipeline.SendPipelineInputMessages, the CONTEXT properties associated with the message are preserved!

image

Note that the call to the pipeline is ALWAYS inside a Construct Message shape, since it creates a new message. In this case the new message is PromotedFlatFile.

image

image

Here is the code for executing the pipeline. Notice the variables passed to the method!

image

So, there you have it. I then take the PromotedFlatFile and send it to SharePoint.

image

 

image

I also have another pipeline further down. In that section, I do a lot of the work in one expression within the Message Construct. Let's check it out!

image

image

Above, I use a new collection for my message. (We could have used the existing one, BUT NEVER set it to null; you need the constructor!) So the statements below will never work:

FlatFileUFTPipelineInputMessage = null;

FlatFileUFTPipelineInputMessage.Add(PromotedFlatFile);

Here is the text representation of the call:

System.Diagnostics.Trace.WriteLine("Completed to send flat file to SharePoint");
SendUFTMessage = null;
FlatFileUFTPipelineInputMessage.Add(PromotedFlatFile);
System.Diagnostics.Trace.WriteLine("Executing UFT Pipeline");
Microsoft.XLANGs.Pipeline.XLANGPipelineManager.ExecuteSendPipeline
(
typeof(MMIT.FileManagement.Pipelines.SndUFT),
FlatFileUFTPipelineInputMessage,
SendUFTMessage
);
System.Diagnostics.Trace.WriteLine("Completed Executing UFT Pipeline");

Now I just send the new SendUFTMessage, which is an XML file and not a flat file!

What I do here is call another orchestration to import the XML data.

image

image

 

Conclusion

Executing inline pipelines is pretty straightforward. However, you need to make sure you are working with the latest version of a message; in my case the UFT pipeline was working with a message constructed by the previous pipeline!

Also, the above orchestration is not optimised. Choosing transaction scopes wisely and using Suspend shapes is a good idea, which I will do when I go back to work on Monday morning, sigh………..
