This chapter describes the JGroups classes that applications use to build reliable group communication. The focus is on creating and using channels.
Information in this document may not be up-to-date, but the nature of the classes in the JGroups toolkit described here is the same. For the most up-to-date information refer to the Javadoc-generated documentation in the doc/javadoc directory.
All of the classes discussed below reside in the org.jgroups package unless otherwise mentioned.
The org.jgroups.util.Util class contains a collection of useful functionality which cannot be assigned to any particular other package.
The first method takes an object as argument and serializes it into a byte buffer (the object has to be serializable or externalizable). The byte array is then returned. This method is often used to serialize objects into the byte buffer of a message. The second method returns a reconstructed object from a buffer. Both methods throw an exception if the object cannot be serialized or unserialized.
Returns a string containing a pretty-printed list of currently running threads.
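The serialization round trip described above can be sketched with plain java.io classes. This is only an illustration, not the JGroups implementation: the class name SerializationSketch is hypothetical, the method name objectFromByteBuffer is an assumption, and the sketch wraps failures in an unchecked exception for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the two serialization helpers; not the JGroups source.
class SerializationSketch {

    // Serializes an object into a freshly allocated byte array.
    static byte[] objectToByteBuffer(Serializable obj) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("object could not be serialized", e);
        }
    }

    // Reconstructs an object from a byte array produced by objectToByteBuffer().
    static Object objectFromByteBuffer(byte[] buffer) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buffer))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("buffer could not be unserialized", e);
        }
    }

    public static void main(String[] args) {
        byte[] buf = objectToByteBuffer("Hello");
        System.out.println(objectFromByteBuffer(buf));
    }
}
```

The byte array returned by the first method is what would typically be placed in the buffer of a message.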
These interfaces are used with some of the APIs presented below, therefore they are listed first.
Interface Transport looks as follows:
public interface Transport {
    public void send(Message msg) throws Exception;
    public Object receive(long timeout) throws Exception;
}
It defines a very small subset of the functionality of a channel, essentially only the methods for sending and receiving messages. There are a number of classes that implement Transport , among others Channel . Many building blocks (see Chapter 4, Building Blocks ) require nothing else than a bare-bone facility to send and receive messages; therefore the Transport interface was created. It increases the genericness and portability of building blocks: being so simple, the Transport interface can easily be ported to a different toolkit, without requiring any modifications to building blocks.
Contrary to the pull-style of channels, some building blocks (e.g. PullPushAdapter ) provide an event-like push-style message delivery model. In this case, the entity to be notified of message reception needs to provide a callback to be invoked whenever a message has been received. The MessageListener interface below provides a method to do so:
public interface MessageListener {
    public void receive(Message msg);
    byte[] getState();
    void setState(byte[] state);
}
Method receive() will be called when a message is received. The getState() and setState() methods are used to fetch and set the group state (e.g. when joining). Refer to Section 3.7.11, “Getting the group's state” for a discussion of state transfer.
JGroups release 2.3 introduces ExtendedMessageListener enabling partial state transfer (refer to Section 3.7.13, “Partial state transfer” ) while release 2.4 further expands ExtendedMessageListener with streaming state transfer callbacks:
public interface ExtendedMessageListener extends MessageListener {
    byte[] getState(String state_id);
    void setState(String state_id, byte[] state);

    /*** since JGroups 2.4 *****/
    void getState(OutputStream ostream);
    void getState(String state_id, OutputStream ostream);
    void setState(InputStream istream);
    void setState(String state_id, InputStream istream);
}
The MembershipListener interface is similar to the MessageListener interface above: every time a new view, a suspicion message, or a block event is received, the corresponding method of the class implementing MembershipListener will be called.
public interface MembershipListener {
    public void viewAccepted(View new_view);
    public void suspect(Object suspected_mbr);
    public void block();
}
Oftentimes the only method containing any functionality will be viewAccepted(), which notifies the receiver that a new member has joined the group or that an existing member has left or crashed. The suspect() callback is invoked by JGroups whenever a member is suspected of having crashed, but not yet excluded [1].
The block() method is called to notify the member that it will soon be blocked sending messages. This is done by the FLUSH protocol, for example to ensure that nobody is sending messages while a state transfer is in progress. When block() returns, any thread sending messages will be blocked, until FLUSH unblocks the thread again, e.g. after the state has been transferred successfully.
Therefore, block() can be used to send pending messages or complete some other work. However, messages should be sent on a different thread: for example, the current thread starts a separate thread to do the work and then waits on a mutex, which the separate thread notifies once the work has been done.
Note that block() should take a small amount of time to complete, otherwise the entire FLUSH protocol is blocked.
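The hand-off described above (do the pending work on a different thread and keep block() short) might look roughly like the following. This is a sketch under stated assumptions: BlockHandoffSketch is a hypothetical class, the `sent` list stands in for the channel, and a CountDownLatch with a bounded wait replaces the mutex mentioned in the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical application class; the lists stand in for a real channel.
class BlockHandoffSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<String> sent = new ArrayList<>();

    synchronized void enqueue(String msg) { pending.add(msg); }

    synchronized List<String> sent() { return new ArrayList<>(sent); }

    // What a block() callback could do: flush pending messages on another
    // thread and wait for it, but never longer than maxWaitMillis.
    boolean block(long maxWaitMillis) {
        CountDownLatch done = new CountDownLatch(1);
        Thread flusher = new Thread(() -> {
            synchronized (this) {
                sent.addAll(pending);   // "send" everything that was queued
                pending.clear();
            }
            done.countDown();           // wake up the waiting callback thread
        });
        flusher.start();
        try {
            return done.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        BlockHandoffSketch app = new BlockHandoffSketch();
        app.enqueue("a");
        app.enqueue("b");
        System.out.println(app.block(2000) + " " + app.sent());
    }
}
```

Bounding the wait is a design choice of this sketch; it keeps a slow flush from stalling the callback indefinitely.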
The ExtendedMembershipListener interface extends MembershipListener:
public interface ExtendedMembershipListener extends MembershipListener { public void unblock(); }
The unblock() method is called to notify the member that the flush protocol has completed and the member can resume sending messages. If the member did not stop sending messages on block(), FLUSH simply blocked them and will resume, so no action is required from a member. Implementation of the unblock() callback is optional.
public interface ChannelListener {
    void channelConnected(Channel channel);
    void channelDisconnected(Channel channel);
    void channelClosed(Channel channel);
    void channelShunned();
    void channelReconnected(Address addr);
}
A class implementing ChannelListener can use the Channel.setChannelListener() method to register with a channel to obtain information about state changes in a channel. Whenever a channel is closed, disconnected or opened a callback will be invoked.
public interface Receiver extends MessageListener, MembershipListener { }
A Receiver can be used to receive all relevant messages and view changes in push-style; rather than having to pull these events from a channel, they will be dispatched to the receiver as soon as they have been received. This saves one thread (the application thread pulling messages from a channel, or the PullPushAdapter thread).
public interface ExtendedReceiver extends ExtendedMessageListener, MembershipListener { }
This is a receiver that is able to handle partial state transfer.
The Extended- interfaces (ExtendedMessageListener, ExtendedReceiver) will be merged with their parents in the 3.0 release of JGroups. The merge is deferred until then because it creates an API backwards incompatibility, which we didn't want to introduce in the 2.x series.
Each member of a group has an address, which uniquely identifies the member. The interface for such an address is Address, which requires concrete implementations to provide methods for comparison and sorting of addresses, and for determination whether the address is a multicast address. JGroups addresses have to implement the following interface:
public interface Address extends Externalizable, Comparable, Cloneable {
    boolean isMulticastAddress();
    int compareTo(Object o) throws ClassCastException;
    boolean equals(Object obj);
    int hashCode();
    String toString();
}
Actual implementations of addresses are often generated by the bottommost protocol layer (e.g. UDP or TCP). This allows for all possible sorts of addresses to be used with JGroups, e.g. ATM.
In JChannel, it is the IP address of the host on which the stack is running and the port on which the stack is receiving incoming messages; it is represented by the concrete class org.jgroups.stack.IpAddress . Instances of this class are only used within the JChannel protocol stack; users of a channel see addresses (of any kind) only as Addresses . Since an address uniquely identifies a channel, and therefore a group member, it can be used to send messages to that group member, e.g. in Messages (see next section).
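To make the comparison and sorting requirements concrete, here is a minimal stand-in for an address made of an IPv4 address and a port. IpPortAddressSketch is a hypothetical class written for this illustration; it does not implement the Address interface and is not the source of org.jgroups.stack.IpAddress.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an IP-and-port address.
final class IpPortAddressSketch implements Comparable<IpPortAddressSketch> {
    private final int[] ip;   // four IPv4 octets, each 0..255
    private final int port;

    IpPortAddressSketch(int a, int b, int c, int d, int port) {
        this.ip = new int[] {a, b, c, d};
        this.port = port;
    }

    // IPv4 multicast addresses start with an octet in the range 224..239.
    boolean isMulticastAddress() { return ip[0] >= 224 && ip[0] <= 239; }

    // Orders by IP octets first, then by port.
    @Override
    public int compareTo(IpPortAddressSketch other) {
        for (int i = 0; i < ip.length; i++) {
            if (ip[i] != other.ip[i]) return Integer.compare(ip[i], other.ip[i]);
        }
        return Integer.compare(port, other.port);
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof IpPortAddressSketch && compareTo((IpPortAddressSketch) obj) == 0;
    }

    @Override
    public int hashCode() { return 31 * Arrays.hashCode(ip) + port; }

    @Override
    public String toString() { return ip[0] + "." + ip[1] + "." + ip[2] + "." + ip[3] + ":" + port; }

    public static void main(String[] args) {
        List<IpPortAddressSketch> members = new ArrayList<>();
        members.add(new IpPortAddressSketch(192, 168, 0, 7, 7800));
        members.add(new IpPortAddressSketch(192, 168, 0, 5, 7801));
        members.add(new IpPortAddressSketch(192, 168, 0, 5, 7800));
        Collections.sort(members);   // same input, same order on every member
        System.out.println(members);
    }
}
```

Because equals(), hashCode() and compareTo() agree with each other, such addresses can be used as map keys and sorted consistently.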
Data is sent between members in the form of messages ( Message ). A message can be sent by a member to a single member , or to all members of the group of which the channel is an endpoint. The structure of a message is shown in Figure 3.1, “Structure of a message” .
A message contains 5 fields:
Destination address: The address of the receiver. If null, the message will be sent to all current group members.
Source address: The address of the sender. Can be left null, and will be filled in by the transport protocol (e.g. UDP) before the message is put on the network.
Flags: This is one byte used for flags. The currently recognized flags are OOB, LOW_PRIO and HIGH_PRIO. See the discussion on the concurrent stack for OOB.
Payload: The actual data (as a byte buffer). The Message class contains convenience methods to set a serializable object and to retrieve it again, using serialization to convert the object to/from a byte buffer.
Headers: A list of headers that can be attached to a message. Anything that should not be in the payload can be attached to a message as a header. Methods putHeader(), getHeader() and removeHeader() of Message can be used to manipulate headers.
A message is similar to an IP packet and consists of the payload (a byte buffer) and the addresses of the sender and receiver (as Addresses). Any message put on the network can be routed to its destination (receiver address), and replies can be returned to the sender's address.
An application usually does not need to fill in the sender's address when sending a message; this is done automatically by the protocol stack before the message is put on the network. However, there may be cases in which the sender of a message wants to give an address different from its own, for example so that a response is returned to some other member.
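The five fields can be pictured with a small stand-in class. MessageSketch is hypothetical and much simpler than the real Message class; in particular, the numeric values chosen for the OOB, LOW_PRIO and HIGH_PRIO bits are assumptions made for this illustration, and headers are kept in a plain map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a message; not the org.jgroups.Message source.
class MessageSketch {
    // Bit values are assumptions chosen for illustration only.
    static final byte OOB = 1, LOW_PRIO = 2, HIGH_PRIO = 4;

    Object dest;      // null means "all current group members"
    Object src;       // may stay null; the transport would fill it in
    byte flags;       // one byte holding the flag bits
    byte[] buffer;    // the payload
    final Map<String, Object> headers = new HashMap<>();

    void setFlag(byte flag) { flags |= flag; }

    void clearFlag(byte flag) { flags &= ~flag; }

    boolean isFlagSet(byte flag) { return (flags & flag) == flag; }

    boolean isMulticast() { return dest == null; }

    public static void main(String[] args) {
        MessageSketch msg = new MessageSketch();
        msg.buffer = "Hello".getBytes();
        msg.setFlag(OOB);
        msg.headers.put("example", 42);   // header kept outside the payload
        System.out.println(msg.isMulticast() + " " + msg.isFlagSet(OOB));
    }
}
```

Packing the flags into a single byte keeps the per-message overhead small, which is the usual reason for a bit mask.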
The destination address (receiver) can be an Address, denoting the address of a member, determined e.g. from a message received previously, or it can be null , which means that the message will be sent to all members of the group. A typical multicast message, sending string "Hello" to all members would look like this:
Message msg=new Message(null, null, "Hello".getBytes());
channel.send(msg);
A View ( View ) is a list of the current members of a group. It consists of a ViewId , which uniquely identifies the view (see below), and a list of members. Views are set in a channel automatically by the underlying protocol stack whenever a new member joins or an existing one leaves (or crashes). All members of a group see the same sequence of views.
Note that there is a comparison function which orders all the members of a group in the same way. Usually, the first member of the list is the coordinator (the one who emits new views). Thus, whenever the membership changes, every member can determine the coordinator easily and without having to contact other members.
The code below shows how to send a (unicast) message to the first member of a view (error checking code omitted):
View myview=channel.getView();
Address first=(Address)myview.getMembers().firstElement(); // getMembers() returns a Vector
Message msg=new Message(first, null, "Hello world");
channel.send(msg);
Whenever an application is notified that a new view has been installed (e.g. by MembershipListener.viewAccepted() or Channel.receive() ), the view is already set in the channel. For example, calling Channel.getView() in a viewAccepted() callback would return the same view (or possibly the next one in case there has already been a new view !).
The ViewId is used to uniquely number views. It consists of the address of the view creator and a sequence number. ViewIds can be compared for equality and put in a hashtable as they implement equals() and hashCode() methods.
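A rough sketch of such an identifier, usable as a hashtable key, is shown below. ViewIdSketch is a hypothetical class; it uses a String in place of the creator's Address, and its equality rule (creator and sequence number both match) is an assumption of this sketch rather than a statement about the real ViewId.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a view identifier.
final class ViewIdSketch {
    private final String creator;   // stands in for the creator's Address
    private final long id;          // sequence number of the view

    ViewIdSketch(String creator, long id) {
        this.creator = Objects.requireNonNull(creator);
        this.id = id;
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof ViewIdSketch)) return false;
        ViewIdSketch other = (ViewIdSketch) obj;
        return id == other.id && creator.equals(other.creator);
    }

    @Override
    public int hashCode() { return Objects.hash(creator, id); }

    @Override
    public String toString() { return "[" + creator + "|" + id + "]"; }

    public static void main(String[] args) {
        Map<ViewIdSketch, String> seen = new HashMap<>();
        seen.put(new ViewIdSketch("p", 3), "members: p, q, r");
        // An equal ViewIdSketch built elsewhere finds the same entry.
        System.out.println(seen.get(new ViewIdSketch("p", 3)));
    }
}
```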
Whenever a group splits into subgroups, e.g. due to a network partition, and later the subgroups merge back together, a MergeView instead of a View will be received by the application. The MergeView class is a subclass of View and contains, as an additional instance variable, the list of views that were merged. As an example, if the group denoted by view V1:(p,q,r,s,t) split into subgroups V2:(p,q,r) and V2:(s,t), the merged view might be V3:(p,q,r,s,t). In this case the MergeView would contain a list of 2 views: V2:(p,q,r) and V2:(s,t).
This class can be used for keeping track of members instead of a Vector class. It adds several functions, such as duplicate elimination, merging with other Membership instances and sorting.
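The three functions mentioned (duplicate elimination, merging and sorting) can be sketched as follows. MembershipSketch is a hypothetical class that uses strings in place of addresses; its method names are chosen for this illustration and are not taken from the Membership class.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a membership list.
class MembershipSketch {
    private final List<String> members = new ArrayList<>();

    // Adds a member unless it is already present (duplicate elimination).
    void add(String member) {
        if (!members.contains(member)) members.add(member);
    }

    // Merges another membership into this one, again without duplicates.
    void merge(MembershipSketch other) {
        for (String member : other.members) add(member);
    }

    void sort() { Collections.sort(members); }

    // Returns a copy so callers cannot modify the internal list.
    List<String> members() { return new ArrayList<>(members); }

    public static void main(String[] args) {
        MembershipSketch left = new MembershipSketch();
        for (String m : new String[] {"p", "q", "r", "q"}) left.add(m);
        MembershipSketch right = new MembershipSketch();
        for (String m : new String[] {"t", "s"}) right.add(m);
        left.merge(right);   // e.g. two subgroups coming back together
        left.sort();
        System.out.println(left.members());
    }
}
```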
In order to join a group and send messages, a process has to create a channel. A channel is like a socket. When a client connects to a channel, it gives the name of the group it would like to join. Thus, a channel is (in its connected state) always associated with a particular group. The protocol stack takes care that channels with the same group name find each other: whenever a client connects to a channel given group name G, it tries to find existing channels with the same name and joins them, resulting in a new view being installed (which contains the new member). If no members exist, a new group will be created.
A state transition diagram for the major states a channel can assume is shown in Figure 3.2, “Channel states”.
When a channel is first created, it is in the unconnected state. An attempt to perform certain operations which are only valid in the connected state (e.g. send/receive messages) will result in an exception. After a successful connection by a client, it moves to the connected state. Now channels will receive messages, views and suspicions from other members and may send messages to other members or to the group. Getting the local address of a channel is guaranteed to be a valid operation in this state (see below). When the channel is disconnected, it moves back to the unconnected state. Both a connected and unconnected channel may be closed, which makes the channel unusable for further operations. Any attempt to do so will result in an exception. When a channel is closed directly from a connected state, it will first be disconnected, and then closed.
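The state transitions described above can be modeled with a small state machine. ChannelStateSketch is a hypothetical class, not a JGroups channel: it uses IllegalStateException where the real API has its own exception types, and it treats closing an already closed channel as a no-op, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the channel life cycle; not a JGroups class.
class ChannelStateSketch {
    enum State { UNCONNECTED, CONNECTED, CLOSED }

    private State state = State.UNCONNECTED;
    private final List<State> history = new ArrayList<>();

    State state() { return state; }

    List<State> history() { return new ArrayList<>(history); }

    private void moveTo(State next) {
        state = next;
        history.add(next);
    }

    void connect() {
        if (state == State.CLOSED) throw new IllegalStateException("channel is closed");
        moveTo(State.CONNECTED);
    }

    void disconnect() {
        if (state == State.CLOSED) throw new IllegalStateException("channel is closed");
        moveTo(State.UNCONNECTED);
    }

    // A connected channel is first disconnected, then closed.
    void close() {
        if (state == State.CLOSED) return;
        if (state == State.CONNECTED) disconnect();
        moveTo(State.CLOSED);
    }

    // Sending is only valid in the connected state.
    void send(String msg) {
        if (state != State.CONNECTED) throw new IllegalStateException("channel is not connected");
    }

    public static void main(String[] args) {
        ChannelStateSketch ch = new ChannelStateSketch();
        ch.connect();
        ch.send("hello");
        ch.close();
        System.out.println(ch.history());
    }
}
```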
The methods available for creating and manipulating channels are discussed now.
A channel can be created in two ways: an instance of a subclass of Channel is created directly using its public constructor (e.g. new JChannel()), or a channel factory is created, which -- upon request -- creates instances of channels. We will only look at the first method of creating a channel: by direct instantiation. Note that instantiation may differ between the various channel implementations. As an example we will look at JChannel.
The public constructor of JChannel looks as follows:
public JChannel(Object properties) throws ChannelException {}
It creates an instance of JChannel . The properties argument defines the composition of the protocol stack (number and type of layers, parameters for each layer, and their order). For JChannel, this has to be a String. An example of a channel creation is:
String props="UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=32):" +
    "PING(timeout=3000;num_initial_members=6):" +
    "FD(timeout=5000):" +
    "VERIFY_SUSPECT(timeout=1500):" +
    "pbcast.STABLE(desired_avg_gossip=10000):" +
    "pbcast.NAKACK(gc_lag=10;retransmit_timeout=3000):" +
    "UNICAST(timeout=5000;min_wait_time=2000):" +
    "FRAG:" +
    "pbcast.GMS(initial_mbrs_timeout=4000;join_timeout=5000;" +
    "shun=false;print_local_addr=false)";
JChannel channel;
try {
    channel=new JChannel(props);
}
catch(Exception ex) {
    // channel creation failed
}
The argument is a colon-delimited string of protocols, specified from bottom to top (left to right). The example properties argument will be used to create a protocol stack that uses IP Multicast (UDP) as bottom protocol, the PING protocol to locate the initial members, FD for failure detection, VERIFY_SUSPECT for double-checking of suspected members, STABLE for garbage collection of messages received by all members, NAKACK for lossless delivery of multicast messages, UNICAST for lossless delivery of unicast messages and GMS for group membership (handling of join or leave requests).
If the properties argument is null, the default properties will be used. An exception will be thrown if the channel cannot be created. Possible causes include protocols that were specified in the property argument, but were not found, or wrong parameters to protocols.
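To illustrate the format of the properties string, the following sketch splits a colon-delimited specification into protocol names and their parameters, bottom protocol first. StackSpecParserSketch and ProtocolSpec are hypothetical names; this is not how JGroups parses its configuration, and the sketch assumes well-formed input in the format shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical parser for strings like "UDP(a=1;b=2):PING(timeout=3000):FRAG".
class StackSpecParserSketch {

    static final class ProtocolSpec {
        final String name;
        final Map<String, String> params = new LinkedHashMap<>();
        ProtocolSpec(String name) { this.name = name; }
    }

    // Splits on ':' outside parentheses; the first entry is the bottom protocol.
    static List<ProtocolSpec> parse(String props) {
        List<ProtocolSpec> stack = new ArrayList<>();
        int depth = 0, start = 0;
        for (int i = 0; i < props.length(); i++) {
            char c = props.charAt(i);
            if (c == '(') depth++;
            else if (c == ')') depth--;
            else if (c == ':' && depth == 0) {
                stack.add(parseProtocol(props.substring(start, i).trim()));
                start = i + 1;
            }
        }
        if (depth != 0) throw new IllegalArgumentException("unbalanced parentheses: " + props);
        stack.add(parseProtocol(props.substring(start).trim()));
        return stack;
    }

    // Parses one token such as "PING(timeout=3000;num_initial_members=6)" or "FRAG".
    private static ProtocolSpec parseProtocol(String token) {
        int open = token.indexOf('(');
        if (open < 0) return new ProtocolSpec(token);
        ProtocolSpec spec = new ProtocolSpec(token.substring(0, open));
        String body = token.substring(open + 1, token.lastIndexOf(')'));
        for (String pair : body.split(";")) {
            if (pair.isEmpty()) continue;
            int eq = pair.indexOf('=');
            if (eq < 0) throw new IllegalArgumentException("expected name=value: " + pair);
            spec.params.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return spec;
    }

    public static void main(String[] args) {
        for (ProtocolSpec p : parse("UDP(mcast_addr=228.1.2.3;ip_ttl=32):PING(timeout=3000):FRAG")) {
            System.out.println(p.name + " " + p.params);
        }
    }
}
```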
In version 2.0 of JGroups an XML-based scheme to define protocol stacks was introduced. Instead of specifying a string containing the protocol spec, a URL pointing to a valid protocol stack definition can be given. For example, the Draw demo can be launched as follows:
java org.jgroups.demos.Draw -props file:/home/bela/vsync.xml
or
java org.jgroups.demos.Draw -props http://www.jgroups.org/udp.xml
In the latter case, an application downloads its protocol stack specification from a server, which allows for central administration of application properties.
A sample XML configuration looks like this (edited from udp.xml):
<config>
    <UDP mcast_addr="${jgroups.udp.mcast_addr:228.10.10.10}"
         mcast_port="${jgroups.udp.mcast_port:45588}"
         discard_incompatible_packets="true"
         max_bundle_size="60000"
         max_bundle_timeout="30"
         ip_ttl="${jgroups.udp.ip_ttl:2}"
         enable_bundling="true"
         use_concurrent_stack="true"
         thread_pool.enabled="true"
         thread_pool.min_threads="1"
         thread_pool.max_threads="25"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="false"
         thread_pool.queue_max_size="100"
         thread_pool.rejection_policy="Run"
         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="Run"/>
    <PING timeout="2000" num_initial_members="3"/>
    <MERGE2 max_interval="30000" min_interval="10000"/>
    <FD_SOCK/>
    <FD timeout="10000" max_tries="5" shun="true"/>
    <VERIFY_SUSPECT timeout="1500" />
    <BARRIER />
    <pbcast.NAKACK use_mcast_xmit="false" gc_lag="0"
                   retransmit_timeout="300,600,1200,2400,4800"
                   discard_delivered_msgs="true"/>
    <UNICAST timeout="300,600,1200,2400,3600"/>
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000" max_bytes="400000"/>
    <VIEW_SYNC avg_send_interval="60000" />
    <pbcast.GMS print_local_addr="true" join_timeout="3000" shun="false" view_bundling="true"/>
    <FC max_credits="20000000" min_threshold="0.10"/>
    <FRAG2 frag_size="60000" />
    <pbcast.STATE_TRANSFER />
</config>
A stack is wrapped by <config> and </config> elements and lists all protocols from bottom (UDP) to top (STATE_TRANSFER). Each element defines one protocol.
Each protocol is implemented as a Java class. When a protocol stack is created based on the above XML configuration, the first element ("UDP") becomes the bottom-most layer, the second one will be placed on the first, etc: the stack is created from the bottom to the top.
Each element has to be the name of a Java class that resides in the org.jgroups.protocols package. Note that only the base name has to be given, not the fully specified class name (UDP instead of org.jgroups.protocols.UDP). If the protocol class is not found, JGroups assumes that the name given is a fully qualified classname and will therefore try to instantiate that class. If this does not work an exception is thrown. This allows for protocol classes to reside in different packages altogether, e.g. a valid protocol name could be com.sun.eng.protocols.reliable.UCAST.
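The lookup order just described (default package first, then the name as a fully qualified class name) can be sketched like this. ProtocolClassResolverSketch is a hypothetical helper, not JGroups code; the example uses java.util as the default package only so that it runs without JGroups on the classpath.

```java
// Hypothetical helper illustrating the two-step class lookup.
class ProtocolClassResolverSketch {

    // Tries defaultPackage + "." + name first, then name as a fully qualified class name.
    static Class<?> resolve(String name, String defaultPackage) {
        try {
            return Class.forName(defaultPackage + "." + name);
        } catch (ClassNotFoundException notInDefaultPackage) {
            try {
                return Class.forName(name);
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException("protocol class not found: " + name, e);
            }
        }
    }

    public static void main(String[] args) {
        // Base name, found in the default package.
        System.out.println(resolve("ArrayList", "java.util"));
        // Fully qualified name from a different package.
        System.out.println(resolve("java.lang.StringBuilder", "java.util"));
    }
}
```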
Each layer may have zero or more arguments, which are specified as name/value pairs: as attributes of the element in the XML format, or as a list in parentheses directly after the protocol name in the plain string format. In the example above, UDP is configured with some options, one of them being the IP multicast address (mcast_addr), which is set to the value of the system property jgroups.udp.mcast_addr if that property is set, and to 228.10.10.10 otherwise.
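The `${property:default}` notation used for mcast_addr can be resolved along the following lines. PropertySubstitutionSketch is a hypothetical helper based on a regular expression; it only illustrates the lookup rule and is not the JGroups implementation.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical resolver for values such as "${jgroups.udp.mcast_addr:228.10.10.10}".
class PropertySubstitutionSketch {
    // Group 1 is the property name, group 2 the optional default after ':'.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+)(?::([^}]*))?\\}");

    static String resolve(String value) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String fallback = m.group(2) != null ? m.group(2) : "";
            // The system property wins; the default is used only if it is not set.
            String resolved = System.getProperty(m.group(1), fallback);
            m.appendReplacement(out, Matcher.quoteReplacement(resolved));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve("${jgroups.udp.mcast_addr:228.10.10.10}"));
        System.setProperty("jgroups.udp.mcast_addr", "239.1.1.1");
        System.out.println(resolve("${jgroups.udp.mcast_addr:228.10.10.10}"));
    }
}
```

On the command line, the same effect would be obtained by passing the property with -D when starting the JVM.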
Note that all members in a group have to have the same protocol stack.
A number of options can be set in a channel. To do so, the following method is used:
public void setOpt(int option, Object value);
Arguments are the option number and a value. The following options are currently recognized:
Channel.BLOCK: The argument is a boolean object. If true, block messages will be received. If this option is set to true, the reception of views will be enabled as well. Default is false.
Channel.LOCAL: Local delivery. The argument is a boolean value. If set to true, a member will also receive the messages it sent itself. Otherwise, all messages sent by the member itself will be discarded. Setting this option to false allows a member to send messages to the group without receiving a copy. Default is true (members will receive their own copy of messages multicast to the group).
Channel.AUTO_RECONNECT: When set to true, a shunned channel will leave the group and then try to automatically re-join. Default is false.
Channel.AUTO_GETSTATE: When set to true, a shunned channel will, after reconnection, attempt to fetch the state from the coordinator. This requires AUTO_RECONNECT to be true as well. Default is false.
The equivalent method to get options is getOpt() :
public Object getOpt(int option);
Given an option, the current value of the option is returned.
When a client wants to join a group, it connects to a channel giving the name of the group to be joined:
public void connect(String clustername) throws ChannelClosed;
The cluster name is a string, naming the cluster to be joined. All channels that are connected to the same name form a cluster. Messages multicast on any channel in the cluster will be received by all members (including the one who sent it [2] ).
The method returns as soon as the group has been joined successfully. If the channel is in the closed state (see Figure 3.2, “Channel states” ), an exception will be thrown. If there are no other members, i.e. no other client has connected to a group with this name, then a new group is created and the member joined. The first member of a group becomes its coordinator . A coordinator is in charge of multicasting new views whenever the membership changes [3] .
Clients can also join a cluster and fetch the cluster state in one operation. The combined connect-and-fetch-state method is best thought of as a regular connect() followed by getState(), executed in succession. It has several advantages over a regular connect, however. First, the underlying message exchange is heavily optimized, especially if the FLUSH protocol is used in the stack. More importantly, from the client's perspective, joining and fetching the state become one atomic operation.
public void connect(String cluster_name, Address target, String state_id, long timeout) throws ChannelException;
Just as in the regular connect() method, the cluster name denotes the cluster to be joined. The address parameter indicates the cluster member to fetch the state from; a null address means that the state should be fetched from the cluster coordinator. To fetch the state from a particular member other than the coordinator, clients provide the address of that member. The state id is used for partial state transfer, while the timeout bounds the entire join and fetch operation.
Method getLocalAddress() returns the local address of the channel. In the case of JChannel , the local address is generated by the bottom-most layer of the protocol stack when the stack is connected to. That means that -- depending on the channel implementation -- the local address may or may not be available when a channel is in the unconnected state.
public Address getLocalAddress();
Method getClusterName() returns the name of the cluster in which the channel is a member:
public String getClusterName();
Again, the result is undefined if the channel is in the unconnected or closed state.
The following method can be used to get the current view of a channel:
public View getView();
This method does not retrieve a new view (message) from the channel, but only returns the current view of the channel. The current view is updated every time a view message is received: when method receive() is called, and the return value is a view, before the view is returned, it will be installed in the channel, i.e. it will become the current view.
Calling this method on an unconnected or closed channel is implementation defined. A channel may return null, or it may return the last view it knew of.
Once the channel is connected, messages can be sent using the send() methods:
public void send(Message msg) throws ChannelNotConnected, ChannelClosed;
public void send(Address dst, Address src, Object obj) throws ChannelNotConnected, ChannelClosed;
The first send() method has only one argument, which is the message to be sent. The message's destination should either be the address of the receiver (unicast) or null (multicast). When it is null, the message will be sent to all members of the group (including itself). The source address may be null; if it is, it will be set to the channel's address (so that recipients may generate a response and send it back to the sender).
The second send() method is a helper method and uses the former method internally. It requires the address of receiver and sender and an object (which has to be serializable), constructs a Message and sends it.
If the channel is not connected, or was closed, an exception will be thrown upon attempting to send a message.
Here's an example of sending a (multicast) message to all members of a group:
Hashtable data=new Hashtable(); // any serializable data
try {
    channel.send(null, null, data);
}
catch(Exception ex) {
    // handle errors
}
The null value as destination address means that the message will be sent to all members in the group. The sender's address will be filled in by the bottom-most protocol. The payload is a hashtable, which will be serialized into the message's buffer and unserialized at the receiver's end. Alternatively, any other means of generating a byte buffer and setting the message's buffer to it (e.g. using Message.setBuffer()) would also work.
Here's an example of sending a (unicast) message to the first member (coordinator) of a group:
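Address receiver;
Hashtable data=new Hashtable(); // any serializable data
try {
    // getMembers() returns a Vector; its first element is the coordinator
    receiver=(Address)channel.getView().getMembers().firstElement();
    channel.send(receiver, null, data);
}
catch(Exception ex) {
    // handle errors
}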
It creates a Message with a specific address for the receiver (the first member of the group). Again, the sender's address can be left null as it will be filled in by the bottom-most protocol.
Method receive() is used to receive messages, views, suspicions and blocks:
public Object receive(long timeout) throws ChannelNotConnected, ChannelClosed, Timeout;
A channel receives messages asynchronously from the network and stores them in a queue. When receive() is called, the next available message is removed from the head of that queue and returned. When there are no messages on the queue, the method will block. If timeout is greater than 0, it will wait the specified number of milliseconds for a message to be received, and throw a TimeoutException if none was received during that time. If the timeout is 0 or negative, the method will wait indefinitely for the next available message.
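The timeout rules of receive() can be mirrored with a standard blocking queue. ReceiveQueueSketch is a hypothetical class: deliver() stands in for the protocol stack adding an event to the queue, and java.util.concurrent.TimeoutException stands in for the JGroups exception type.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of a channel's receive queue.
class ReceiveQueueSketch {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // Called by the "network" side when something arrives.
    void deliver(Object event) { queue.add(event); }

    // timeout > 0: wait at most that many milliseconds, then fail.
    // timeout <= 0: wait until something is available.
    Object receive(long timeoutMillis) throws InterruptedException, TimeoutException {
        if (timeoutMillis <= 0) return queue.take();
        Object next = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (next == null) {
            throw new TimeoutException("nothing received within " + timeoutMillis + " ms");
        }
        return next;
    }

    public static void main(String[] args) throws Exception {
        ReceiveQueueSketch channel = new ReceiveQueueSketch();
        channel.deliver("first");
        System.out.println(channel.receive(100));
        try {
            channel.receive(100);   // queue is empty now
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```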
Depending on the channel options (see Section 3.7.2, “Setting options” ), the following types of objects may be received:
Message: A regular message. To send a response to the sender, a new message can be created. Its destination address would be the received message's source address. Method Message.makeReply() is a helper method to create a response.
View: A view change, signalling that a member has joined, left or crashed. The application may or may not perform some action upon receiving a view change (e.g. updating a GUI object of the membership, or redistributing a load-balanced collaborative task to all members). Note that a longer action, or any action that blocks, should be performed in a separate thread. A MergeView will be received when 2 or more subgroups merged into one (see Section 3.5.2, “MergeView” for details). Here, a possible state merge by the application needs to be done in a separate thread.
SuspectEvent: Notification of a member that is suspected. Method SuspectEvent.getMember() retrieves the address of the suspected member. Usually this message will be followed by a view change.
BlockEvent: The application has to stop sending messages. When the application has stopped sending messages, it needs to acknowledge this message with a Channel.blockOk() method.
The BlockEvent reception can be used to complete pending tasks, e.g. send pending messages, but once Channel.blockOk() has been called, all threads that send messages (calling Channel.send() or Channel.down()) will be blocked until FLUSH unblocks them.
UnblockEvent: The application can resume sending messages. Any messages previously blocked by FLUSH will be unblocked; when the UnblockEvent is received the channel has already been unblocked.
GetStateEvent: Received when the application's current state should be saved (for a later state transfer). A copy of the current state should be made (possibly wrapped in a synchronized statement) and returned by calling method Channel.returnState(). If state transfer events are not enabled on the channel (default), then this event will never be received. This message will only be received with the Virtual Synchrony suite of protocols (see the Programmer's Guide).
StreamingGetStateEvent: Received when the application's current state should be provided to a state requesting group member. If state transfer events are not enabled on the channel (default), or if the channel is not configured with pbcast.STREAMING_STATE_TRANSFER, then this event will never be received.
SetStateEvent: Received as response to a getState(s) method call. The argument contains the state of a single member (byte[]) or of all members (Vector). Since the state of a single member could also be a vector, the interpretation of the argument is left to the application.
StreamingSetStateEvent: Received at the state requesting member when the state InputStream becomes ready for reading. If state transfer events are not enabled on the channel (default), or if the channel is not configured with pbcast.STREAMING_STATE_TRANSFER, then this event will never be received.
The caller has to check the type of the object returned. This can be done using the instanceof operator, as follows:
Object obj;
Message msg;
View v;
obj=channel.receive(0); // wait forever
if(obj instanceof Message)
    msg=(Message)obj;
else if(obj instanceof View)
    v=(View)obj;
else
    ; // don't handle suspicions or blocks
If for example views, suspicions and blocks are disabled, then the caller is guaranteed to only receive return values of type Message . In this case, the return value can be cast to a Message directly, without using the instanceof operator.
If the channel is not connected, or was closed, a corresponding exception will be thrown.
The example below shows how to retrieve the "Hello world" string from a message:
Message msg; // received above
String s;
try {
    s=(String)msg.getObject(); // error if object not Serializable
    // alternative: s=new String(msg.getBuffer());
}
catch(Exception ex) {
    // handle errors, e.g. casting error above
}
The Message.getObject() method retrieves the message's byte buffer, converts it into a (serializable) object and returns the object.
Instead of pulling messages from a channel in an application thread, a Receiver can be registered with a channel; all received messages, view changes and state transfer requests will invoke callbacks on the registered Receiver:
JChannel ch=new JChannel();
ch.setReceiver(new ExtendedReceiverAdapter() {
    public void receive(Message msg) {
        System.out.println("received message " + msg);
    }
    public void viewAccepted(View new_view) {
        System.out.println("received view " + new_view);
    }
});
ch.connect("bla");
The ExtendedReceiverAdapter class implements all callbacks of ExtendedReceiver with no-ops; in the example above we override receive() and viewAccepted().
The advantage of using a Receiver is that the application doesn't have to waste 1 thread for pulling messages out of a channel. In addition, the channel doesn't have to maintain an (unbounded) queue of messages/views, which can quickly get large if the receiver cannot process messages fast enough, and the sender keeps sending messages.
Instead of removing the next available message from the channel, peek() just returns a reference to the next message without removing it. This is useful when one has to check the type of the next message, e.g. whether it is a regular message or a view change. The signature of this method is not shown here; it is the same as for receive() .
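The semantics resemble those of peek() versus poll() on a java.util.Queue. The following sketch uses a plain queue rather than a channel; PeekDemo and nextKind() are hypothetical names, and a String stands in for a regular message.

```java
import java.util.Queue;

// Hypothetical illustration using a plain queue, not a JGroups channel.
class PeekDemo {

    /** Returns a label for the next event without consuming it, or "empty". */
    static String nextKind(Queue<Object> events) {
        Object next = events.peek(); // look at the head, but leave it in the queue
        if (next == null) {
            return "empty";
        }
        return (next instanceof String) ? "message" : "other";
    }
}
```

Because the head element is not removed, a later removal still returns the same element that was inspected.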
A newly joined member may wish to retrieve the state of the group before starting work. This is done with getState(). This method returns the state of one member (in most cases, of the oldest member, the coordinator). It returns true or false, depending on whether a valid state could be retrieved. For example, if a member is a singleton, then calling this method would always return false [4] .
The actual state is returned as the return value of one of the subsequent receive() calls, in the form of a SetStateEvent object. If getState() returned true, then a valid (non-null) state will be returned, otherwise a null state will be returned. Alternatively, if an application uses MembershipListener (see Section 3.2.4, “MembershipListener” ) instead of pulling messages from a channel, the getState() method will be invoked and a copy of the current state should be returned. By the same token, a state is set by JGroups calling the setState() method of the state fetcher.
The reason for not directly returning the state as a result of getState() is that the state has to be returned in the correct position relative to other messages. Returning it directly would violate the FIFO properties of a channel, and state transfer would not be correct.
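A small worked example may make the ordering argument concrete. The sketch below is an assumption-laden model, not JGroups code: the state is a single counter, regular messages are increments, and a state snapshot overwrites the counter. StateOrderingDemo and Event are hypothetical names.

```java
import java.util.List;

// Hypothetical model: the group state is one counter, messages are increments.
class StateOrderingDemo {

    /** An event is either an increment or a full state snapshot. */
    static final class Event {
        final boolean isState;
        final int value;

        private Event(boolean isState, int value) {
            this.isState = isState;
            this.value = value;
        }

        static Event increment(int by) {
            return new Event(false, by);
        }

        static Event state(int snapshot) {
            return new Event(true, snapshot);
        }
    }

    /** Applies events strictly in delivery order; a snapshot overwrites the counter. */
    static int replay(List<Event> events) {
        int counter = 0;
        for (Event e : events) {
            counter = e.isState ? e.value : counter + e.value;
        }
        return counter;
    }
}
```

Suppose the snapshot value 10 already reflects two increments that the new member also received. Replaying increment, increment, state(10), increment yields 11. If the snapshot were handed over first, outside the message order, the same three increments would be applied on top of it and yield 13, counting two of them twice.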
The following code fragment shows how a group member participates in state transfers:
    channel=new JChannel();
    channel.connect("TestChannel");
    boolean rc=channel.getState(null, 5000);

    ...

    Object state, copy;
    Object ret=channel.receive(0);
    if(ret instanceof Message)
        ;
    else if(ret instanceof GetStateEvent) {
        copy=copyState(state); // make a copy so that other msgs don't change the state
        channel.returnState(Util.objectToByteBuffer(copy));
    }
    else if(ret instanceof SetStateEvent) {
        SetStateEvent e=(SetStateEvent)ret;
        state=e.getArg();
    }
A JChannel has to be created whose stack includes the STATE_TRANSFER or pbcast.STATE_TRANSFER protocols (see Chapter 5, Advanced Concepts ). Method getState() then asks the channel to return the current state. If a state is available (there may be no other members in the group!), true is returned, and one of the subsequent receive() invocations on the channel will return a SetStateEvent object containing the current state. The caller then sets its own state to the one received from the channel.
Method receive() might return a GetStateEvent object, requesting that the state of the member be returned. In this case, a copy of the current state should be made and returned using JChannel.returnState() . It is important to a) synchronize access to the state when returning it, since other accesses may modify it while it is being returned, and b) make a copy of the state, since other accesses may still modify it after it has been returned. The latter is possible because the state is not sent immediately but travels down the stack (in the same address space), and any remaining reference to it could still alter it.
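The two precautions can be sketched with standard Java collections. StateHolder is a hypothetical class; it assumes the state is a map of immutable strings, so a shallow copy is sufficient. State with mutable values would need a deep copy instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder of application state; not a JGroups class.
class StateHolder {
    private final Map<String, String> state = new HashMap<>();

    /** Regular updates take the same lock as snapshot(). */
    void put(String key, String value) {
        synchronized (state) {
            state.put(key, value);
        }
    }

    /** Returns an independent copy taken while holding the lock. */
    Map<String, String> snapshot() {
        synchronized (state) {
            return new HashMap<>(state);
        }
    }
}
```

Updates made after snapshot() returns do not show up in the copy, so the copy can safely travel down the stack while the application keeps modifying its own state.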
As an alternative to handling the GetStateEvent and SetStateEvent events, and calling Channel.returnState(), a Receiver could be used. The example above would look like this:
    class MyReceiver extends ReceiverAdapter {
        final Map m=new HashMap();

        public byte[] getState() {
            synchronized(m) { // so nobody else can modify the map while we serialize it
                try {
                    return Util.objectToByteBuffer(m);
                }
                catch(Exception ex) { // the map could not be serialized
                    return null;
                }
            }
        }

        public void setState(byte[] state) {
            synchronized(m) {
                try {
                    Map new_m=(Map)Util.objectFromByteBuffer(state);
                    m.clear();
                    m.putAll(new_m);
                }
                catch(Exception ex) {
                    // the state could not be unserialized; keep the old contents
                }
            }
        }
    }

    channel=new JChannel(); // use default properties (has to include pbcast.STATE_TRANSFER protocol)
    channel.setReceiver(new MyReceiver());
    channel.connect("TestChannel");
    boolean rc=channel.getState(null, 5000);
In a group consisting of A,B and C, with D joining the group and calling Channel.getState(), the following sequence of callbacks happens:
Partial state transfer means that instead of transferring the entire state, we may want to transfer only a substate. For example, with HTTP session replication, a new node in a cluster may want to transfer only the state of a specific session, not all HTTP sessions. This can be done with either the pull or push model. The method to call is Channel.getState(), including the ID of the substate (a string). In the pull model, GetStateEvent and SetStateEvent have an additional member, state_id, and in the push model, there are two additional getState() and setState() callbacks. The example below shows partial state transfer for the push model:
    class MyReceiver extends ExtendedReceiverAdapter {
        final Map m=new HashMap();

        public byte[] getState() {
            return getState(null);
        }

        public byte[] getState(String substate_id) {
            synchronized(m) { // so nobody else can modify the map while we serialize it
                try {
                    if(substate_id == null)
                        return Util.objectToByteBuffer(m); // entire state
                    Object value=m.get(substate_id);
                    return value != null? Util.objectToByteBuffer(value) : null;
                }
                catch(Exception ex) { // the state could not be serialized
                    return null;
                }
            }
        }

        public void setState(byte[] state) {
            setState(null, state);
        }

        public void setState(String substate_id, byte[] state) {
            synchronized(m) {
                try {
                    if(substate_id != null) {
                        Object value=Util.objectFromByteBuffer(state);
                        m.put(substate_id, value);
                    }
                    else {
                        Map new_m=(Map)Util.objectFromByteBuffer(state);
                        m.clear();
                        m.putAll(new_m);
                    }
                }
                catch(Exception ex) {
                    // the state could not be unserialized; keep the old contents
                }
            }
        }
    }

    channel=new JChannel(); // use default properties (has to include pbcast.STATE_TRANSFER protocol)
    channel.setReceiver(new MyReceiver());
    channel.connect("TestChannel");
    boolean rc=channel.getState(null, "MyID", 5000);
The example shows that the Channel.getState() method specifies the ID of the substate, in this case "MyID". The getState(String substate_id) method checks whether the substate ID is not null, and returns the substate pertaining to the ID, or the entire state if the substate_id is null. The same goes for setting the substate: if setState(String substate_id, byte[] state) has a non-null substate_id, only that part of the current state will be overwritten, otherwise (if null) the entire state will be overwritten.
Streaming state transfer allows the transfer of (partial) application state without having to load the entire state into memory before sending it to a joining member. It is especially useful if the state is very large (>1 GB), where regular state transfer would likely result in an OutOfMemoryError. Streaming state transfer was introduced in JGroups 2.4. A JGroups channel has to be configured with either regular or streaming state transfer. The JChannel API that invokes state transfer (i.e. JChannel.getState(Address target, long timeout)) remains the same.
Streaming state transfer, just like regular byte-based state transfer, can be used in both pull and push mode. Similar to the existing getState and setState methods of org.jgroups.MessageListener, an application interested in streaming state transfer in push mode implements the streaming getState method(s) by writing its state to a provided OutputStream reference, and the setState method(s) by reading the state from a provided InputStream reference. To support streaming state transfer in push mode, the existing ExtendedMessageListener has been expanded with four additional methods:
    public interface ExtendedMessageListener {
        /* non-streaming callback methods omitted for clarity */
        void getState(OutputStream ostream);
        void getState(String state_id, OutputStream ostream);
        void setState(InputStream istream);
        void setState(String state_id, InputStream istream);
    }
For pull mode (when the application uses channel.receive() to fetch events), two new event classes are introduced:
StreamingGetStateEvent
StreamingSetStateEvent
These two event classes are very similar to the existing GetStateEvent and SetStateEvent but introduce a new field: StreamingGetStateEvent has an OutputStream and StreamingSetStateEvent has an InputStream.
The following code snippet demonstrates how to pull events from a channel, processing StreamingGetStateEvent and sending hypothetical state through a provided OutputStream reference. Handling of StreamingSetStateEvent is analogous to this example:
    ...
    Object obj=channel.receive(0);
    if(obj instanceof StreamingGetStateEvent) {
        StreamingGetStateEvent evt=(StreamingGetStateEvent)obj;
        ObjectOutputStream oos=null; // must be an ObjectOutputStream to call writeObject()
        try {
            oos=new ObjectOutputStream(evt.getArg());
            oos.writeObject(state);
            oos.flush();
        }
        catch(Exception e) {
            System.err.println(e); // report the failure instead of swallowing it
        }
        finally {
            try {
                if(oos != null) // the constructor may have failed
                    oos.close();
            }
            catch(IOException e) {
                System.err.println(e);
            }
        }
    }
    ...
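The benefit of the stream-based callbacks is that state can be written and read piece by piece. The sketch below shows one way this might look with plain java.io streams; StreamingStateCodec and its wire format (an entry count followed by key/value pairs) are assumptions made for illustration and are not part of JGroups.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical codec; the wire format is an assumption, not a JGroups format.
class StreamingStateCodec {

    /** Writes entries one at a time, so no serialized copy of the whole state is built. */
    static void writeState(Map<String, Integer> state, OutputStream ostream) {
        try (DataOutputStream out = new DataOutputStream(ostream)) {
            out.writeInt(state.size());
            for (Map.Entry<String, Integer> e : state.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeInt(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the entries back in the order they were written. */
    static Map<String, Integer> readState(InputStream istream) {
        Map<String, Integer> state = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(istream)) {
            int entries = in.readInt();
            for (int i = 0; i < entries; i++) {
                String key = in.readUTF();
                state.put(key, in.readInt());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return state;
    }
}
```

A streaming getState callback could pass its OutputStream to writeState(), and the matching setState callback could pass its InputStream to readState(). Both methods close the stream they are given.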
JGroups offers great flexibility in state transfer by allowing application developers to implement both byte-based and streaming-based state transfer. An application can, for example, implement both the streaming and the byte-based state transfer callbacks and then switch the state transfer protocol in the channel configuration to use either one. However, a channel cannot be configured with both state transfer types at the same time, with the choice between them made at runtime.
Disconnecting from a channel is done using the following method:
public void disconnect();
It will have no effect if the channel is already in the disconnected or closed state. If connected, it will remove itself from the group membership. This is done (transparently for a channel user) by sending a leave request to the current coordinator. The latter will subsequently remove the channel's address from its local view and send the new view to all remaining members.
After a successful disconnect, the channel will be in the unconnected state, and may subsequently be re-connected to.
To destroy a channel instance (destroy the associated protocol stack, and release all resources), method close() is used:
public void close();
It moves the channel to the closed state, in which no further operations are allowed (most throw an exception when invoked on a closed channel). In this state, a channel instance is no longer considered in use by the application and, once the reference to the instance is reset, it merely lingers until it is garbage collected by the Java runtime system.
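The state transitions described for disconnect() and close() can be summarized as a small state machine. ChannelLifecycle below is a hypothetical model, not the JChannel implementation; in particular, the use of IllegalStateException for operations on a closed channel is an assumption of this sketch.

```java
// Hypothetical model of the channel states; not the JChannel implementation.
class ChannelLifecycle {
    enum State { UNCONNECTED, CONNECTED, CLOSED }

    private State state = State.UNCONNECTED;

    State state() {
        return state;
    }

    /** Connecting is allowed from the unconnected state, but never after close(). */
    void connect() {
        if (state == State.CLOSED) {
            throw new IllegalStateException("channel is closed");
        }
        state = State.CONNECTED;
    }

    /** Has no effect unless the channel is currently connected. */
    void disconnect() {
        if (state == State.CONNECTED) {
            state = State.UNCONNECTED;
        }
    }

    /** Terminal state: the channel cannot be used again afterwards. */
    void close() {
        state = State.CLOSED;
    }
}
```

A channel can therefore cycle between unconnected and connected any number of times, whereas close() is final.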
[1] It could be that the member is suspected falsely, in which case the next view would still contain the suspected member (there is currently no unsuspect() method).
[2] Local delivery can be turned on/off using setOpt() .
[3] This is managed internally however, and an application programmer does not need to be concerned about it.
[4] A member will never retrieve the state from itself!