Chapter 4. Building Blocks

Building blocks are layered on top of channels. Most of them do not even need a channel, all they need is a class that implements interface Transport (channels do). This enables them to work on any type of group transport that obeys this interface. Building blocks can be used instead of channels whenever a higher-level interface is required. Whereas channels are simple socket-like constructs, building blocks may offer a far more sophisticated interface. In some cases, building blocks offer access to the underlying channel, so that -- if the building block at hand does not offer a certain functionality -- the channel can be accessed directly. Building blocks are located in the org.jgroups.blocks package. Only the ones that are relevant for application programmers are discussed below.

4.1. PullPushAdapter

Note that this building block has been deprecated and should not be used anymore !

This class is a converter (or adapter, as used in [Gamma:1995] between the pull-style of actively receiving messages from the channel and the push-style where clients register a callback which is invoked whenever a message has been received. Clients of a channel do not have to allocate a separate thread for message reception.

A PullPushAdapter is always created on top of a class that implements interface Transport (e.g. a channel). Clients interested in being called when a message is received can register with the PullPushAdapter using method setListener(). They have to implement interface MessageListener, whose receive() method will be called when a message arrives. When a client is interested in getting view, suspicion messages and blocks, then it must additionally register as a MembershipListener using method setMembershipListener(). Whenever a view, suspicion or block is received, the corresponding method will be called.

Upon creation, an instance of PullPushAdapter creates a thread which constantly calls the receive() method of the underlying Transport instance, blocking until a message is available. When a message is received, if there is a registered message listener, its receive() method will be called.

As this class does not implement interface Transport, but merely uses it for receiving messages, an underlying object has to be used to send messages (e.g. the channel on top of which an object of this class resides). This is shown in Figure 4.1, “Class PullPushAdapter”.

Figure 4.1. Class PullPushAdapter

As is shown, the thread constantly pulls messages from the channel and forwards them to the registered listeners. An application thus does not have to actively pull for messages, but the PullPushAdapter does this for it. Note however, that the application has to directly access the channel if it wants to send a message.

4.1.1. Example

This section shows sample code for using a PullPushAdapter. The example has been shortened for readability (error handling has been removed).

    public class PullPushTest implements MessageListener {
        Channel          channel;
        PullPushAdapter  adapter;
        byte[]           data="Hello world".getBytes();
        String           props; // fetch properties

        public void receive(Message msg) {
            System.out.println("Received msg: " + msg);
        }

        public void start() throws Exception {
            channel=new JChannel(props);
            channel.connect("PullPushTest");
            adapter=new PullPushAdapter(channel);
            adapter.setListener(this);

            for(int i=0; i < 10; i++) {
                System.out.println("Sending msg #" + i);
                channel.send(new Message(null, null, data));
                Thread.currentThread().sleep(1000);
            }
            adapter.stop();
            channel.close();
        }


        public static void main(String args[]) {
            try {
                new PullPushTest().start();
            }
            catch(Exception e) { /* error */ }
        }
    }
      

First a channel is created and connected to. Then an instance of PullPushAdapter is created with the channel as argument. The constructor of PullPushAdapter starts its own thread which continually reads on the channel. Then the MessageListener is set, which causes all messages received on the channel to be sent to receive(). Then a number of messages are sent via the channel to the entire group. As group messages are also received by the sender, the receive() method will be called every time a message is received. Finally the PullPushAdapter is stopped and the channel closed. Note that explicitly stopping the PullPushAdapter is not actually necessary, a closing the channel would cause the PullPushAdapter to terminate anyway.

Note that, compared to the pull-style example, push-style message reception is considerably easier (no separate thread management) and requires less code to program.

Note

The PullPushAdapter has been deprecated, and will be removed in 3.0. Use a Receiver implementation instead. The advantage of the Receiver-based (push) model is that we save 1 thread.

4.2. MessageDispatcher

Channels are simple patterns to asynchronously send a receive messages. However, a significant number of communication patterns in group communication require synchronous communication. For example, a sender would like to send a message to the group and wait for all responses. Or another application would like to send a message to the group and wait only until the majority of the receivers have sent a response, or until a timeout occurred.

MessageDispatcher offers a combination of the above pattern with other patterns. It provides synchronous (as well as asynchronous) message sending with request-response correlation, e.g. matching responses with the original request. It also offers push-style message reception (by internally using the PullPushAdapter).

An instance of MessageDispatcher is created with a channel as argument. It can now be used in both client and server role: a client sends requests and receives responses and a server receives requests and send responses. MessageDispatcher allows a application to be both at the same time. To be able to serve requests, the RequestHandler.handle() method has to be implemented:

    Object handle(Message msg);
    

The handle() method is called any time a request is received. It must return a return value (must be serializable, but can be null) or throw an exception. The return value will be returned to the sender (as a null response, see below). The exception will also be propagated to the requester.

The two methods to send requests are:

    public RspList castMessage(Vector dests, Message msg, int mode, long timeout);
    public Object sendMessage(Message msg, int mode, long timeout)
        throws TimeoutException;
    

The castMessage() method sends a message to all members defined in dests. If dests is null the message will be sent to all members of the current group. Note that a possible destination set in the message will be overridden. If a message is sent synchronously then the timeout argument defines the maximum amount of time in milliseconds to wait for the responses.

The mode parameter defines whether the message will be sent synchronously or asynchronously. The following values are valid (from org.jgroups.blocks.GroupRequest):

GET_FIRST

Returns the first response received.

GET_ALL

Waits for all responses (minus the ones from suspected members)

GET_MAJORITY

Waits for a majority of all responses (relative to the group size)

GET_ABS_MAJORITY

Waits for the majority (absolute, computed once)

GET_N

Wait for n responses (may block if n > group size)

GET_NONE

Wait for no responses, return immediately (non-blocking). This make the call asynchronous.

The sendMessage() method allows an application programmer to send a unicast message to a receiver and optionally receive the response. The destination of the message has to be non-null (valid address of a receiver). The mode argument is ignored (it is by default set to GroupRequest.GET_FIRST) unless it is set to GET_NONE in which case the request becomes asynchronous, ie. we will not wait for the response.

One advantage of using this building block is that failed members are removed from the set of expected responses. For example, when sending a message to 10 members and waiting for all responses, and 2 members crash before being able to send a response, the call will return with 8 valid responses and 2 marked as failed. The return value of castMessage() is a RspList which contains all responses (not all methods shown):

    public class RspList {
        public boolean isReceived(Address sender);
        public int     numSuspectedMembers();
        public Vector  getResults();
        public Vector  getSuspectedMembers();
        public boolean isSuspected(Address sender);
        public Object  get(Address sender);
        public int     size();
        public Object  elementAt(int i) throws ArrayIndexOutOfBoundsException;
    }
    

Method isReceived() checks whether a response from sender has already been received. Note that this is only true as long as no response has yet been received, and the member has not been marked as failed. numSuspectedMembers() returns the number of members that failed (e.g. crashed) during the wait for responses. getResults() returns a list of return values. get() returns the return value for a specific member.

4.2.1. Example

This section describes an example of how to use a MessageDispatcher.

    public class MessageDispatcherTest implements RequestHandler {
        Channel            channel;
        MessageDispatcher  disp;
        RspList            rsp_list;
        String             props; // to be set by application programmer

        public void start() throws Exception {
            channel=new JChannel(props);
            disp=new MessageDispatcher(channel, null, null, this);
            channel.connect("MessageDispatcherTestGroup");

            for(int i=0; i < 10; i++) {
                Util.sleep(100);
                System.out.println("Casting message #" + i);
                rsp_list=disp.castMessage(null,
                    new Message(null, null, new String("Number #" + i)),
                    GroupRequest.GET_ALL, 0);
                System.out.println("Responses:\n" +rsp_list);
            }
            channel.close();
            disp.stop();
        }

        public Object handle(Message msg) {
            System.out.println("handle(): " + msg);
            return new String("Success !");
        }

        public static void main(String[] args) {
            try {
                new MessageDispatcherTest().start();
            }
            catch(Exception e) {
                System.err.println(e);
            }
        }
    }
      

The example starts with the creation of a channel. Next, an instance of MessageDispatcher is created on top of the channel. Then the channel is connected. The MessageDispatcher will from now on send requests, receive matching responses (client role) and receive requests and send responses (server role).

We then send 10 messages to the group and wait for all responses. The timeout argument is 0, which causes the call to block until all responses have been received.

The handle() method simply prints out a message and returns a string.

Finally both the MessageDispatcher and channel are closed.

4.3. RpcDispatcher

This class is derived from MessageDispatcher. It allows a programmer to invoke remote methods in all (or single) group members and optionally wait for the return value(s). An application will typically create a channel and layer the RpcDispatcher building block on top of it, which allows it to dispatch remote methods (client role) and at the same time be called by other members (server role).

Compared to MessageDispatcher, no handle() method needs to be implemented. Instead the methods to be called can be placed directly in the class using regular method definitions (see example below). The invoke remote method calls (unicast and multicast) the following methods are used (not all methods shown):

    public RspList callRemoteMethods(Vector dests, String method_name, 
                                     int mode, long timeout);
    public RspList callRemoteMethods(Vector dests, String method_name, 
                                     Object arg1, int mode, long timeout);
    public Object callRemoteMethod(Address dest, String method_name,
                                   int mode, long timeout);
    public Object callRemoteMethod(Address dest, String method_name, 
                                   Object arg1, int mode, long timeout);
    

The family of callRemoteMethods() is invoked with a list of receiver addresses. If null, the method will be invoked in all group members (including the sender). Each call takes the name of the method to be invoked and the mode and timeout parameters, which are the same as for MessageDispatcher. Additionally, each method takes zero or more parameters: there are callRemoteMethods() methods with up to 3 arguments. As shown in the example above, the first 2 methods take zero and one parameters respectively.

The family of callRemoteMethod() methods takes almost the same parameters, except that there is only one destination address instead of a list. If the dest argument is null, the call will fail.

If a sender needs to use more than 3 arguments, it can use the generic versions of callRemoteMethod() and callRemoteMethods() which use a MethodCall[5] instance rather than explicit arguments.

Java's Reflection API is used to find the correct method in the receiver according to the method name and number and types of supplied arguments. There is a runtime exception if a method cannot be resolved.

(* Update: these methods are deprecated; must use MethodCall argument now *)

4.3.1. Example

The code below shows an example:

    public class RpcDispatcherTest {
        Channel            channel;
        RpcDispatcher      disp;
        RspList            rsp_list;
        String             props; // set by application

        public int print(int number) throws Exception {
            return number * 2;
        }

        public void start() throws Exception {
            channel=new JChannel(props);
            disp=new RpcDispatcher(channel, null, null, this);
            channel.connect("RpcDispatcherTestGroup");

            for(int i=0; i < 10; i++) {
                Util.sleep(100);
                rsp_list=disp.callRemoteMethods(null, "print",
                     new Integer(i), GroupRequest.GET_ALL, 0);
                System.out.println("Responses: " +rsp_list);
            }
            channel.close();
            disp.stop();
         }

        public static void main(String[] args) {
            try {
                new RpcDispatcherTest().start();
            }
            catch(Exception e) {
                System.err.println(e);
            }
        }
    }
     

Class RpcDispatcher defines method print() which will be called subsequently. The entry point start() method creates a channel and an RpcDispatcher which is layered on top. Method callRemoteMethods() then invokes the remote print() method in all group members (also in the caller). When all responses have been received, the call returns and the responses are printed.

As can be seen, the RpcDispatcher building block reduces the amount of code that needs to be written to implement RPC-based group communication applications by providing a higher abstraction level between the application and the primitive channels.

4.4. ReplicatedHashMap

This class was written as a demo of how state can be shared between nodes of a cluster. It has never been heavily tested and is therefore not meant to be used in production, and unsupported.

A ReplicatedHashMap uses a concurrent hashmap internally and allows to create several instances of hashmaps in different processes. All of these instances have exactly the same state at all times. When creating such an instance, a group name determines which group of replicated hashmaps will be joined. The new instance will then query the state from existing members and update itself before starting to service requests. If there are no existing members, it will simply start with an empty state.

Modifications such as put(), clear() or remove() will be propagated in orderly fashion to all replicas. Read-only requests such as get() will only be sent to the local copy.

Since both keys and values of a hashtable will be sent across the network, both of them have to be serializable. This allows for example to register remote RMI objects with any local instance of a hashtable, which can subsequently be looked up by another process which can then invoke remote methods (remote RMI objects are serializable). Thus, a distributed naming and registration service can be built in just a couple of lines.

A ReplicatedHashMap allows to register for notifications, e.g. when a new item is set, or an existing one removed. All registered listeners will notified when such an event occurs. Notification is always local; for example in the case of removing an element, first the element is removed in all replicas, which then notify their listener(s) of the removal (after the fact).

ReplicatedHashMap allow members in a group to share common state across process and machine boundaries.

4.5. NotificationBus

This class provides notification sending and handling capability. Also, it allows an application programmer to maintain a local cache which is replicated by all instances. NotificationBus also sits on top of a channel, however it creates its channel itself, so the application programmers do not have to provide their own channel. Notification consumers can subscribe to receive notifications by calling setConsumer() and implementing interface NotificationBus.Consumer:

    public interface Consumer {
        void          handleNotification(Serializable n);
        Serializable  getCache();
        void          memberJoined(Address mbr);
        void          memberLeft(Address mbr);
    }
    

Method handleNotification() is called whenever a notification is received from the channel. A notification is any object that is serializable. Method getCache() is called when someone wants to retrieve our state; the state can be returned as a serializable object. The memberJoined() and memberLeft() callbacks are invoked whenever a member joins or leaves (or crashes).

The most important methods of NotificationBus are:

    public class NotificationBus {
         public void setConsumer(Consumer c);
         public void start() throws Exception;
         public void stop();
         public void sendNotification(Serializable n);
         public Serializable getCacheFromCoordinator(long timeout, int max_tries);
         public Serializable getCacheFromMember(Address mbr, long timeout, int max_tries);
    } 
    

Method setConsumer() allows a consumer to register itself for notifications.

The start() and stop() methods start and stop the NotificationBus.

Method sendNotification() sends the serializable object given as argument to all members of the group, invoking their handleNotification() methods on reception.

Methods getCacheFromCoordinator() and getCacheFromMember() provide functionality to fetch the group state from the coordinator (first member in membership list) or any other member (if its address is known). They take as arguments a timeout and a maximum number of unsuccessful attempts until they return null. Typically one of these methods would be called just after creating a new NotificationBus to acquire the group state. Note that if these methods are used, then the consumers must implement Consumer.getCache(), otherwise the two methods above would always return null.



[5] See the Programmer's Guide and the Javadoc documentation for more information about this class.