Monday, September 28, 2009

Developing and testing a Streaming Pipeline using the XPath mutator

When developing pipeline components it is highly recommended to do any processing in a streaming manner, so that the performance hit is minimal.

The other day I had to build a pipeline component to strip attachments from the message stream. I decided to use the XPath mutator because of its streaming features. In this post I describe the steps I followed to implement and test the pipeline component:

· Extracting the BizTalk message DLLs from the GAC
· Creating the pipeline component using the XPath mutator
· Testing the pipeline component

1) Extracting the BizTalk message DLLs from the GAC

Some of the nicest BizTalk assemblies exist only in the GAC, so it takes some effort to extract them. The assemblies are Microsoft.BizTalk.Streaming.dll and Microsoft.BizTalk.XPathReader.dll.

See the following link for information on how to extract the dll's:

2) Creating the pipeline component using the XPath mutator

The XPath mutator allows a stream to be mutated using XPath queries that fire like C# events; in the event handler you can modify the current stream. Although the XPath mutator is explained on several sites, I still had to do some research into how to use it and how to create a new stream from the mutated stream.

Gotcha: the mutation is applied to the output stream when the event fires, so when an XPath expression fires multiple times on the same node, each callback receives the ORIGINAL value, not the mutated one.

The basic steps for implementing a pipeline component can be found anywhere, so I won’t go into detail.

The following code processes the message, mutates the stream, and creates a new stream based on the mutations.

public IBaseMessage Execute(IPipelineContext pc, IBaseMessage inmsg)
{
    // Replace the body stream with the mutated stream.
    inmsg.BodyPart.Data = ProcessMessage(inmsg);
    return inmsg;
}

// Process the incoming message and extract the attachment.
private Stream ProcessMessage(IBaseMessage inmsg)
{
    byte[] buffer = new byte[4096];
    int count = 0;

    // New memory stream to manipulate the message.
    // Note: this copies the whole message body into memory.
    MemoryStream messageStream = new MemoryStream();

    // Stream pointer to the original stream.
    Stream originalStream = inmsg.BodyPart.Data;

    // Write the original message to the memory stream.
    while (0 != (count = originalStream.Read(buffer, 0, buffer.Length)))
    {
        messageStream.Write(buffer, 0, count);
    }

    // Rewind the stream to the beginning.
    messageStream.Position = 0;

    // 1) Extract / replace the content
    XPathCollection xPathCollection = new XPathCollection();
    xPathCollection.Add(new XPathExpression(AttachmentXpath));
    ValueMutator mutator = new ValueMutator(this.AttachmentMutator);

    // Define the XPath mutator stream
    XPathMutatorStream mutatorStream = new XPathMutatorStream(messageStream, xPathCollection, mutator);

    MemoryStream resultStream = new MemoryStream();

    // Read the stream and process the XPath results.
    // While reading, AttachmentMutator fires for every match and will:
    // 2) - write the content to disk and return the file location
    // 3) - replace the content with the file location
    while (0 != (count = mutatorStream.Read(buffer, 0, buffer.Length)))
    {
        resultStream.Write(buffer, 0, count);
    }

    // 4) set the output stream to the beginning
    resultStream.Position = 0;

    // 5) promote a context property and store the file location
    inmsg.Context.Promote(AttachmentContextPropertyName, AttachmentContextPropertyNamespace, AttachmentFileName);

    // 6) return the updated message stream
    return resultStream;
}

// Modifies the attachment value in the message.
// Called by the XPathMutatorStream for each XPath match while the stream is read.
private void AttachmentMutator(int matchIdx, XPathExpression matchExpr, string origVal, ref string finalVal)
{
    // Write the content to a file.
    AttachmentFileName = WriteContentToLocation(FileLocation, origVal);

    // Replace the content value with the file location.
    finalVal = AttachmentFileName;
}

Note: The following link helped me a lot

3) Testing the pipeline component

The most important step is the testing stage! I have used the pipeline testing library from Tomas Restrepo, which provides a clean and easy interface.

After unpacking the pipeline testing library, the following DLLs are used to do pipeline component testing:


The following code is then implemented to test the actual component, pretty nice and easy.

string expectedContent = File.ReadAllText(@"TestData\attachment");

string XPATH_CONTENT = "/*[local-name()='Message']/*[local-name()='Body']/*/*[local-name()='Attachments']/*[local-name()='attachment']/*[local-name()='content']";

string testStep = File.ReadAllText(@"TestData\InputSendTest.xml");
testStep = testStep.Replace("FILELOCATION", @"TestData\attachment");

//create the inputmessage based on a xml file
IBaseMessage inputMessage = MessageHelper.CreateFromString(testStep);

//create an empty pipeline
SendPipelineWrapper sendPipeline = PipelineFactory.CreateEmptySendPipeline();

//create the pipeline component
IBaseComponent component = new Custom.B2B.PipelineComponents.Attachments.AttachmentAssembler();

//add the pipeline component
sendPipeline.AddComponent(component, PipelineStage.Encode);

string fileLocation = System.IO.Path.GetFullPath(@"TestData");

//configure the pipeline component
(component as AttachmentAssembler).FileLocation = fileLocation;
(component as AttachmentAssembler).AttachmentLocationXpath = XPATH_CONTENT;
(component as AttachmentAssembler).AttachmentContentXpath = XPATH_CONTENT;

//create a list to store the BizTalk message and add the input message to it
List<IBaseMessage> inputMessages = new List<IBaseMessage>();
inputMessages.Add(inputMessage);

//execute the pipeline
IBaseMessage resultMessage = sendPipeline.Execute(inputMessages.ToArray());

//process the result
StreamReader sr = new StreamReader(resultMessage.BodyPart.Data);
string result = sr.ReadToEnd();

//load the result so the XPath can be evaluated against it
XmlDocument xDoc = new XmlDocument();
xDoc.LoadXml(result);

string content = xDoc.SelectSingleNode(XPATH_CONTENT).InnerText;

//test the result (expected value first, actual value second)
Assert.AreEqual(expectedContent, content, "The attachment is not identical to the expected data!");
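
The XPath expression used in the test can also be checked on its own, without BizTalk or the pipeline testing library. The following is a minimal sketch of such a check using only System.Xml. The class name XPathCheck, the helper SelectContent, and the sample message (including its namespace and the Order element) are made up for illustration; only the XPath expression itself comes from the test above.

```csharp
using System;
using System.Xml;

public static class XPathCheck
{
    // Same expression as XPATH_CONTENT in the test above.
    public const string ContentXPath =
        "/*[local-name()='Message']/*[local-name()='Body']/*/*[local-name()='Attachments']" +
        "/*[local-name()='attachment']/*[local-name()='content']";

    // Returns the text of the first node matched by the expression,
    // or null when the expression matches nothing.
    public static string SelectContent(string xml)
    {
        XmlDocument doc = new XmlDocument();
        doc.LoadXml(xml);
        XmlNode node = doc.SelectSingleNode(ContentXPath);
        return node == null ? null : node.InnerText;
    }

    public static void Main()
    {
        // Hypothetical sample message; the namespace and the Order element are assumptions.
        string sample =
            "<ns0:Message xmlns:ns0='urn:example:message'>" +
            "<ns0:Body><ns0:Order><ns0:Attachments><ns0:attachment>" +
            "<ns0:content>C:\\TestData\\attachment</ns0:content>" +
            "</ns0:attachment></ns0:Attachments></ns0:Order></ns0:Body></ns0:Message>";

        Console.WriteLine(SelectContent(sample));
    }
}
```

Because the expression matches on local-name() only, it works regardless of the namespace prefix or URI used in the message, which is why no XmlNamespaceManager is needed here.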
