BizTalk: Dynamic SFTP, SOAP adapter and synchronised delivery notification within an orchestration using the Custom SQL send adapter

Hi Folks,

I recently had to build a dispatching system, where a message would be received from our web service. The message would then be saved to the database with:

Stage: IMPORT and Status: SUCCESS

A standard BizTalk SQL receive location would pull the messages and update the records in the SQL table to:

Stage: BizTalk and Status: Processing

Envelopes and De-batching

The above pull is done in batch mode: the SQL adapter pulls a batch of records using the FOR XML clause. The XML that SQL builds has NO root node, but the SQL adapter can inject one; see the image below for how it does it:

image

Now the messages are DEBATCHED by using an ENVELOPE schema which will recognise the XML data. How is this enforced? The receive pipeline on the SQL adapter is set to XMLReceive.

There is a lot of information out there regarding DEBATCHING and using envelopes. However, when you create an XSD to represent the XML data, just remember to ensure the ENVELOPE property is set:

image

This property is found on the Schema Design in Visual Studio.

So, now the messages are de-batched into the BizTalk message box.

An orchestration will have a DIRECT bound receive port with a filter that subscribes to this type of debatched record. So if the schema for the batch was BATCHRecords.xsd, containing a collection of the Records.xsd schema, then the orchestration will subscribe to the schema type of Records.xsd. I always define the individual record schema first and then the batch record schema; in the batch schema I just IMPORT the individual record schema.

image

Ok, we are getting diverted, but understanding envelopes is crucial in BizTalk when pulling data from SQL.

Orchestration

Now that the message is in the orchestration we need to have the following goals in mind:

  • If the message succeeds, a part of the message needs to be extracted using XPath and saved to the SQL database by calling a stored procedure with parameters that accept an XML document and other variables. The stage and status are then set to END, SUCCESS.
  • The default SQL adapter cannot do this very well, so I have a custom SQL adapter to do it; you can read up about it here:

    Romiko-Custom SQL Adapter

  • If the message fails, then the original message is sent to SQL and the Stage and Status are set to END, FAILED.
  • If the message fails to be dispatched to an external SOAP or SFTP destination, then the orchestration must handle the delivery failure and MUST NOT suspend the SFTP or SOAP send port. The message is flagged in SQL as failed, so it can always be reset from the database using an interface for the help desk.
  • A way to store the ACKNOWLEDGEMENT from the SOAP or SFTP send, so we can determine whether the delivery failed. The orchestration MUST NOT send to SFTP or SOAP asynchronously and then carry on executing shapes further down the line.
  • The custom SQL adapter can be asynchronous, since this is an in-house part; if it fails, we want to be able to resume the port, so the SQL aspect MUST be resumable, else the help desk gets no answer and sees messages with stage BIZTALK and status PROCESSING.

 

Now, let's dispel some of the gossip out there about delivery notification.

Delivery notification will always give you a NACK or an ACK back when a message fails or succeeds, respectively, when sending to a port, provided the port is enabled for delivery notification.

Now let me get rid of some incorrect information out there:

  1. You DO NOT need a Long Running/ATOMIC Transaction scope to use Delivery Notification
  2. You CAN use delivery notification on Dynamic Ports 🙂

I say this since I see books and blogs saying that your delivery notification send shape must be in a scope that is set to Long Running or Atomic. Not true at all…

So what do you need?

  1. A scope (Transaction = NONE), with the last shape being the send to the SOAP or SFTP port
  2. The SOAP and SFTP ports must be set up for delivery notification
  3. The scope must be set to Synchronised
  4. A Delivery Notification exception handler, a SOAP exception handler (for SOAP failures) and a System.Exception handler (NOT General Exception; I have no clue why that is the default in the BizTalk orchestration designer, it is not cool. Always use a System.Exception type, then you have access to the exception variable to get the message, e.g. Trace.WriteLine(ex.Message))
  5. The ports enabled for delivery notification can be dynamic; however, in the message construct you must set the retry count to 0. There is no point using retries if you are going to flag the record as failed in an external system anyway.

ANY shapes after the send shape IN THE SCOPE will execute asynchronously, so ensure those shapes do not depend on the delivery; a DEBUG TRACE or something similar you like to have for debugging is fine. Another nice thing to have is a global boolean variable used to detect whether a NACK was received: check this bool value AFTER the scope containing the send shape to ensure an ACK was received before continuing processing; otherwise you know it was a NACK and can safely call the SQL code to record the message in SQL as a failure.

After the exception handlers is where the above SQL persistence logic can be put, also in its own scope that is not set to Synchronised and has no delivery notification, so we can resume the SQL send port, but never resume SFTP or SOAP.
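To make the pattern concrete, here is a rough sketch of the expression logic involved (pseudo-XLANG; SendSuccess is simply an orchestration boolean variable and the comments describe shape placement, so treat it as an illustration rather than a copy of my orchestration):

// Expression shape before the Synchronised scope (Transaction = None)
SendSuccess = true;

// Last shape inside the Synchronised scope: Send to the dynamic SFTP/SOAP port
// (port has Delivery Notification = Transmitted, message has BTS.RetryCount = 0)

// In the Microsoft.XLANGs.BaseTypes.DeliveryFailureException handler
// (and similarly in the System.Exception handler)
SendSuccess = false;

// Decide shape AFTER the Synchronised scope (the SQL update runs in a new scope, Transaction = None)
// Rule expression: SendSuccess == true  ->  SQL success branch, else SQL failure branch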

Below are screen images of the orchestration I used for this.

image

image

We subscribe using promoted properties of STAGE and STATUS.
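As a rough illustration, the filter expression on that direct-bound receive port would look something like this (using the promoted property names from my property schema; the exact values are whatever the SQL receive sets, so treat it as an example only):

MMIT.Dispatch.Schemas.Stage == "BizTalk" && MMIT.Dispatch.Schemas.Status == "Processing"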

Above we get the message and then detect whether it is for SOAP or SFTP sending, using some distinguished fields.

Here is the if expression:

Record.MetaData.Protocol.Type == "SOAP"

Ok, so now let's check out the SFTP side, and then we will do the SOAP.

The first thing to notice is that BizTalk does not support SFTP out of the box; however, some cool developers have made an adapter, and you can download it here:

Blogical.Shared.Adapters.SFTP:

http://sftpadapter.codeplex.com/Release/ProjectReleases.aspx?ReleaseId=19784

Ok, so here is the control flow:

image

 

image

Another thing is that SFTP is a request-only model, not request/response like SOAP. You will see in the SOAP branch later that I extract the response from the SOAP response message and use that, whereas with SFTP I use the original request for database storage.

In the message construct we ensure we set the port retry to 0:

Here is the code for the message construct that sets the SFTP properties dynamically. I used distinguished fields to access the metadata from the message; basically, when SQL got the message it did a join with a control table to find the destination SFTP location, depending on the data pulled 🙂

//Romiko van de dronker 14/01/2010
// Set Port Properties
Port_Dynamic_OneWay(Microsoft.XLANGs.BaseTypes.Address) = "SFTP://" + Record.MetaData.Protocol.FTPProperties.SSHHost + ":" + Record.MetaData.Protocol.FTPProperties.SSHPort + "/";
// Assign the ProcessSalesLead data to the new output message
ProcessSalesLeadMessage = xpath(Record, "/*[local-name()='Record' and namespace-uri()='http://Romiko.Dispatch.Schemas']/*[local-name()='Data' and namespace-uri()='http://Romiko.Dispatch.Schemas']/*[local-name()='ProcessSalesLead' and namespace-uri()='http://www.starstandard.org/STAR/5']");
ProcessSalesLeadMessage(BTS.RetryCount) = 0; // To support NACK from port.
// Set SFTP properties
ProcessSalesLeadMessage(BTS.OutboundTransportType) = "SFTP";
ProcessSalesLeadMessage(Blogical.Shared.Adapters.Sftp.Schemas.host) = Record.MetaData.Protocol.FTPProperties.SSHHost;
ProcessSalesLeadMessage(Blogical.Shared.Adapters.Sftp.Schemas.portno) = System.Int16.Parse(Record.MetaData.Protocol.FTPProperties.SSHPort);
ProcessSalesLeadMessage(Blogical.Shared.Adapters.Sftp.Schemas.user) = Record.MetaData.Protocol.FTPProperties.SSHUser;
ProcessSalesLeadMessage(Blogical.Shared.Adapters.Sftp.Schemas.password) = Record.MetaData.Protocol.FTPProperties.SSHPassword;
ProcessSalesLeadMessage(Blogical.Shared.Adapters.Sftp.Schemas.remotepath) = Record.MetaData.Protocol.FTPProperties.SSHRemotePath;
ProcessSalesLeadMessage(Blogical.Shared.Adapters.Sftp.Schemas.remotefile) = .Helper.OrchestrationHelper.BuildDateTime(System.DateTime.Now) + "." + Record.MetaData.Protocol.FTPProperties.SSHRemoteFile;
// Connection limit
ProcessSalesLeadMessage(Blogical.Shared.Adapters.Sftp.Schemas.connectionlimit) = System.Int16.Parse(Record.MetaData.Protocol.FTPProperties.SSHConnectionLimit);
// Optional settings: not yet supported, as null values can exist; need to build a helper class that does not assign them if they are null.
// Debug
ProcessSalesLeadMessage(Blogical.Shared.Adapters.Sftp.Schemas.trace) = System.Boolean.Parse(Record.MetaData.Protocol.FTPProperties.SSHDebugTrace);
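The BuildDateTime call above, and the null-safe assignment helper hinted at in the optional-settings comment, live in a small helper assembly that is not shown in this post. Here is a minimal sketch of what such a class could look like (the namespace, the timestamp format and the second method are my assumptions, not the actual implementation):

using System;

namespace Romiko.Helper
{
    // Static helpers that can be called from orchestration expression shapes.
    public static class OrchestrationHelper
    {
        // Builds a sortable timestamp used to prefix the SFTP remote file name,
        // e.g. "20100114153055123".
        public static string BuildDateTime(DateTime now)
        {
            return now.ToString("yyyyMMddHHmmssfff");
        }

        // Returns the supplied value unless it is null or empty, so optional
        // SFTP properties are never assigned a null value.
        public static string ValueOrDefault(string value, string defaultValue)
        {
            return string.IsNullOrEmpty(value) ? defaultValue : value;
        }
    }
}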

Ok, now that the message is constructed, all we do is create a scope that is SYNCHRONISED and ensure the SFTP port's delivery notification is on:

image

image

So above we set Delivery Notification to Transmitted.

Now anything to do with SFTP goes into this scope and NOTHING else.

Notice my exception types: I have a Delivery Notification handler and a System.Exception handler:

image

image

In the exception handler, you can set the state to Failed 🙂

SendSuccess = false;
Stage = "END";
Status = "FAI";
Error = "SFTP Failure, " + e.Message;
System.Diagnostics.Trace.WriteLine("BIZTALK: Throwing a Delivery Notification Error" + e.Message);

After this we have a decision to check whether the delivery was a success: if it was, the SQL branch that marks the record a success executes, else the SQL that fails the record executes.

Notice this is a NEW SCOPE (UPDATE SQL), Transaction = None.

Why? Well, delivery notification only works within its own scope, so a new scope is needed for anything that MUST continue. It can be a nested scope like ours or the orchestration's default scope; I like a nested scope for neatness and control.

if expression is:

SendSuccess == true

If this is the case, I create a new message and send the data to SQL. This is done in the last message construct:

//Romiko van de dronker 14/01/2010
OutRecord = Record;
OutRecord(MMIT.Dispatch.Schemas.Stage) = Stage;
OutRecord(MMIT.Dispatch.Schemas.Status) = Status;

 

Also, with the CUSTOM SQL adapter, I send this data to the stored procedure; note I send XML directly to SQL 🙂

image 

For the SOAP aspect, the same logic applies. However there are some catches.

The URL set in the orchestration is in this format: SOAP://myaddress.com/mypage.asmx.

Now when using a static adapter, this is not a problem, since in the adapter you set the URL to http:// format.

There is no shortcut; you need to make a custom pipeline component to handle this. The adapter will understand the SOAP format, and the pipeline will then do the magic and replace the SOAP with HTTP.

Another thing: you can even set the ASSEMBLY and proxy class for the SOAP adapter dynamically. The message that came into the orchestration has this information, and I set it at runtime 🙂 Another cool feature is that I have a SOAP header and a SOAP body, so my SOAP proxy has two method parameters; this means that my multipart message MUST match the parameter types of the SOAP proxy. Let's see how this works:

The MULTIPART message looks like this:

For my SOAP proxy method parameters, I use type XmlDocument for both, so the multipart message that I will send to the SOAP adapter is this:

image

The header uses the following schema:

<Security xmlns="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
  <UsernameToken>
    <Username>string</Username>
    <Password>string</Password>
  </UsernameToken>
  <KeyIdentifier>string</KeyIdentifier>
</Security>

It is a common schema from OASIS.
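Later on, the SOAP message construct calls Romiko.DRONKY.Helper.OrchestrationHelper.BuildSoapHeader to turn a username and password into this header as an XmlDocument. The helper itself is not shown in this post, so here is a minimal sketch of what it might look like (the exact implementation is my assumption; KeyIdentifier is omitted since only a username and password are passed in):

using System.Xml;

namespace Romiko.DRONKY.Helper
{
    public static class OrchestrationHelper
    {
        private const string WsseNamespace =
            "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd";

        // Builds the WS-Security UsernameToken header that the SOAP proxy expects.
        public static XmlDocument BuildSoapHeader(string userName, string password)
        {
            XmlDocument doc = new XmlDocument();

            XmlElement security = doc.CreateElement("Security", WsseNamespace);
            XmlElement token = doc.CreateElement("UsernameToken", WsseNamespace);

            XmlElement user = doc.CreateElement("Username", WsseNamespace);
            user.InnerText = userName;
            XmlElement pass = doc.CreateElement("Password", WsseNamespace);
            pass.InnerText = password;

            token.AppendChild(user);
            token.AppendChild(pass);
            security.AppendChild(token);
            doc.AppendChild(security);

            return doc;
        }
    }
}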

So what we are going to do is pass the message body and the header to the SOAP adapter, and the proxy will do the rest:

Here is my proxy class code:

using System;
using System.Net;
using System.Web.Services.Protocols;
using System.Xml;
using Romiko.Proxies.MMITGMDSConsumer;
using System.Xml.Serialization;

namespace Romiko.Proxies.Suppliers
{
    [System.Web.Services.WebServiceBindingAttribute(Name = "StarBodToMMITSoap", Namespace = "http://Romiko.WebServices/")]
    public partial class MMITConsumer : System.Web.Services.Protocols.SoapHttpClientProtocol
    {

        [System.Web.Services.Protocols.SoapDocumentMethodAttribute("http://Romiko.WebServices/SubmitStarProcessSalesLead", Use = System.Web.Services.Description.SoapBindingUse.Literal, ParameterStyle = System.Web.Services.Protocols.SoapParameterStyle.Bare)]
        public AcknowledgeSalesLeadType Dispatch(XmlDocument MessagePartBody, XmlDocument MessagePartSOAPHeader)
        {
            try
            {
                System.Diagnostics.Trace.WriteLine("ServiceProxy:Dispatch to " + this.Url);
                System.Diagnostics.Trace.WriteLine("ServiceProxy:Message in is: " + MessagePartBody.OuterXml);
                System.Diagnostics.Trace.WriteLine("ServiceProxy:Header is: " + MessagePartSOAPHeader.OuterXml);

                StarBodToMMIT ws = new StarBodToMMIT();
                XmlSerializer X = new XmlSerializer(typeof(ProcessSalesLeadType), "http://www.starstandard.org/STAR/5");
                ProcessSalesLeadType processSalesLead = (ProcessSalesLeadType)X.Deserialize(new XmlNodeReader(MessagePartBody));

                ws.Url = this.Url;

                XmlSerializer Y= new XmlSerializer(typeof(SecuritySoapHeader), "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd");
                ws.Security = (SecuritySoapHeader)Y.Deserialize(new XmlNodeReader(MessagePartSOAPHeader));

                AcknowledgeSalesLeadType response = ws.SubmitStarProcessSalesLead(processSalesLead);           
                return response;

            }
            catch (WebException)
            {
                // To use the retry functionality of the send port
                // This exception type includes time outs and service unavailable from the web service.
                throw;
            }
            catch (Exception e)
            {
                throw new Exception("Problems occured in the proxy:" + e + e.InnerException);
            }
        }
    }
}

BizTalk will automatically detect the message parts, and their names are resolved to the proxy parameter names; as you can see, they are the same as in the multipart message 🙂

 

Ok, so that part was easy. How do you tell the SOAP adapter to do this dynamically? The magic is in the message construct:

//Romiko van de dronker 15/01/2010

// Assign the ProcessSaleslead data to the new output message
ProcessSalesLeadMessageRequest.MessagePartBody = xpath(DRONKYRecord, "/*[local-name()='DRONKYRecord' and namespace-uri()='http://Romiko.DRONKY.Dispatch.Schemas']/*[local-name()='Data' and namespace-uri()='http://Romiko.DRONKY.Dispatch.Schemas']/*[local-name()='ProcessSalesLead' and namespace-uri()='http://www.starstandard.org/STAR/5']");

//Assign SOAP Header to Context
ProcessSalesLeadMessageRequest.MessagePartSOAPHeader = Romiko.DRONKY.Helper.OrchestrationHelper.BuildSoapHeader(DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPHeaderUserName, DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPHeaderPassword);

// Retry Count MUST be 0 for NACK management; also we are in a synchronised scope.
ProcessSalesLeadMessageRequest(BTS.RetryCount) = 0; // To support NACK from port.

// Set Port Properties
Port_Dynamic_SOAP(Microsoft.XLANGs.BaseTypes.Address) = DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPURI;
System.Diagnostics.Trace.WriteLine("URI is: " + DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPURI);

// Set SOAP Proxy assembly
ProcessSalesLeadMessageRequest(SOAP.AssemblyName) = DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPAssemblyName;
ProcessSalesLeadMessageRequest(SOAP.TypeName) = DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPTypeName;
ProcessSalesLeadMessageRequest(SOAP.MethodName) = DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPMethodName;

//Set Misc SOAP settings
ProcessSalesLeadMessageRequest(SOAP.ClientConnectionTimeout) = System.Int32.Parse(DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPClientConnectionTimeOut);
ProcessSalesLeadMessageRequest(SOAP.UseSoap12) = System.Boolean.Parse(DRONKYRecord.MetaData.Protocol.SOAPProperties.SOAPUseSoap12);

You can see above, I dynamically set the assembly, class and method to call:

Cool!
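For reference, the kind of values stored in the control table for those three properties would look something like this (purely illustrative placeholders; the version and public key token below are not real, and you should confirm the exact assembly name format the SOAP adapter expects):

SOAP.AssemblyName : "Romiko.Proxies.Suppliers, Version=1.0.0.0, Culture=neutral, PublicKeyToken=0123456789abcdef"
SOAP.TypeName     : "Romiko.Proxies.Suppliers.MMITConsumer"
SOAP.MethodName   : "Dispatch"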

Ok, the magic part I spoke of for the pipeline. I assume you can build pipeline components, so here is the code of the Execute and the custom method:

First set the logical port properties for the SEND Pipeline (ENCODER)

image

Ok, here is the partial code of the pipeline:

Note the part that replaces the SOAP string with HTTP. I do some extra checks on the port so that 80 and 8080 go out as HTTP and anything else (such as 443) goes out as HTTPS 🙂

public IBaseMessage Execute(IPipelineContext pc, IBaseMessage inmsg)
        {
            try
            {
                System.Diagnostics.Trace.WriteLine("Pipeline:Execute entry");

                string address = (string)inmsg.Context.Read("OutboundTransportLocation", "http://schemas.microsoft.com/BizTalk/2003/system-properties");
                string method = (string)inmsg.Context.Read("MethodName", "http://schemas.microsoft.com/BizTalk/2003/soap-properties");

                System.Diagnostics.Trace.WriteLine("Pipeline:Updating context");

                Uri uri = new Uri(address);

                string protocol = "https";

                if (uri.Port == 80 || uri.Port == 8080)
                    protocol = "http";

                //Send over HTTP or HTTPS
                string addr = protocol + "://" + uri.Host + ":" + uri.Port + uri.PathAndQuery;
                System.Diagnostics.Trace.WriteLine("HTTP url is: " + addr);
                //Here is the magic where we can now convert SOAP://…. to HTTP://…. and now we are ready for some full contact generic web service proxy calls!

                inmsg.Context.Write("OutboundTransportLocation", "http://schemas.microsoft.com/BizTalk/2003/system-properties", addr);
                inmsg.Context.Write("Operation", "
http://schemas.microsoft.com/BizTalk/2003/system-properties", method);

                System.Diagnostics.Trace.WriteLine("Pipeline:End, Number of messages parts: " + inmsg.PartCount);
                System.Diagnostics.Trace.WriteLine("Pipeline:End, Body Part name:" + inmsg.BodyPartName);
                return inmsg;
            }
            finally
            {
                System.Diagnostics.Trace.WriteLine("Pipeline:Execute exit");
            }
        }
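If you have never written a pipeline component before, the Execute method above lives inside a class that implements the standard pipeline component interfaces. A bare-bones skeleton might look something like this (category attributes and property-bag persistence trimmed to the minimum; the class name and GUIDs are placeholders, so treat it as a starting point rather than the exact component I used):

using System;
using Microsoft.BizTalk.Component.Interop;
using Microsoft.BizTalk.Message.Interop;

namespace Romiko.PipelineComponents
{
    [ComponentCategory(CategoryTypes.CATID_PipelineComponent)]
    [ComponentCategory(CategoryTypes.CATID_Encoder)]
    [System.Runtime.InteropServices.Guid("00000000-0000-0000-0000-000000000000")] // generate your own GUID
    public class SoapAddressEncoder : IBaseComponent, IComponent, IComponentUI, IPersistPropertyBag
    {
        // IBaseComponent: identification shown in the pipeline designer.
        public string Name { get { return "SoapAddressEncoder"; } }
        public string Version { get { return "1.0"; } }
        public string Description { get { return "Rewrites SOAP:// addresses to HTTP/HTTPS."; } }

        // IComponent: the Execute method shown above goes here.
        public IBaseMessage Execute(IPipelineContext pc, IBaseMessage inmsg)
        {
            // ... address rewriting logic from the post ...
            return inmsg;
        }

        // IComponentUI: no custom icon or design-time validation.
        public IntPtr Icon { get { return IntPtr.Zero; } }
        public System.Collections.IEnumerator Validate(object projectSystem) { return null; }

        // IPersistPropertyBag: no design-time properties to persist.
        public void GetClassID(out Guid classID) { classID = new Guid("00000000-0000-0000-0000-000000000000"); }
        public void InitNew() { }
        public void Load(IPropertyBag propertyBag, int errorLog) { }
        public void Save(IPropertyBag propertyBag, bool clearDirty, bool saveAllProperties) { }
    }
}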

 

Well, I hope this has shown how powerful orchestration development can be, and that you can CONTROL how an orchestration resumes by using scopes to control persistence points. In my case, I want the orchestration to NEVER suspend, always complete, and handle failures by updating status states in the SQL database for support to deal with 🙂

As soon as I get time, I will work on WCF and other cool features of BizTalk. I am itching to upgrade to BizTalk 2009, which I am doing right now 🙂

Good Luck.
