Requirements

ServiceMix should make it easy to deploy a CXF client (as a service consumer) and a CXF server (as a service provider).
This requires a CXF service engine into which CXF clients and servers can be deployed. A service engine (SE) provides some type of logic inside the JBI environment and communicates only with the NMR. If the CXF SE needs to communicate outside the JBI environment (e.g. the client is inside ServiceMix but the server is outside, or vice versa), it must send a message to a binding component via the NMR, so a CXF binding component is also needed to bridge incoming and outgoing messages. The CXF binding component should also let us leverage the WS-* features of CXF.

key components

cxf service engine

overview

The ServiceMix cxf se component is a JBI service engine that exposes (annotated) POJOs as services on the JBI bus.
Internally it reuses the JBI binding and JBI transport of Apache CXF to perform service invocations and XML marshalling.
See here for how to use cxf se.

features for cxf service engine

  • jsr181 annotations
  • jaxb2 binding
  • wsdl auto generation
  • java proxy support
  • MTOM / attachments support

main class for cxf service engine

CxfSeComponent

We use maven-xbean-plugin to generate the component XBean schema, so we first need to add a comment that specifies the XML element name.

/**
 * 
 * @org.apache.xbean.XBean element="component"
 */

This class simply extends the ServiceMix DefaultComponent, which is a JBI-spec-compatible component. By extending DefaultComponent we can focus on the CXF-specific issues of the service engine, since DefaultComponent handles all the JBI spec details. Here we just initialize the CXF bus when CxfSeComponent is initialized.

    @Override
    protected void doInit() throws Exception {
        bus = BusFactory.getDefaultBus();
        super.doInit();
    }

CxfSeEndpoint

We use maven-xbean-plugin to generate the endpoint XBean schema, so we first need to add a comment that specifies the XML element name.

/**
 * 
 * @org.apache.xbean.XBean element="endpoint"
 */

In this class we initialize and publish a JAX-WS endpoint with the CXF JBI transport and JBI binding, and also register the endpoint with ServiceMix.

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.servicemix.common.Endpoint#validate()
     */
    @Override
    public void validate() throws DeploymentException {
        if (getPojo() == null) {
            throw new DeploymentException("pojo must be set");
        }
        JaxWsServiceFactoryBean serviceFactory = new JaxWsServiceFactoryBean();
        serviceFactory.setPopulateFromClass(true);
        endpoint = new EndpointImpl(getBus(), getPojo(),
                new JaxWsServerFactoryBean(serviceFactory));
        endpoint.setBindingUri(org.apache.cxf.binding.jbi.JBIConstants.NS_JBI_BINDING);//using cxf jbi binding, so the message use JBI wrapper
        endpoint.setInInterceptors(getInInterceptors());
        endpoint.setInFaultInterceptors(getInFaultInterceptors());
        endpoint.setOutInterceptors(getOutInterceptors());
        endpoint.setOutFaultInterceptors(getOutFaultInterceptors());
        endpoint.getOutInterceptors().add(new WrapperOutInterceptor());
        if (isMtomEnabled()) {
            endpoint.getInInterceptors().add(new AttachmentInInterceptor());
        }
        JaxWsImplementorInfo implInfo = new JaxWsImplementorInfo(getPojo()
                .getClass());
        
        setService(implInfo.getServiceName()); // register servicename to NMR
        setInterfaceName(implInfo.getInterfaceName()); // register interfacename(portType) to NMR 
        setEndpoint(implInfo.getEndpointName().getLocalPart()); // register Endpoint(Port) name to NMR
        super.validate();
    }

We publish the endpoint when the endpoint starts in ServiceMix.

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#start()
     */
    @Override
    public void start() throws Exception {
        super.start();
        address = "jbi://" + ID_GENERATOR.generateSanitizedId();//use jbi prefix for the endpoint address to pick up cxf JBI transport
        endpoint.publish(address);
        setService(endpoint.getServer().getEndpoint().getService().getName());
        setEndpoint(endpoint.getServer().getEndpoint().getEndpointInfo()
                .getName().getLocalPart());
    }
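The comment in start() says that the "jbi://" prefix is what makes CXF pick the JBI transport. As a rough illustration of prefix-based transport selection, here is a minimal, self-contained sketch. It is not CXF's actual lookup code: the class TransportSelectorSketch, its methods and the transport id strings are hypothetical, and a random UUID stands in for the ID generator used above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical sketch: choose a transport by matching the address prefix. */
final class TransportSelectorSketch {
    // Registration order is preserved so the first matching prefix wins.
    private final Map<String, String> transportByPrefix = new LinkedHashMap<>();

    /** Associates a URI prefix such as "jbi://" with an (assumed) transport id. */
    void register(String uriPrefix, String transportId) {
        transportByPrefix.put(uriPrefix, transportId);
    }

    /** Returns the transport id whose prefix matches the address, if any. */
    Optional<String> select(String address) {
        return transportByPrefix.entrySet().stream()
                .filter(e -> address.startsWith(e.getKey()))
                .map(Map.Entry::getValue)
                .findFirst();
    }

    /** Builds a unique endpoint address; a UUID replaces the real ID generator. */
    static String newJbiAddress() {
        return "jbi://" + UUID.randomUUID();
    }
}
```

With "http://" and "jbi://" both registered, an address produced by newJbiAddress() resolves to the entry registered for "jbi://", which mirrors why the endpoint address above is built with that prefix.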

We simply get the MessageExchange from the ServiceMix NMR and delegate it to the CXF JBI transport.

    /*
     * (non-Javadoc)
     * 
     * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#process(javax.jbi.messaging.MessageExchange)
     */
    @Override
    public void process(MessageExchange exchange) throws Exception {
        JBITransportFactory jbiTransportFactory = (JBITransportFactory) getBus()
                .getExtension(ConduitInitiatorManager.class)
                .getConduitInitiator(CxfSeComponent.JBI_TRANSPORT_ID);//load JBITransportFactory from cxf bus
        QName serviceName = exchange.getService();
        if (serviceName == null) {
            serviceName = getService();
            exchange.setService(serviceName);
        }
        QName interfaceName = exchange.getInterfaceName();
        if (interfaceName == null) {
            interfaceName = getInterfaceName();
            exchange.setInterfaceName(interfaceName);
        }
        JBIDestination jbiDestination = jbiTransportFactory
                .getDestination(serviceName.toString()
                        + interfaceName.toString());// each endpoint should have its own JBIDestination, so just query it by serviceName and interfaceName
        DeliveryChannel dc = getContext().getDeliveryChannel();
        jbiTransportFactory.setDeliveryChannel(dc);
        jbiDestination.setDeliveryChannel(dc);
        if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
            jbiDestination.getJBIDispatcherUtil().dispatch(exchange);//delegate MessageExchange to cxf JBI transport, cxf in charge of message unmarshalling and service invocation 
        }
        
    }
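The process method above looks up one JBIDestination per endpoint, keyed by the service name concatenated with the interface name. The following sketch shows that keying scheme in isolation, assuming a plain map as the registry. DestinationRegistrySketch and its methods are hypothetical names, not part of the CXF JBI transport.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.xml.namespace.QName;

/** Hypothetical sketch: one destination per (service, interface) pair. */
final class DestinationRegistrySketch<D> {
    private final Map<String, D> destinations = new ConcurrentHashMap<>();

    /** Same key shape as in process(): serviceName.toString() + interfaceName.toString(). */
    static String key(QName serviceName, QName interfaceName) {
        return serviceName.toString() + interfaceName.toString();
    }

    void register(QName serviceName, QName interfaceName, D destination) {
        destinations.put(key(serviceName, interfaceName), destination);
    }

    /** Returns the registered destination, or null if the pair is unknown. */
    D lookup(QName serviceName, QName interfaceName) {
        return destinations.get(key(serviceName, interfaceName));
    }
}
```

Because QName.toString() includes the namespace in braces, two services with the same local name in different namespaces map to different keys.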

CxfSeProxyFactoryBean

We use maven-xbean-plugin to generate the proxy XBean schema, so we first need to add a comment that specifies the XML element name.

/**
 * 
 * @org.apache.xbean.XBean element="proxy" description="A CXF proxy"
 * 
 */

We create the client proxy exactly as we would in plain CXF.

    private Object createProxy() throws Exception {
        JaxWsProxyFactoryBean cf = new JaxWsProxyFactoryBean();
        cf.setServiceName(getService());
        cf.setServiceClass(type);
        cf.setAddress("jbi://" + new IdGenerator().generateSanitizedId());//use jbi prefix for the endpoint address to pick up cxf JBI transport
        cf.setBindingId(org.apache.cxf.binding.jbi.JBIConstants.NS_JBI_BINDING);//use cxf JBI binding
        Bus bus = BusFactory.getDefaultBus();
        JBITransportFactory jbiTransportFactory = (JBITransportFactory) bus
                .getExtension(ConduitInitiatorManager.class)
                .getConduitInitiator(CxfSeComponent.JBI_TRANSPORT_ID);
        if (getInternalContext() != null) { 
            DeliveryChannel dc = getInternalContext().getDeliveryChannel();
            if (dc != null) {
                jbiTransportFactory.setDeliveryChannel(dc);
            }
        }
        return cf.create();
    }

configuration

interceptors configuration

Since cxfse uses Apache CXF internally, users can configure a cxf se endpoint with interceptors that follow the CXF interceptor API.
An example is shown below.

      <cxfse:endpoint>
        <cxfse:pojo>
          <bean class="org.apache.cxf.calculator.CalculatorImpl">
          </bean>

        </cxfse:pojo>
        <cxfse:inInterceptors>
          <bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
        </cxfse:inInterceptors>
        <cxfse:outInterceptors>
          <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
        </cxfse:outInterceptors>
        <cxfse:inFaultInterceptors>
          <bean class="org.apache.cxf.interceptor.LoggingInInterceptor"/>
        </cxfse:inFaultInterceptors>
        <cxfse:outFaultInterceptors>
          <bean class="org.apache.cxf.interceptor.LoggingOutInterceptor"/>
        </cxfse:outFaultInterceptors>
      </cxfse:endpoint>

proxy configuration

Users can obtain a proxy from a client bean, or from inside another component, and call the JBI endpoint as a plain Java object.

      <bean class="org.apache.servicemix.cxfse.GreeterImplForClientProxy">
       <property name="calculator">
          <cxfse:proxy service="calculator:CalculatorService" type="org.apache.cxf.calculator.CalculatorPortType" />
       </property>
     </bean>
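To illustrate what "calling the JBI endpoint as a plain Java object" means, here is a minimal sketch built on java.lang.reflect.Proxy. It is not how CxfSeProxyFactoryBean is implemented (that class delegates to JaxWsProxyFactoryBean, as shown earlier). The Calculator interface and the channel function, which stands in for the round trip through the NMR, are assumptions made for the example.

```java
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical sketch: a typed proxy that turns method calls into messages. */
final class ProxySketch {
    /** Stand-in for a generated port type such as CalculatorPortType. */
    interface Calculator {
        int add(int a, int b);
    }

    /**
     * Creates a proxy for 'type'. Every method call is forwarded to 'channel'
     * as (operation name, argument list); the value returned by 'channel'
     * becomes the result of the call.
     */
    static <T> T create(Class<T> type,
                        BiFunction<String, List<Object>, Object> channel) {
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(),
                new Class<?>[] {type},
                (self, method, args) -> channel.apply(
                        method.getName(),
                        args == null ? List.<Object>of() : Arrays.asList(args)));
        return type.cast(proxy);
    }
}
```

The caller only sees the Calculator interface; whether the call is served locally or travels over a message bus is hidden behind the channel function.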

cxf binding component

overview

ServiceMix ships with a JBI-compliant HTTP/SOAP or JMS/SOAP binding component named servicemix-cxf-bc, which uses Apache CXF internally.
See here for how to use cxf bc.

features for cxf binding component

  • JBI compliant Binding Component
  • SOAP 1.1 and 1.2 support
  • MIME attachments
  • Support for InOut or InOnly MEPs as consumers or providers
  • SSL support
  • WS-Security support
  • WS-Policy support
  • WS-RM support
  • WS-Addressing support

main class for cxf binding component

CxfBcComponent

We use maven-xbean-plugin to generate the component XBean schema, so we first need to add a comment that specifies the XML element name.

/**
 * 
 * @org.apache.xbean.XBean element="component"
 */

This class simply extends the ServiceMix DefaultComponent, which is a JBI-spec-compatible component. By extending DefaultComponent we can focus on the CXF-specific issues of the binding component, since DefaultComponent handles all the JBI spec details. Here we just initialize the CXF bus when CxfBcComponent is initialized.

    @Override
    protected void doInit() throws Exception {
        if (getBusConfig() != null) {//for the cxf bc, we need pass in cxf bus configuration file sometimes, e.g. we need configure bus to support ws-*
            SpringBusFactory bf = new SpringBusFactory();
            bus = bf.createBus(getBusConfig());
        } else {
            bus = BusFactory.newInstance().createBus();
        }
        super.doInit();
    }

CxfBcConsumer

We use maven-xbean-plugin to generate the consumer XBean schema, so we first need to add a comment that specifies the XML element name.

/**
 * 
 * @org.apache.xbean.XBean element="consumer"
 */

A consumer of a binding component exposes an internal JBI endpoint to outside clients, for example
standalone cxf client ===> cxf bc consumer ===> service in cxf se
CxfBcConsumer plays two roles in message processing:

  • cxf server
  • servicemix consumer

As a cxf server, it just initializes a dummy server without a real service class. This cxf server initializes the proper transport (HTTP or JMS) from the service model, which is built from the WSDL. Since the server has no service class, it performs no real service invocation; it only transforms the message format between the SOAP binding and the JBI binding.

For a request message, the SOAP message must be transformed into a JBI message. Two main interceptors are involved:

  • JbiInInterceptor mainly creates the JBI NormalizedMessage and copies headers and attachments from the SOAP message to the JBI message.
  • JbiInWsdl1Interceptor transforms the SOAP message into a JBI message according to the service model.

For a response message, the JBI message must be transformed into a SOAP message. Again two main interceptors are involved:

  • JbiOutInterceptor mainly copies attachments and headers from the JBI message to the SOAP message.
  • JbiOutWsdl1Interceptor transforms the JBI message into a SOAP message based on the service model.

As a servicemix consumer, it sends the JBI message that was transformed from the SOAP message to the NMR, and it must process the response JBI message coming back from the NMR.
CxfBcConsumer uses its own JbiInvokerInterceptor to wire these two roles together.
Main part of JbiInvokerInterceptor:

    public class JbiInvokerInterceptor extends
                AbstractPhaseInterceptor<Message> {
    
            public JbiInvokerInterceptor() {
                super(Phase.INVOKE);
            }
    
            public void handleMessage(final Message message) throws Fault {
    
                final Exchange cxfExchange = message.getExchange();
                final Endpoint endpoint = cxfExchange.get(Endpoint.class);
                final Service service = endpoint.getService();
                final Invoker invoker = service.getInvoker();
    
                MessageExchange exchange = message
                        .getContent(MessageExchange.class);
                ComponentContext context = message.getExchange().get(
                        ComponentContext.class);
                CxfBcConsumer.this.configureExchangeTarget(exchange);
                CxfBcConsumer.this.messages.put(exchange.getExchangeId(), message);
                CxfBcConsumer.this.isOneway = message.getExchange().get(
                        BindingOperationInfo.class).getOperationInfo().isOneWay();
                message.getExchange().setOneWay(CxfBcConsumer.this.isOneway);
    
                try {
                    if (CxfBcConsumer.this.synchronous
                            && !CxfBcConsumer.this.isOneway) {
                        message.getInterceptorChain().pause();//pause the incoming interceptor chain of the cxf server; the message transformation from soap to jbi is done at this point
                        context.getDeliveryChannel().sendSync(exchange, timeout*1000);//send exchange which contain jbi message to NMR
                        process(exchange);
                    } else {
                        context.getDeliveryChannel().send(exchange);
    
                    }
                } catch (Exception e) {
                    throw new Fault(e);
                }
            }
    
        }
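The pause/resume pattern used by JbiInvokerInterceptor can be pictured with a small, self-contained sketch of a chain that stops after the current step and later continues from the next one. This is a simplification under stated assumptions, not CXF's PhaseInterceptorChain: the class PausableChainSketch is hypothetical, steps are plain Consumer callbacks, and there are no phases or fault handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: an ordered chain of steps that can be paused and resumed. */
final class PausableChainSketch<M> {
    private final List<Consumer<M>> steps = new ArrayList<>();
    private int next;        // index of the next step to execute
    private boolean paused;  // set by pause(), cleared when the chain runs again

    void add(Consumer<M> step) {
        steps.add(step);
    }

    /** Requests a stop; the step that is currently running still completes. */
    void pause() {
        paused = true;
    }

    /** Runs the remaining steps until the end of the chain or until a step pauses it. */
    void run(M message) {
        paused = false;
        while (next < steps.size() && !paused) {
            // Advance the index first so a paused step is not executed twice.
            steps.get(next++).accept(message);
        }
    }

    /** Continues with the step after the one that paused the chain. */
    void resume(M message) {
        run(message);
    }
}
```

In this picture the invoker step pauses the chain before handing the exchange to the NMR, and the steps that build the response only run once the reply is available and resume is called.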
    

The code for processing the response JBI message looks like this:

        public void process(MessageExchange exchange) throws Exception {
            Message message = messages.remove(exchange.getExchangeId());
            message.getInterceptorChain().resume();//resume the cxf outgoing interceptor chain, will do the message transformation from jbi to soap
            if (exchange.getStatus() == ExchangeStatus.ACTIVE) {
                exchange.setStatus(ExchangeStatus.DONE);
                message.getExchange().get(ComponentContext.class)
                        .getDeliveryChannel().send(exchange);
            }
        }
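The messages map in the two snippets above correlates a paused CXF message with the JBI exchange that carries its request: the message is stored under the exchange id before sending and removed again when the exchange comes back. A minimal sketch of that correlation store follows. PendingMessagesSketch is a hypothetical name, and returning an Optional for unknown ids is an added assumption, since the original code does not show how a missing entry is handled.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: park messages by exchange id until the reply arrives. */
final class PendingMessagesSketch<M> {
    private final Map<String, M> pending = new ConcurrentHashMap<>();

    /** Stores the message before the exchange is sent to the NMR. */
    void park(String exchangeId, M message) {
        pending.put(exchangeId, message);
    }

    /**
     * Removes and returns the message for a returning exchange.
     * An empty result means the id is unknown or was already claimed.
     */
    Optional<M> claim(String exchangeId) {
        return Optional.ofNullable(pending.remove(exchangeId));
    }

    int size() {
        return pending.size();
    }
}
```

Removing the entry on claim keeps the map from growing, and a concurrent map is used because requests and replies may be handled on different threads.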
    

CxfBcProvider
We use maven-xbean-plugin to generate the provider XBean schema, so we first need to add a comment that specifies the XML element name.

/**
 * 
 * @org.apache.xbean.XBean element="provider"
 */

A provider of a binding component bridges internal requests to an outside server, for example
cxf proxy inside servicemix ===> cxf bc provider ===> standalone cxf server outside servicemix
Similar to CxfBcConsumer, CxfBcProvider also plays two roles in message processing:

  • servicemix provider
  • cxf client

The message processing looks like this:
the provider gets the incoming JBI message from the NMR ==> transforms it into a SOAP message ==> sends it to the external service ==> in the registered MessageObserver, gets the response SOAP message from the external service ==> transforms it into a JBI message ==> sends it back to the NMR.
This process is split across two classes.
The first half is in CxfBcProvider:

        public void process(MessageExchange exchange) throws Exception {
            NormalizedMessage nm = exchange.getMessage("in");//get incoming jbi message
                   
            CxfBcProviderMessageObserver obs = new CxfBcProviderMessageObserver(exchange, this);
            conduit.setMessageObserver(obs);// register message observer for cxf client
            Message message = ep.getBinding().createMessage();//create outgoing cxf message based on the service model, both soap11 and soap12 is supported
            message.put(MessageExchange.class, exchange);
            Exchange cxfExchange = new ExchangeImpl();
            message.setExchange(cxfExchange);
            cxfExchange.setOutMessage(message);       
              ...
              ...        
            message.setInterceptorChain(outChain);
            InputStream is = JBIMessageHelper.convertMessageToInputStream(nm.getContent());
            
            StreamSource source = new StreamSource(is);
            message.setContent(Source.class, source);
            
            message.setContent(InputStream.class, is);
            
            conduit.prepare(message);
            OutputStream os = message.getContent(OutputStream.class);
            XMLStreamWriter writer = message.getContent(XMLStreamWriter.class);
            
    
            String encoding = getEncoding(message);
            
            try {
                writer = StaxOutInterceptor.getXMLOutputFactory(message).createXMLStreamWriter(os, encoding);
            } catch (XMLStreamException e) {
                //
            }
            message.setContent(XMLStreamWriter.class, writer);
            message.put(org.apache.cxf.message.Message.REQUESTOR_ROLE, true);
            outChain.doIntercept(message);//transform jbi message to soap message
            XMLStreamWriter xtw = message.getContent(XMLStreamWriter.class);
            if (xtw != null) {
                xtw.writeEndDocument();
                xtw.close();
            }
    
            os.flush();
            is.close();
            os.close();//send out message to external service
    
        }
    

The second half is in CxfBcProviderMessageObserver:

    public void onMessage(Message message) {
        try {
             ...
             ...
            inChain.add(providerEndpoint.getInInterceptors());
            inChain.add(providerEndpoint.getInFaultInterceptors());
            soapMessage.setInterceptorChain(inChain);
            inChain.doIntercept(soapMessage);//transform soap message to jbi message
           
            //send back jbi message to nmr
            if (boi.getOperationInfo().isOneWay()) {
                messageExchange.setStatus(ExchangeStatus.DONE);
            } else if (soapMessage.get("jbiFault") != null
                    && soapMessage.get("jbiFault").equals(true)) {
                ...
                ...
            } else {
                messageExchange.setStatus(ExchangeStatus.DONE);

            }
            boolean txSync = messageExchange.getStatus() == ExchangeStatus.ACTIVE
                    && messageExchange.isTransacted()
                    && Boolean.TRUE.equals(messageExchange
                            .getProperty(JbiConstants.SEND_SYNC));
            if (txSync) {
                providerEndpoint.getContext().getDeliveryChannel().sendSync(
                        messageExchange);
            } else {
                providerEndpoint.getContext().getDeliveryChannel().send(
                        messageExchange);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } 
        ...
        ...
    }

configuration

Since cxfbc uses Apache CXF internally, users can configure a cxf bc provider or consumer with interceptors that follow the CXF interceptor API. A full example

typical use case

Our test cases cover almost all features of cxf bc and se, such as

  • proxy
  • ws-policy
  • ws-security
  • ws-rm
  • ws-addressing
  • MTOM
  • multiple transport support
  • oneway
  • Exception

Go through these tests for servicemix-cxf-se and servicemix-cxf-bc.

some tips

what a JBI binding message looks like

    <jbi:message xmlns:jbi="http://java.sun.com/xml/ns/jbi/wsdl-11-wrapper" xmlns:msg="http://apache.org/cxf/calculator" name="add" type="msg:add" version="1.0">
        <jbi:part>
            <add xmlns="http://apache.org/cxf/calculator/types">
                <arg0>1</arg0>
                <arg1>2</arg1>
            </add>
        </jbi:part>
    </jbi:message>
    

what a SOAP binding message looks like

    <soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
        <soap:Body>
            <add xmlns="http://apache.org/cxf/calculator/types">
                <arg0>1</arg0>
                <arg1>2</arg1>
            </add>
        </soap:Body>
    </soap:Envelope>
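The two samples above differ only in their wrapper: the payload element inside jbi:part is the same element that appears inside soap:Body. The sketch below makes that relationship concrete with the JDK's DOM API. It is an illustration only and not how JbiOutWsdl1Interceptor works; it ignores headers, attachments and the service model, and the class name JbiToSoapSketch is hypothetical.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Hypothetical sketch: move the payload of each jbi:part into a SOAP 1.2 body. */
final class JbiToSoapSketch {
    static final String JBI_NS = "http://java.sun.com/xml/ns/jbi/wsdl-11-wrapper";
    static final String SOAP12_NS = "http://www.w3.org/2003/05/soap-envelope";

    static Document toSoap(String jbiXml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for the *NS lookups below
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document jbi = builder.parse(new InputSource(new StringReader(jbiXml)));

            // Build an empty envelope: <soap:Envelope><soap:Body/></soap:Envelope>
            Document soap = builder.newDocument();
            Element envelope = soap.createElementNS(SOAP12_NS, "soap:Envelope");
            Element body = soap.createElementNS(SOAP12_NS, "soap:Body");
            soap.appendChild(envelope);
            envelope.appendChild(body);

            // Copy every element child of every jbi:part into the body,
            // skipping whitespace text nodes.
            NodeList parts = jbi.getElementsByTagNameNS(JBI_NS, "part");
            for (int i = 0; i < parts.getLength(); i++) {
                for (Node child = parts.item(i).getFirstChild();
                        child != null; child = child.getNextSibling()) {
                    if (child.getNodeType() == Node.ELEMENT_NODE) {
                        body.appendChild(soap.importNode(child, true));
                    }
                }
            }
            return soap;
        } catch (Exception e) {
            throw new IllegalArgumentException("not a parsable JBI wrapper message", e);
        }
    }
}
```

Applied to the JBI sample above, the resulting document has the add element, still in the http://apache.org/cxf/calculator/types namespace, as the only child of soap:Body.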
    

can the cxf components work with other components inside servicemix

Yes, provided those components can handle the normalized JBI message.