
Chapter 36. MINA

The mina: component is a transport for working with Apache MINA.

mina:tcp://hostname[:port]
mina:udp://hostname[:port]
mina:multicast://hostname[:port]
mina:vm://hostname[:port]
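
Options are appended to these URIs as query parameters, as in the samples later in this chapter (for example ?textline=true&sync=false). The following is a minimal sketch of composing such a URI string in plain Java. MinaUriBuilder is a hypothetical helper written for illustration; it is not part of camel-mina, and it does no validation or URL encoding of option values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of camel-mina): composes a mina endpoint URI string.
final class MinaUriBuilder {
    private final String protocol;
    private final String host;
    private final int port;
    // LinkedHashMap keeps the options in insertion order, so the result is predictable.
    private final Map<String, String> options = new LinkedHashMap<>();

    MinaUriBuilder(String protocol, String host, int port) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
    }

    // Adds one option; the value is converted with String.valueOf and not URL-encoded.
    MinaUriBuilder option(String name, Object value) {
        options.put(name, String.valueOf(value));
        return this;
    }

    String build() {
        String base = "mina:" + protocol + "://" + host + ":" + port;
        if (options.isEmpty()) {
            return base;
        }
        // Joins name=value pairs with '&' and prefixes the query with '?'.
        StringJoiner query = new StringJoiner("&", "?", "");
        options.forEach((name, value) -> query.add(name + "=" + value));
        return base + query;
    }
}
```

For example, new MinaUriBuilder("tcp", "localhost", 6200).option("textline", true).option("sync", false).build() yields the URI used in the first sample of this chapter.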

From FUSE Mediation Router 1.3 onwards you can specify a codec in the Registry using the codec option. If you are using TCP and no codec is specified, the textline flag determines whether a text-line-based codec or object serialization is used. By default, object serialization is used.

For UDP/Multicast, if no codec is specified, the default is a basic ByteBuffer-based codec.

Multicast also has a shorthand notation mcast.

The VM protocol is used as a direct forwarding mechanism in the same JVM. See the MINA VM-Pipe API documentation for details.

A MinaProducer waits for a response from the remote server with a default timeout of 30 seconds.

In normal usage, camel-mina only marshals the body content; message headers and exchange properties are not sent. However, the transferExchange option lets you transfer the exchange itself over the wire. See the options below.

Option Default Value Description
codec null As of 1.3 or later you can refer to a named ProtocolCodecFactory instance in your Registry such as your Spring ApplicationContext which is then used for the marshalling
codec null FUSE Mediation Router 2.0: You must use the # notation to lookup your codec in the Registry. eg use #myCodec to lookup a bean with the id myCodec.
textline false Only used for TCP. If no codec is specified then you can use this flag in 1.3 or later to indicate a text line based codec; if not specified or the value is false then Object Serialization is assumed over TCP.
textlineDelimiter DEFAULT FUSE Mediation Router 1.5.1/2.0: Only used for TCP and if textline=true. Sets the text line delimiter to use. Possible values are: DEFAULT, AUTO, WINDOWS, UNIX or MAC. If none is provided, FUSE Mediation Router uses DEFAULT. This delimiter is used to mark the end of text.
sync false/true As of 1.3 or later you can configure the exchange pattern to be either InOnly (default) or InOut. Setting sync=true means a synchronous exchange (InOut), where the client can read the response from MINA (The exchange out message). The default value has changed in FUSE Mediation Router 1.5 to true. In older releases the default value is false.
lazySessionCreation See description As of 1.3 or later the session can be created lazily to avoid exceptions if the remote server is not up and running when the Camel producer is started. The default is false in FUSE Mediation Router 1.x and true in FUSE Mediation Router 2.0 onwards.
timeout 30000 As of 1.3 or later you can configure the timeout while waiting for a response from a remote server. The timeout unit is in millis, so 60000 is 60 seconds. The timeout is only used for MinaProducer.
encoding JVM Default As of 1.3 or later you can configure the encoding (is a charset name) to use for the TCP textline codec and the UDP protocol. If not provided Camel will use the JVM default Charset.
transferExchange false Only used for TCP. As of 1.3 or later you can transfer the exchange over the wire instead of just the body. The following fields are transferred: in body, out body, fault body, in headers, out headers, fault headers, exchange properties, exchange exception. This requires that the objects are Serializable. Camel excludes any non-serializable objects and logs them at WARN level.
minaLogger false As of 1.3 or later you can enable Apache MINA logging filter. Apache MINA uses slf4j logging at INFO level to log all input and output.
filters null As of 2.0 or later you can set a list of Mina IoFilters to register. The type must be List<org.apache.mina.common.IoFilter>.

In FUSE Mediation Router 2.0 the codec option must use # notation for lookup of the codec bean in the Registry. In FUSE Mediation Router 2.0 the lazySessionCreation option is now default true.

In FUSE Mediation Router 1.5 the sync option has changed its default value from false to true, as we felt it was confusing for end-users when they used Mina to call remote servers and FUSE Mediation Router wouldn't wait for the response.

In FUSE Mediation Router 1.4 or later codec=textline is no longer supported. Use the textline=true option instead.
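
The transferExchange option requires the transferred objects to be Serializable, and non-serializable objects are excluded and logged at WARN level. The sketch below illustrates that kind of filtering on a header map using only JDK classes. It is not Camel's implementation: SerializableFilter and keepSerializable are hypothetical names, and the check is shallow (a Serializable container holding non-serializable elements would still pass).

```java
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Logger;

// Hypothetical illustration (not Camel code): keeps only header values
// that could be written with Java object serialization.
final class SerializableFilter {
    private static final Logger LOG = Logger.getLogger(SerializableFilter.class.getName());

    static Map<String, Object> keepSerializable(Map<String, Object> headers) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            Object value = entry.getValue();
            // Shallow check only: null and Serializable values are kept.
            if (value == null || value instanceof Serializable) {
                result.put(entry.getKey(), value);
            } else {
                // Mirrors the documented behaviour of logging exclusions at WARN level.
                LOG.warning("Excluding non-serializable header: " + entry.getKey());
            }
        }
        return result;
    }
}
```

In practice this means that headers such as open streams or connection handles do not arrive on the remote side, so a route that relies on them should carry the data in a Serializable form instead.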

See the Mina documentation for how to write your own codec. To use your custom codec with camel-mina, register it in the Registry, for example in the Spring XML file. Then use the codec option to refer to your codec by its bean id. See HL7, which has a custom codec.

In this sample we let FUSE Mediation Router expose a service that listens for TCP connections on port 6200. We use the textline codec. In our route we use the mina endpoint in the from to create the consumer that listens on port 6200:

from("mina:tcp://localhost:6200?textline=true&sync=false").to("mock:result");

As the sample is part of a unit test, we test it by sending some data to it on port 6200.

MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World");

template.sendBody("mina:tcp://localhost:6200?textline=true&sync=false", "Hello World");

assertMockEndpointsSatisfied();

In the next sample we have a more common use-case where we expose a TCP service on port 6201 also using the textline codec. However this time we want to return a response and indicate that we support this so we set the sync option to true on the consumer.

from("mina:tcp://localhost:6201?textline=true&sync=true").process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        String body = exchange.getIn().getBody(String.class);
        exchange.getOut().setBody("Bye " + body);
    }
});

Then we test it by sending some data and retrieving the response using the template.requestBody() method. As we know the response is a String we cast it to String and can assert that the response is in fact something we have dynamically set in our processor code logic.

String response = (String)template.requestBody("mina:tcp://localhost:6201?textline=true&sync=true", "World");
assertEquals("Bye World", response);
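
Because the textline codec exchanges delimited lines of text, a service like the one on port 6201 can in principle also be exercised with a plain TCP client. The following is a minimal sketch using only java.net. TextLineClient is a hypothetical name; the sketch assumes the service accepts \n as the line delimiter and that UTF-8 is compatible with the configured encoding, both of which depend on the textlineDelimiter and encoding options. The 30 second read timeout mirrors the MinaProducer default.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical client (not part of camel-mina): sends one line and reads one line back.
final class TextLineClient {
    static String request(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port)) {
            // Give up if no reply arrives within 30 seconds.
            socket.setSoTimeout(30_000);
            Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            // Assumption: the server treats "\n" as the end of a line.
            out.write(line + "\n");
            out.flush();
            // readLine() accepts "\n", "\r" or "\r\n" and strips the terminator.
            return in.readLine();
        }
    }
}
```

Under those assumptions, TextLineClient.request("localhost", 6201, "World") would return the reply line produced by the processor in the route above.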

Spring DSL can of course also be used for Mina. In the simple sample below we expose a TCP server on port 5555:

<route>
    <from uri="mina:tcp://localhost:5555?textline=true"/>
    <to uri="bean:myTCPOrderHandler"/>
</route>

In the route above we expose a TCP server on port 5555 using the textline codec and we let a spring bean with the id myTCPOrderHandler handle the request and return a reply. For instance this can be done as:

    public String handleOrder(String payload) {
        ...
        return "Order: OK";
    }

Available as of FUSE Mediation Router 2.0

Configuration of Mina endpoints is now possible using regular Spring bean style configuration in the Spring DSL.

However, configuring Apache Mina itself is quite complex, because the acceptor and connector cannot be set up with simple setters. To resolve this, we leverage the MinaComponent as a Spring factory bean to configure this for us. If you really need to configure this yourself, there are setters on the MinaEndpoint to set these when needed.

The sample below shows the factory approach:

<!-- Creating mina endpoints is a bit complex so we reuse MinaComponent
     as a factory bean to create our endpoint, this is the easiest to do -->
<bean id="myMinaFactory" class="org.apache.camel.component.mina.MinaComponent">
    <!-- we must provide a camel context so we refer to it by its id -->
    <constructor-arg index="0" ref="myCamel"/>
</bean>

<!-- This is our mina endpoint configured with spring, we will use the factory above
     to create it for us. The goal is to invoke the createEndpoint method with the
     mina configuration parameter we defined using the constructor-arg option -->
<bean id="myMinaEndpoint"
      factory-bean="myMinaFactory"
      factory-method="createEndpoint">
    <!-- and here we can pass it our configuration -->
    <constructor-arg index="0" ref="myMinaConfig"/>
</bean>

<!-- this is our mina configuration with plain properties -->
<bean id="myMinaConfig" class="org.apache.camel.component.mina.MinaConfiguration">
    <property name="protocol" value="tcp"/>
    <property name="host" value="localhost"/>
    <property name="port" value="1234"/>
    <property name="sync" value="false"/>
</bean>

And then we can refer to our endpoint directly in the route such as:

<route>
    <!-- here we route from our mina endpoint we have defined above -->
    <from ref="myMinaEndpoint"/>
    <to uri="mock:result"/>
</route>

Available as of FUSE Mediation Router 1.6.1

When acting as a server you sometimes want to close the session, for example when a client conversation is finished. To instruct FUSE Mediation Router to close the session, add a header with the key CamelMinaCloseSessionWhenComplete and a boolean true value.

For instance, the example below closes the session after it has written the bye message back to the client:

        from("mina:tcp://localhost:8080?sync=true&textline=true").process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                String body = exchange.getIn().getBody(String.class);
                exchange.getOut().setBody("Bye " + body);
                exchange.getOut().setHeader(MinaConsumer.HEADER_CLOSE_SESSION_WHEN_COMPLETE, true);
            }
        });

Available as of FUSE Mediation Router 2.0

Filters let you use Mina filters, such as SslFilter. You can also implement your own custom filters. Note that the codec and logger are also Mina IoFilters, and the filters you define are appended to the end of the FilterChain, that is, after the codec and logger.

For instance, the example below will send a keep-alive message after 10 seconds of inactivity:

public class KeepAliveFilter extends IoFilterAdapter {
    @Override
    public void sessionCreated(NextFilter nextFilter, IoSession session)
            throws Exception {
        session.setIdleTime(IdleStatus.BOTH_IDLE, 10);

        nextFilter.sessionCreated(session);
    }

    @Override
    public void sessionIdle(NextFilter nextFilter, IoSession session,
            IdleStatus status) throws Exception {
        session.write("NOOP"); // NOOP is a FTP command for keep alive
        nextFilter.sessionIdle(session, status);
    }
}

Because Camel Mina may use a request-reply scheme, an endpoint acting as a client may need to drop some messages, such as the greeting sent when the connection is established. For example, when you connect to an FTP server, you get a 220 message with a greeting (220 Welcome to Pure-FTPd). If you don't drop the message, your request-reply scheme will be broken.

public class DropGreetingFilter extends IoFilterAdapter {
 
    @Override
    public void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
        if (message instanceof String) {
            String ftpMessage = (String) message;
            // "220" is given as greeting. "200 Zzz" is given as a response to "NOOP" (keep alive)
            if (ftpMessage.startsWith("220") || ftpMessage.startsWith("200 Zzz")) {
                // Dropping greeting
                return;
            }
        }
        nextFilter.messageReceived(session, message);
    }
}
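
The dropping rule in the filter above can be checked without a Mina session if it is expressed as a plain predicate. The sketch below does that with JDK classes only. FtpReplyRule and shouldDrop are hypothetical names introduced for illustration; the prefixes are the ones used in DropGreetingFilter.

```java
// Hypothetical helper (not part of camel-mina or Mina): the same decision
// DropGreetingFilter makes, isolated from the filter chain.
final class FtpReplyRule {
    // Returns true for messages that should not reach the request-reply exchange.
    static boolean shouldDrop(Object message) {
        if (!(message instanceof String)) {
            // Non-text messages are always passed on.
            return false;
        }
        String ftpMessage = (String) message;
        // "220" is the greeting; "200 Zzz" is the reply to the NOOP keep-alive.
        return ftpMessage.startsWith("220") || ftpMessage.startsWith("200 Zzz");
    }
}
```

A filter could then call FtpReplyRule.shouldDrop(message) and return early when it is true, forwarding everything else with nextFilter.messageReceived(session, message).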

Then, you can configure your endpoint. Using Spring DSL:

<bean id="myMinaFactory" class="org.apache.camel.component.mina.MinaComponent">
    <constructor-arg index="0" ref="camelContext" />
</bean>
    
<bean id="myMinaEndpoint"
      factory-bean="myMinaFactory"
      factory-method="createEndpoint">
    <constructor-arg index="0" ref="myMinaConfig"/>
</bean>

<bean id="myMinaConfig" class="org.apache.camel.component.mina.MinaConfiguration">
    <property name="protocol" value="tcp" />
    <property name="host" value="localhost" />
    <property name="port" value="2121" />
    <property name="sync" value="true" />
    <property name="minaLogger" value="true" />
    <property name="filters" ref="listFilters"/>
</bean>

<bean id="listFilters" class="java.util.ArrayList" >
    <constructor-arg>
        <list value-type="org.apache.mina.common.IoFilter">
            <bean class="com.example.KeepAliveFilter"/>
            <bean class="com.example.DropGreetingFilter"/>
        </list>
    </constructor-arg>
</bean>