Asynchronous Pipelines
In the previous section we showed how pipelines can schedule services that use any XML object model, and that NetKernel
processes may be written in either procedural or declarative languages. In this section we show how NetKernel's asynchronous
architecture may be used to increase throughput when processing high-latency XML sources.
Asynchronous
In order to simulate a high-latency source, such as a complex DB query or a networked service request, we will create a delayed-echo
service. Below is a simple Beanshell script which sleeps for 3 seconds before returning its param argument as its response.
/**************************
* Beanshell delayed-echo service
***************************/
void main()
{ //Sleep for 3 seconds
Thread.sleep(3000);
//Source parameter
param=context.source("this:param:param");
//Finally return param
response=context.createResponseFrom(param);
context.setResponse(response);
}
Below is our familiar pipeline. In the first instruction we request the delayed-echo Beanshell script, but in addition we show some of
the power of the active URI model: we add an active:dpml URI as the param argument of the request. This means that the delayed-echo
service will be called, it will sleep for 3 seconds, and it will then request its param argument, which will
execute stage3.idoc. So we have created a delayed execution of our original stage3.idoc pipeline!
Fancy as all this is, the point is simply to simulate a high-latency XML source. How this service is requested is discussed below...
/**************************
* Beanshell script to perform multistage
* XQuery into XSLT.
***************************/
void main()
{ //Request stage3.idoc through the delayed-echo service
req=context.createSubRequest();
req.setURI("active:beanshell");
req.addArgument("operator","delayed-echo.bsh");
req.addArgument("param", "active:dpml+operand@ffcpl:/demos/xquery/pipeline2/stage3.idoc");
//Issue request asynchronously / ie fork sub-process
handle=context.issueAsyncSubRequest(req);
//In the meantime handle the parameter argument
param=null;
try
{ param=context.source("this:param:param");
}
catch(Exception e)
{ //Use default
param=context.source("../pipeline4/default.xml");
}
//Just to prove we're executing asynchronously
for(int i=0; i< 100; i++)
{ System.out.println(i+": still waiting for delayed-echo");
}
//Now join with the async process to retrieve result
output=handle.join();
//Extract all speeches with lines containing the parameter argument
req=context.createSubRequest();
req.setURI("active:xquery");
req.addArgument("input", output);
req.addArgument("param", param);
req.addArgument("operator","../pipeline3/xq4.xq");
output=context.issueSubRequest(req);
//Apply XSLT
req=context.createSubRequest();
req.setURI("active:xslt");
req.addArgument("operand", output);
req.addArgument("param", param);
req.addArgument("operator","../pipeline3/style1.xsl");
output=context.issueSubRequest(req);
//Finally return response
response=context.createResponseFrom(output);
context.setResponse(response);
}
try it!
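The delayed execution created by passing the active:dpml URI as the param argument can be sketched in plain Java. This is a minimal analogy, not the NetKernel API: the class and variable names are hypothetical, and a Supplier stands in for a URI-addressed request that is only executed when the service actually sources its argument.

```java
import java.util.function.Supplier;

public class DeferredSketch {
    // A service whose argument is itself a deferred computation: the
    // argument is evaluated only when the service sources it, just as
    // delayed-echo only executes the active:dpml request when it
    // sources its param argument. (The real service also sleeps for
    // 3 seconds first.)
    static String delayedEcho(Supplier<String> param) {
        return param.get();
    }

    public static void main(String[] args) {
        // Stands in for the active:dpml execution of stage3.idoc
        Supplier<String> stage3 = () -> "stage3 result";
        System.out.println(delayedEcho(stage3));
    }
}
```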
More important than the delayed-echo service is that we are using the issueAsyncSubRequest() method to issue the request.
This is the NetKernel equivalent of a Unix fork: the request forks off asynchronously whilst the calling process does not
block and continues execution.
To prove that the parent process continues, we process the param argument and then run a simple loop, outputting some messages to standard out.
We then join with the asynchronous forked process to retrieve its result; the call to join() will block until the async request has completed.
Finally we continue as before.
The net result of this pipeline is that whilst the high-latency request for the result of stage3.idoc is in flight, we do some useful (and some not so useful!)
work. Try executing this pipeline and you will see that it still takes only about 3 seconds in total, and that each time it runs the messages appear on std-out.
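The fork/join behaviour above can be sketched in plain Java using java.util.concurrent. This is a minimal analogy under assumed names (it is not the NetKernel API): a Future plays the role of the async handle, submit() plays issueAsyncSubRequest(), and get() plays join().

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class AsyncSketch {
    // Fork a simulated high-latency request, do local work in the
    // meantime, then join for the result. Total elapsed time is close
    // to the latency alone, because the local work overlaps it.
    static String fetchWithOverlap(long latencyMs) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // "Fork": the high-latency request runs on another thread
            Future<String> handle = pool.submit(() -> {
                Thread.sleep(latencyMs);   // simulated high-latency source
                return "delayed result";
            });
            // The caller is not blocked; do some work in the meantime
            for (int i = 0; i < 5; i++) {
                System.out.println(i + ": still waiting for delayed-echo");
            }
            // "Join": block only when the result is actually needed
            return handle.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fetchWithOverlap(300));
    }
}
```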
Finally, just to show that this is generally applicable, regardless of the choice of processing language, here's the same asynchronous pipeline written in Groovy...
/**************************
* Groovy script to perform multistage
* Delayed XQuery into XSLT.
***************************/
req=context.createSubRequest()
req.setURI("active:beanshell")
req.addArgument("operator","delayed-echo.bsh")
req.addArgument("param", "active:dpml+operand@ffcpl:/demos/xquery/pipeline2/stage3.idoc")
//Issue request asynchronously / ie fork sub-process
handle=context.issueAsyncSubRequest(req)
//In the meantime handle the parameter argument
param=null
try
{ param=context.source("this:param:param")
}
catch(Exception e)
{ //Use default
param=context.source("../pipeline4/default.xml")
}
//Just to prove we're executing asynchronously (and in groovy)
for (value in 0..&lt;100)
{ println value + " : waiting for delayed-echo" }
//Now join with the async process to retrieve result
output=handle.join()
//Extract all speeches with lines containing the parameter argument
req=context.createSubRequest()
args = [ "input" : output ,
"param" : param ,
"operator" : "../pipeline3/xq4.xq"
]
req.setURI("active:xquery")
for (arg in args) { req.addArgument(arg.key, arg.value ) }
output=context.issueSubRequest(req)
//Apply XSLT
req=context.createSubRequest()
args = [ "operand" : output ,
"param" : param ,
"operator" : "../pipeline3/style1.xsl"
]
req.setURI("active:xslt")
for (arg in args) { req.addArgument(arg.key, arg.value ) }
output=context.issueSubRequest(req)
//Finally return response
response=context.createResponseFrom(output)
context.setResponse(response)
try it!
Conclusion
This guide has shown some of the patterns for creating, linking and scheduling XML pipelines on NetKernel. It has used XQuery and XSLT as
the main XML technologies to illustrate the patterns. There is no limitation on the type of XML technology that can be deployed in a pipeline; for example, it
is trivial to add multiple schema-validation stages in a wide choice of schema languages. Equally, it is straightforward to add new XML technologies or
business-message-specific XML services.