Requirements
We should support to deploy cxf client(as service consumer) and cxf server (as service provider) easilly into servicemix.
We need a cxf service engine where we can deploy cxf client and server into. As a service engine (SE) provides some type of logic inside the JBI environment and only communicates with the NMR. If a Cxf SE needs to communicate outside the JBI environment (e.g. the client inside servicemix but server outside servicemix or vice verse), it must send a message to a binding component (via the NMR), so we also need a cxf binding component to bridge the incoming/outgoing message. Also we should leverage ws-* feature of CXF by cxf binding component.
key components
cxf service engine
overview
ServiceMix Cxf Se component is a JBI Service Engine exposing (annotated) POJO as services on the JBI Bus.
It reuses JBI binding and JBI transport of apache cxf internally to perform service invocations and xml marshaling.
Here for how-to-use cxf se.
features for cxf service engine
- jsr181 annotations
- jaxb2 binding
- wsdl auto generation
- java proxy support
- MTOM / attachments support
main class for cxf service engine
We use maven-xbean-plugin to generate component xbean schema, so we need add comment to specify the xml element name firstly
/**
*
* @org.apache.xbean.XBean element="component"
*/
This class just extends servicemix DefaultComponent
Which is JBI spec compitable component. By extending servicemix DefaultComponent, we can only focus cxf specific issue for our cxf service engine since all JBI spec details is handled by servicemix DefaultComponent. Here we just init cxf bus when CxfSeComponent init.
@Override
protected void doInit() throws Exception {
bus = BusFactory.getDefaultBus();
super.doInit();
}
We use maven-xbean-plugin to generate endpoint xbean schema, so we need add comment to specify the xml element name firstly
/**
*
* @org.apache.xbean.XBean element="endpoint"
*/
In this class we init and publish jaxws endpoint with cxf JBI transport and JBI binding, and also register this endpoint to servicemix.
/*
* (non-Javadoc)
*
* @see org.apache.servicemix.common.Endpoint#validate()
*/
@Override
public void validate() throws DeploymentException {
if (getPojo() == null) {
throw new DeploymentException("pojo must be set");
}
JaxWsServiceFactoryBean serviceFactory = new JaxWsServiceFactoryBean();
serviceFactory.setPopulateFromClass(true);
endpoint = new EndpointImpl(getBus(), getPojo(),
new JaxWsServerFactoryBean(serviceFactory));
endpoint.setBindingUri(org.apache.cxf.binding.jbi.JBIConstants.NS_JBI_BINDING); endpoint.setInInterceptors(getInInterceptors());
endpoint.setInFaultInterceptors(getInFaultInterceptors());
endpoint.setOutInterceptors(getOutInterceptors());
endpoint.setOutFaultInterceptors(getOutFaultInterceptors());
endpoint.getOutInterceptors().add(new WrapperOutInterceptor());
if (isMtomEnabled()) {
endpoint.getInInterceptors().add(new AttachmentInInterceptor());
}
JaxWsImplementorInfo implInfo = new JaxWsImplementorInfo(getPojo()
.getClass());
setService(implInfo.getServiceName()); setInterfaceName(implInfo.getInterfaceName()); setEndpoint(implInfo.getEndpointName().getLocalPart()); super.validate();
}
We publish the endpoint when endpoint start in servicemix
/*
* (non-Javadoc)
*
* @see org.apache.servicemix.common.endpoints.ProviderEndpoint#start()
*/
@Override
public void start() throws Exception {
super.start();
address = "jbi: + ID_GENERATOR.generateSanitizedId();//use jbi prefix for the endpoint address to pick up cxf JBI transport
endpoint.publish(address);
setService(endpoint.getServer().getEndpoint().getService().getName());
setEndpoint(endpoint.getServer().getEndpoint().getEndpointInfo()
.getName().getLocalPart());
}
We just get MessageExchange from Servicemix NMR and delegate it to cxf JBI transport
/*
* (non-Javadoc)
*
* @see org.apache.servicemix.common.endpoints.ProviderEndpoint#process(javax.jbi.messaging.MessageExchange)
*/
@Override
public void process(MessageExchange exchange) throws Exception {
JBITransportFactory jbiTransportFactory = (JBITransportFactory) getBus()
.getExtension(ConduitInitiatorManager.class)
.getConduitInitiator(CxfSeComponent.JBI_TRANSPORT_ID); QName serviceName = exchange.getService();
if (serviceName == null) {
serviceName = getService();
exchange.setService(serviceName);
}
QName interfaceName = exchange.getInterfaceName();
if (interfaceName == null) {
interfaceName = getInterfaceName();
exchange.setInterfaceName(interfaceName);
}
JBIDestination jbiDestination = jbiTransportFactory
.getDestination(serviceName.toString()
+ interfaceName.toString()); DeliveryChannel dc = getContext().getDeliveryChannel();
jbiTransportFactory.setDeliveryChannel(dc);
jbiDestination.setDeliveryChannel(dc);
if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
jbiDestination.getJBIDispatcherUtil().dispatch(exchange); }
}
We use maven-xbean-plugin to generate proxy xbean schema, so we need add comment to specify the xml element name firstly
/**
*
* @org.apache.xbean.XBean element="proxy" description="A CXF proxy"
*
*/
We create client proxy exactly the same as we do in cxf
private Object createProxy() throws Exception {
JaxWsProxyFactoryBean cf = new JaxWsProxyFactoryBean();
cf.setServiceName(getService());
cf.setServiceClass(type);
cf.setAddress("jbi: + new IdGenerator().generateSanitizedId());//use jbi prefix for the endpoint address to pick up cxf JBI transport
cf.setBindingId(org.apache.cxf.binding.jbi.JBIConstants.NS_JBI_BINDING); Bus bus = BusFactory.getDefaultBus();
JBITransportFactory jbiTransportFactory = (JBITransportFactory) bus
.getExtension(ConduitInitiatorManager.class)
.getConduitInitiator(CxfSeComponent.JBI_TRANSPORT_ID);
if (getInternalContext() != null) {
DeliveryChannel dc = getInternalContext().getDeliveryChannel();
if (dc != null) {
jbiTransportFactory.setDeliveryChannel(dc);
}
}
return cf.create();
}
configureation
interceptors configuration
Since cxfse is using apache cxf internally, so customer can configure cxf se endpoint with inteceptors which follow cxf inteceptor api.
example per as below
<cxfse:endpoint>
<cxfse:pojo>
<bean class="org.apache.cxf.calculator.CalculatorImpl">
</bean>
</cxfse:pojo>
<cxfse:inInterceptors>
<bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
</cxfse:inInterceptors>
<cxfse:outInterceptors>
<bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
</cxfse:outInterceptors>
<cxfse:inFaultInterceptors>
<bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
</cxfse:inFaultInterceptors>
<cxfse:outFaultInterceptors>
<bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
</cxfse:outFaultInterceptors>
</cxfse:endpoint>
proxy configuration
Customer can use proxy from one of the client bean, or from inside another component, and call the JBI endpoint as a plain Java object.
<bean class="org.apache.servicemix.cxfse.GreeterImplForClientProxy">
<property name="calculator">
<cxfse:proxy service="calculator:CalculatorService" type="org.apache.cxf.calculator.CalculatorPortType" />
</property>
</bean>
cxf binding component
overview
ServiceMix ships with a JBI compliant HTTP/SOAP or JMS/SOAP binding component named servicemix-cxf-bc which use apache cxf internally.
Here for how-to-use cxf bc.
features for cxf binding component
- JBI compliant Binding Component
- SOAP 1.1 and 1.2 support
- MIME attachments
- Support for InOut or InOnly MEPs as consumers or providers
- SSL support
- WS-Security support
- WS-Policy support
- WS-RM support
- WS-Addressing support
main class for cxf binding component
We use maven-xbean-plugin to generate component xbean schema, so we need add comment to specify the xml element name firstly
/**
*
* @org.apache.xbean.XBean element="component"
*/
This class just extends servicemix DefaultComponent
Which is JBI spec compitable component. By extends servicemix DefaultComponent, we can only focus cxf specific issue for our cxf binding component since all JBI spec details is handled by servicemix DefaultComponent. Here we just init cxf bus when CxfBcComponent init.
@Override
protected void doInit() throws Exception {
if (getBusConfig() != null) { SpringBusFactory bf = new SpringBusFactory();
bus = bf.createBus(getBusConfig());
} else {
bus = BusFactory.newInstance().createBus();
}
super.doInit();
}
We use maven-xbean-plugin to generate consumer xbean schema, so we need add comment to specify the xml element name firstly
/**
*
* @org.apache.xbean.XBean element="consumer"
*/
Consumer of binding component means expose JBI internal endpoint to outside client, for example
standalone cxf client ===> cxf bc consumer===> service in cxf se
CxfBcConsumer play two role in the message process
CxfBcProvider
We use maven-xbean-plugin to generate provider xbean schema, so we need add comment to specify the xml element name first
/**
*
* @org.apache.xbean.XBean element="provider"
*/
Provider of binding component means bridge internal request to outside server, for example
cxf proxy inside servicemix ===> cxf bc provider===> standalone cxf server outside servicemix
Similiar as CxfBcConsumer, CxfBcProvider also play two role in the message process
- servicemix provider
- cxf client
The message process looks like
provider get incoming jbi message from NMR==> transform it to soap message==> send out to external service==> in the registered MessageObserver, get response soap message from external service==> transform it to jbi message==> send back to the NMR.
This process splitted into two classes,
The first half in CxfBcProvider
public void process(MessageExchange exchange) throws Exception {
NormalizedMessage nm = exchange.getMessage("in");
CxfBcProviderMessageObserver obs = new CxfBcProviderMessageObserver(exchange, this);
conduit.setMessageObserver(obs); Message message = ep.getBinding().createMessage(); message.put(MessageExchange.class, exchange);
Exchange cxfExchange = new ExchangeImpl();
message.setExchange(cxfExchange);
cxfExchange.setOutMessage(message);
...
...
message.setInterceptorChain(outChain);
InputStream is = JBIMessageHelper.convertMessageToInputStream(nm.getContent());
StreamSource source = new StreamSource(is);
message.setContent(Source.class, source);
message.setContent(InputStream.class, is);
conduit.prepare(message);
OutputStream os = message.getContent(OutputStream.class);
XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
String encoding = getEncoding(message);
try {
writer = StaxOutInterceptor.getXMLOutputFactory(message).createXMLStreamWriter(os, encoding);
} catch (XMLStreamException e) {
}
message.setContent(XMLStreamWriter.class, writer);
message.put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
outChain.doIntercept(message); XMLStreamWriter xtw = message.getContent(XMLStreamWriter.class);
if (xtw != null) {
xtw.writeEndDocument();
xtw.close();
}
os.flush();
is.close();
os.close();
}
The second half in CxfBcProviderMessageObserver
public void onMessage(Message message) {
try {
...
...
inChain.add(providerEndpoint.getInInterceptors());
inChain.add(providerEndpoint.getInFaultInterceptors());
soapMessage.setInterceptorChain(inChain);
inChain.doIntercept(soapMessage);
if (boi.getOperationInfo().isOneWay()) {
messageExchange.setStatus(ExchangeStatus.DONE);
} else if (soapMessage.get("jbiFault") != null
&& soapMessage.get("jbiFault").equals(true)) {
...
...
} else {
messageExchange.setStatus(ExchangeStatus.DONE);
}
boolean txSync = messageExchange.getStatus() == ExchangeStatus.ACTIVE
&& messageExchange.isTransacted()
&& Boolean.TRUE.equals(messageExchange
.getProperty(JbiConstants.SEND_SYNC));
if (txSync) {
providerEndpoint.getContext().getDeliveryChannel().sendSync(
messageExchange);
} else {
providerEndpoint.getContext().getDeliveryChannel().send(
messageExchange);
}
} catch (Exception e) {
e.printStackTrace();
}
...
...
}
configuration
Since cxfbc is using apache cxf internally, so customer can configure cxf bc provider or consumer with inteceptors which follow cxf inteceptor api. A full example
typical use case
We have test cases almost cover all features of cxf bc and se, such as