Chapter 7. List of Protocols

This section is work in progress; we strive to update the documentation as we make changes to the code.

The most important properties are described on the wiki. The idea is that users take one of the predefined configurations (shipped with JGroups) and make only minor changes to it.

For each protocol, we describe its purpose and its most important properties below.

7.1. Transport

7.1.1. UDP

7.1.2. TCP

7.1.3. TCP_NIO

7.1.4. TUNNEL

7.1.5. JMS

7.1.6. LOOPBACK

7.2. Initial membership discovery

7.2.1. PING

7.2.2. TCPPING

7.2.3. TCPGOSSIP

7.2.4. MPING

7.3. Merging after a network partition

7.3.1. MERGE2, MERGE3, MERGEFAST

7.4. Failure Detection

The task of failure detection is to probe members of a group and see whether they are alive. When a member is suspected (= deemed dead), then a SUSPECT message is sent to all nodes of the cluster. It is not the task of the failure detection layer to exclude a crashed member (this is done by the group membership protocol, GMS), but simply to notify everyone that a node in the cluster is suspected of having crashed.

7.4.1. FD

Failure detection based on heartbeat messages. If a reply is not received within timeout milliseconds, max_tries times in a row, the member is declared suspected and will be excluded by GMS.

Each member sends a message containing an FD HEARTBEAT header to its neighbor to the right (identified by the ping_dest address). The heartbeats are sent by the inner class Monitor. When the neighbor receives the HEARTBEAT, it replies with a message containing an FD HEARTBEAT_ACK header. The first member watches for HEARTBEAT_ACK replies from its neighbor. For each received reply, it resets the last_ack timestamp (sets it to the current time) and the num_tries counter (sets it to 0). The same Monitor instance that sends heartbeats watches the difference between the current time and last_ack. If this difference exceeds timeout, the Monitor cycles several more times (until max_tries is reached) and then sends a SUSPECT message for the neighbor's address. The SUSPECT message is sent down the stack, is addressed to all members, and is sent as a regular message with an FdHeader.SUSPECT header.
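
A configuration fragment using the properties described below might look like this (the values are illustrative):

Example: <FD timeout="2500" max_tries="5"/>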

Table 7.1. Properties

timeout: Max number of milliseconds to wait for a response, e.g. timeout="2500"
max_tries: Max number of missed responses until a member is declared suspected, e.g. max_tries="5"
shun: Once a member has been excluded from the group and then rejoins (e.g. because it didn't crash, but was just slow, or a crashed router came back), it will be excluded (shunned) and has to rejoin. JGroups can be configured such that shunning leads to automatic rejoins and state transfer (the default in JBoss), e.g. shun="true"

Automatic rejoins can be enabled by setting the channel option AUTO_RECONNECT to true:

    channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);

The same goes for automatically fetching the state after an automatic reconnect:

    channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);

7.4.2. FD_ALL

Failure detection based on a simple heartbeat protocol. Every member periodically multicasts a heartbeat. Every member also maintains a table of all members (minus itself). When data or a heartbeat from P is received, we reset the timestamp for P to the current time. Periodically, we check for expired members and suspect those.

Example: <FD_ALL interval="3000" timeout="10000"/>

In the example above, we send a heartbeat every 3 seconds and suspect members if we haven't received a heartbeat (or traffic) for more than 10 seconds. Note that since we check the timestamps every interval milliseconds, we will suspect a member after roughly 4 * 3s == 12 seconds. If we set the timeout to 8500, we would suspect a member after 3 * 3s == 9 seconds.
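
The sketch below illustrates the bookkeeping described above; the class and method names are hypothetical and do not mirror the actual JGroups implementation:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch of FD_ALL-style bookkeeping; not the real code.
    class HeartbeatTable {
        private final Map<String,Long> timestamps = new ConcurrentHashMap<String,Long>();
        private final long timeout; // ms without a heartbeat until a member is suspected

        HeartbeatTable(long timeout) {
            this.timeout = timeout;
        }

        // Called when a heartbeat (or, with msg_counts_as_heartbeat=true, any
        // message) from a member is received: reset its timestamp
        void heartbeatReceived(String member) {
            timestamps.put(member, System.currentTimeMillis());
        }

        // Called every 'interval' ms: returns the members whose last heartbeat
        // is older than 'timeout'; these would be suspected
        List<String> checkExpired() {
            long now = System.currentTimeMillis();
            List<String> suspects = new ArrayList<String>();
            for (Map.Entry<String,Long> entry : timestamps.entrySet())
                if (now - entry.getValue() > timeout)
                    suspects.add(entry.getKey());
            return suspects;
        }
    }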

Table 7.2. Properties

timeout: Max number of milliseconds without a received heartbeat (or traffic) until a member is suspected
interval: Interval (in milliseconds) at which heartbeat messages are multicast to the cluster
msg_counts_as_heartbeat: If true, traffic from P is treated as if P had sent a heartbeat, i.e. the timestamp for P is set to the current time. Default is true
shun: Once a member has been excluded from the group and then rejoins (e.g. because it didn't crash, but was just slow, or a crashed router came back), it will be excluded (shunned) and has to rejoin. JGroups can be configured such that shunning leads to automatic rejoins and state transfer (the default in JBoss), e.g. shun="true"

7.4.3. FD_SIMPLE

7.4.4. FD_PING

FD_PING uses a script or command that is run with 1 argument (the host to be pinged) and needs to return 0 (success) or 1 (failure). The default command is /sbin/ping (ping.exe on Windows), but this is user configurable and can be replaced with any user-provided script or executable.
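
The contract is simple to emulate. The sketch below shows the kind of check FD_PING performs; the class and method names are hypothetical, and note that on some systems a bare ping never terminates, so a wrapper script (e.g. one invoking ping with a count or deadline option) may be needed:

    import java.io.IOException;

    // Hypothetical sketch: run a command with the host as its single argument
    // and treat exit code 0 as "alive", anything else as "failed".
    class PingCheck {
        static boolean isAlive(String cmd, String host)
                throws IOException, InterruptedException {
            Process p = new ProcessBuilder(cmd, host).start();
            return p.waitFor() == 0; // 0 = success, non-zero = failure
        }
    }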

Table 7.3. Properties

cmd: The command to be executed, e.g. "/sbin/ping" (or "ping" if it is found on the path)
verbose: Whether or not to show the output of the command. Valid values: "true" and "false"

7.4.5. FD_ICMP

Uses InetAddress.isReachable() to determine whether a host is up or not. Note that this method is only available in JDK 5 and later, so reflection is used to determine whether InetAddress provides it. If not, an exception will be thrown at protocol initialization time.

The problem with InetAddress.isReachable() is that it may or may not use ICMP in its implementation! For example, an implementation might try to establish a TCP connection to port 7 (echo service), and, if the echo service is not running, the host would be suspected, even though a real ICMP packet would not have suspected the host! Please check your JDK/OS combination before running this protocol.
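
A quick way to see how isReachable() behaves on a given JDK/OS combination is a standalone check such as this sketch:

    import java.net.InetAddress;

    // Standalone check of the mechanism FD_ICMP relies on (JDK 5 and later).
    // Whether this uses ICMP or a TCP fallback depends on the JDK, the OS and
    // the privileges of the process.
    public class ReachabilityCheck {
        public static void main(String[] args) throws Exception {
            String host = args.length > 0 ? args[0] : "127.0.0.1";
            InetAddress addr = InetAddress.getByName(host);
            boolean up = addr.isReachable(3000); // timeout in ms
            System.out.println(host + " reachable: " + up);
        }
    }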

Table 7.4. Properties

bind_addr: The network interface to be used for sending ICMP packets, e.g. bind_addr="192.168.0.2"

7.4.6. FD_SOCK

Failure detection protocol based on a ring of TCP sockets created between group members. Each member in a group connects to its neighbor (the last member connects to the first), thus forming a ring. Member B is suspected when its neighbor A detects an abnormally closed TCP socket (presumably due to a crash of B). However, if member B is about to leave gracefully, it lets its neighbor A know, so that A does not suspect it.

If you are running on a multi-NIC machine, note that JGroups versions prior to 2.2.8 do not take this possibility into account: the JVM may select a NIC that is unreachable from the neighbor and set up the FD_SOCK server socket on it. The neighbor would then be unable to connect to that server socket, resulting in the member being suspected immediately. The suspected member is excluded from the group, tries to rejoin, and thus enters a join/leave loop. JGroups 2.2.8 introduced the srv_sock_bind_addr property, which lets you specify the network interface to which the FD_SOCK server socket should be bound; this is most likely the same interface used for other JGroups traffic. JGroups 2.2.9 and newer consult the bind.address system property, or you can specify the network interface directly via the FD_SOCK bind_addr property.
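
A configuration fragment might look like this (the address is illustrative):

Example: <FD_SOCK bind_addr="192.168.0.2"/>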

Table 7.5. Properties

bind_addr: The network interface to which the FD_SOCK server socket should be bound, e.g. bind_addr="192.168.0.2"

7.4.7. VERIFY_SUSPECT

7.5. Reliable message transmission

7.5.1. pbcast.NAKACK

NAKACK provides reliable delivery and FIFO (= First In First Out) properties for messages sent to all nodes in a cluster.

Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message[12] if that sequence number is not received.

FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.
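
An illustrative configuration combining several of the properties described below:

Example: <pbcast.NAKACK retransmit_timeout="300,600,1200,2400" use_mcast_xmit="true" discard_delivered_msgs="true"/>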

Table 7.6. Properties

retransmit_timeout: A comma-separated list of milliseconds, e.g. 100,200,400,800,1600. We ask for retransmission of a given message 100ms after it should have been received, then another 200ms later, and so on, until we're at 1600ms. From then on, we send a retransmit request for that message every 1600ms, until the message is received or the original sender crashes.

Example: retransmit_timeout="300,600,1200,2400"

exponential_backoff: Value in milliseconds. If greater than 0, exponential backoff for retransmission is enabled. The value is then the initial value, and we'll double it every time we ask for retransmission, until a max value of 15000ms.

If the value is greater than 0, exponential backoff is enabled and retransmit_timeout will be ignored.

Note that this property is experimental, and may be removed at any time.

Example: exponential_backoff="30"

use_stats_for_retransmission: Boolean. If true, we ignore both retransmit_timeout and exponential_backoff, and instead compute the retransmission times from statistics collected about actual retransmissions.

Note that this property is experimental, and may be removed at any time.

Example: use_stats_for_retransmission="true"

use_mcast_xmit: When we get a retransmission request from P for a message M, then we send the retransmitted M to P. However, assuming that many nodes lost M, we might as well send M to the entire cluster, so that we can satisfy many retransmit requests at the same time.

Setting this option to true only makes sense for an IP multicast capable transport (e.g. UDP), where we send the retransmitted message one time. Otherwise, e.g. if we use TCP, we send the message N-1 times, one time for each node.

Example: use_mcast_xmit="true"

use_mcast_xmit_req: Similar to use_mcast_xmit, but for requests: if enabled, we send retransmit requests via multicast.

Note

Setting this option to true only makes sense for an IP multicast capable transport (e.g. UDP), where we send the retransmit request one time. Otherwise, e.g. if we use TCP, we send the request N-1 times, one time for each node!

Example: use_mcast_xmit_req="true"

xmit_from_random_member: Instead of sending all retransmit requests to the original sender of a message, we can also pick a random member of the cluster. Setting xmit_from_random_member="true" does this. The advantage is that the retransmission load is distributed more evenly across the cluster. The downside is that a random member may not yet have received a given message, so it may not be able to satisfy the retransmission request; in that case, it may take a few retransmission requests to different members of the cluster to finally get the requested message.

Note that discard_delivered_msgs must be set to false if this option is enabled.

Example: xmit_from_random_member="true"

discard_delivered_msgs: If set to true, messages received from other members are not buffered until stability purges them, but are discarded immediately. This means that retransmission requests can only be satisfied by the original sender (and xmit_from_random_member won't work). However, since we don't have to wait for stability to kick in and purge messages seen by everyone, we conserve memory.

Example: discard_delivered_msgs="true"

max_xmit_buf_size: Number of entries to keep in the retransmission tables. Since messages are buffered in the retransmission tables, a given message M can only be delivered once all messages preceding M have been delivered as well. Setting max_xmit_buf_size makes the retransmission tables bounded: older messages will be discarded, even if they haven't been received by everyone, once max_xmit_buf_size entries are exceeded.

Note that setting this property can cause message loss, namely when a message is lost and cannot be retransmitted because it has already been evicted from the full retransmission buffer!

eager_lock_release: Normally, the lock on the sender in up() is released only after the up() call that was passed up the stack returns; this can lead to issues such as the one described in http://jira.jboss.com/jira/browse/JGRP-656. With eager_lock_release enabled (the default), the lock is released as soon as the application calls Channel.down() within the receive() callback.

Note that ordering is still correct, but messages from self might get delivered concurrently. This can be turned off by setting eager_lock_release to false.

7.5.2. UNICAST

UNICAST provides reliable delivery and FIFO (= First In First Out) properties for point-to-point messages between one sender and one receiver.

Reliable delivery means that no message sent by a sender will ever be lost, as all messages are numbered with sequence numbers (by sender) and retransmission requests are sent to the sender of a message[13] if that sequence number is not received.

FIFO order means that all messages from a given sender are received in exactly the order in which they were sent.

On top of a reliable transport, such as TCP, UNICAST is not really needed. However, concurrent delivery of messages from the same sender is prevented by UNICAST by acquiring a lock on the sender's retransmission table, so unless concurrent delivery is desired, UNICAST should not be removed from the stack even if TCP is used.
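
An illustrative configuration (the value is an example only):

Example: <UNICAST retransmit_timeout="300,600,1200,2400"/>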

Table 7.7. Properties

retransmit_timeout: A comma-separated list of milliseconds, e.g. 100,200,400,800,1600. We ask for retransmission of a given message 100ms after it should have been received, then another 200ms later, and so on, until we're at 1600ms. From then on, we send a retransmit request for that message every 1600ms, until the message is received or the original sender crashes.

Example: retransmit_timeout="300,600,1200,2400"

eager_lock_release: Normally, the lock on the sender in up() is released only after the up() call that was passed up the stack returns; this can lead to issues such as the one described in http://jira.jboss.com/jira/browse/JGRP-656. With eager_lock_release enabled (the default), the lock is released as soon as the application calls Channel.down() within the receive() callback.

Note that ordering is still correct, but messages from self might get delivered concurrently. This can be turned off by setting eager_lock_release to false.

immediate_ack: (default: false) By default, we ack a message only after it has been delivered to the application. However, because UNICAST acquires a lock per sender and only passes up a single message per sender at any time, we might have received a message (i.e., added it to the received table) but not yet sent an ack, because the message has not yet been processed. This leads to unnecessary retransmission requests from the sender.

Setting immediate_ack="true" sends an ack as soon as the message has been added to the received table.

Note that the sender cannot really overwhelm the receiver because flow control (FC) will throttle the sender should it send too many messages for the receiver to keep up with.

7.5.3. SMACK

7.6. Fragmentation

7.6.1. FRAG and FRAG2

7.7. Ordering (FIFO covered by NAKACK)

7.7.1. Total Order (SEQUENCER)

7.8. Group Membership

Group membership takes care of joining new members, handling leave requests by existing members, and handling SUSPECT messages for crashed members, as emitted by failure detection protocols. The algorithm for joining a new member is essentially:

    loop:
        find initial members (discovery)
        if no responses:
            become singleton group and break out of the loop
        else:
            determine the coordinator (oldest member) from the responses
            send JOIN request to coordinator
            wait for JOIN response
            if JOIN response received:
                install view and break out of the loop
            else:
                sleep for 5 seconds and continue the loop

7.8.1. pbcast.GMS

Table 7.8. Properties

join_timeout: Number of milliseconds to wait for a JOIN response from the coordinator before sending a new JOIN request. Default=5000
join_retry_timeout: Number of ms to wait before sending a new JOIN request
leave_timeout: Number of ms to wait for a LEAVE response from the coordinator. Once this time has elapsed, we leave anyway.
shun:
merge_leader:
print_local_addr: Whether or not to print the local address of a newly started member to stdout. Default is "true". Example:

    -------------------------------------------------------
    GMS: address is 192.168.5.2:4682
    -------------------------------------------------------

merge_timeout:
digest_timeout:
view_ack_collection_timeout:
resume_task_timeout:
disable_initial_coord:
handle_concurrent_startup: Default: "true". Handles concurrent startup of N initial members; setting it to false is only used in unit tests, where the correctness of the subsequent merge is tested. We don't recommend setting it to false.
num_prev_mbrs:
use_flush:
flush_timeout:
reject_join_from_existing_member: If we receive a JOIN request from P and P is already in the current membership, we send back a JOIN response with an error message if this property is set to true (Channel.connect() will fail). Otherwise, we return the current view.
view_bundling: Whether to enable view bundling (default is true). View bundling means that multiple view-affecting requests to GMS, such as JOIN, LEAVE or SUSPECT, are bundled for a number of milliseconds in order to avoid having to generate one view per request. This is especially useful when many members join or leave a group at the same time.
max_bundling_time: Max number of milliseconds to wait for subsequent JOIN/LEAVE/SUSPECT requests (default is 50). Thus, when 5 JOIN or LEAVE requests are received within 50ms, only one view will be generated.
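
An illustrative configuration combining several of the documented properties:

Example: <pbcast.GMS join_timeout="5000" join_retry_timeout="2000" print_local_addr="true" view_bundling="true" max_bundling_time="50"/>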

7.8.1.1. Disabling the initial coordinator

Consider the following situation: a new member wants to join a group. The procedure to do so is:

  • Multicast an (unreliable) discovery request (ping)

  • Wait for n responses or m milliseconds (whichever is first)

  • Every member responds with the address of the coordinator

  • If the number of initial responses is > 0: determine the coordinator and start the JOIN protocol

  • If the number of initial responses is 0: become coordinator, assuming that no one else is out there

However, the initial mcast discovery request might get lost, e.g. when multiple members start at the same time: the outgoing network buffer might overflow, and the mcast packet might be dropped. Nobody receives it, and thus the sender will not receive any responses, resulting in an initial membership of 0. This could result in multiple coordinators, and multiple subgroups forming. How can we overcome this problem? There are three solutions:

  1. Increase the timeout, or the number of responses to wait for. This only helps if the reason for the empty membership was a slow host; if the mcast packet was dropped, this solution won't help.

  2. Add the MERGE(2) protocol. This doesn't actually prevent multiple initial coordinators, but rectifies the problem by merging the different subgroups back into one. Note that this involves state merging, which needs to be done by the application.

  3. (new) Prevent members from becoming coordinator on initial startup. This solution is applicable when we know which member is going to be the initial coordinator of a fresh group. We don't care what happens afterwards; coordinatorship can then migrate to another member. In this case, we configure the member that is always supposed to be started first with disable_initial_coord=false (the default) and all other members with disable_initial_coord=true. This works as described below.

When the initial membership is received and is empty, and the property disable_initial_coord is true, then we simply continue in the loop and retry receiving the initial membership (until it is non-empty). If the property is false, we are allowed to become coordinator, and will do so. Note that if a member is started as the first member of a group, but its property is set to true, it will loop until another member whose disable_initial_coord property is set to false is started.

7.9. Security

7.9.1. ENCRYPT

7.9.2. AUTH

7.10. State Transfer

7.10.1. pbcast.STATE_TRANSFER

7.10.2. pbcast.STREAMING_STATE_TRANSFER

7.10.2.1. Overview

In order to transfer application state to a joining member of a group, pbcast.STATE_TRANSFER has to load the entire state into memory and send it to the joining member. The major limitation of this approach is that a very large state (>1GB) would likely result in an OutOfMemoryError. In order to alleviate this problem, a new state transfer methodology, based on a streaming state transfer, was introduced in JGroups 2.4.

Streaming state transfer supports both partial and full state transfer.

Streaming state transfer provides an InputStream to the state reader and an OutputStream to the state writer. The OutputStream and InputStream abstractions enable state transfer in byte chunks, thus resulting in smaller memory requirements. For example, if the application state consists of a huge DOM tree whose aggregate size is 2GB (and which has partly been passivated to disk), then the state provider (i.e. the coordinator) can simply iterate over the DOM tree (activating the parts that have been passivated to disk) and write to the OutputStream as it traverses the tree. The state receiver simply reads from the InputStream and reconstructs the tree on its side, possibly again passivating parts to disk.

Rather than having to provide a 2GB byte[] buffer, streaming state transfer transfers the state in chunks of N bytes where N is user configurable.

7.10.2.2. API

Streaming state transfer, just like the regular byte-based state transfer, can be used in both pull and push mode. Similarly to the existing getState() and setState() methods of org.jgroups.MessageListener, an application interested in streaming state transfer in push mode implements the streaming getState() method(s) by writing its state to a provided OutputStream reference, and the setState() method(s) by reading its state from a provided InputStream reference. In order to support streaming state transfer in push mode, the existing ExtendedMessageListener has been expanded with four additional methods:

    public interface ExtendedMessageListener {

        /* non-streaming callback methods omitted for clarity */

        /**
         * Allows an application to write a state through a provided OutputStream.
         * An application is obligated to always close the given OutputStream reference.
         *
         * @param ostream the OutputStream
         * @see OutputStream#close()
         */
        public void getState(OutputStream ostream);

        /**
         * Allows an application to write a partial state through a provided OutputStream.
         * An application is obligated to always close the given OutputStream reference.
         *
         * @param state_id id of the partial state requested
         * @param ostream the OutputStream
         * @see OutputStream#close()
         */
        public void getState(String state_id, OutputStream ostream);

        /**
         * Allows an application to read a state through a provided InputStream.
         * An application is obligated to always close the given InputStream reference.
         *
         * @param istream the InputStream
         * @see InputStream#close()
         */
        public void setState(InputStream istream);

        /**
         * Allows an application to read a partial state through a provided InputStream.
         * An application is obligated to always close the given InputStream reference.
         *
         * @param state_id id of the partial state requested
         * @param istream the InputStream
         * @see InputStream#close()
         */
        public void setState(String state_id, InputStream istream);
    }
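
For the push mode, implementations of the streaming callbacks might look like the following sketch; the 'state' field and the use of Java serialization are illustrative assumptions, not requirements:

    // Illustrative push-mode callbacks; 'state' is assumed to be an
    // application-defined field. The provided streams must always be closed.
    public void getState(OutputStream ostream) {
        try {
            ObjectOutputStream out = new ObjectOutputStream(ostream);
            out.writeObject(state);
            out.flush();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            try { ostream.close(); } catch (IOException e) {} // always close the stream
        }
    }

    public void setState(InputStream istream) {
        try {
            ObjectInputStream in = new ObjectInputStream(istream);
            state = in.readObject();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            try { istream.close(); } catch (IOException e) {} // always close the stream
        }
    }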

For the pull mode (when the application uses channel.receive() to fetch events), two new event classes are introduced:

  • StreamingGetStateEvent

  • StreamingSetStateEvent

These two events/classes are very similar to the existing GetStateEvent and SetStateEvent, but each introduces a new field: StreamingGetStateEvent has an OutputStream and StreamingSetStateEvent has an InputStream.

The following code snippet demonstrates how to pull events from a channel, processing StreamingGetStateEvent and sending hypothetical state through a provided OutputStream reference. Handling of StreamingSetStateEvent is analogous to this example:

    ...
    Object obj = channel.receive(0);
    if (obj instanceof StreamingGetStateEvent) {
        StreamingGetStateEvent evt = (StreamingGetStateEvent) obj;
        ObjectOutputStream oos = null;
        try {
            oos = new ObjectOutputStream(evt.getArg());
            oos.writeObject(state);
            oos.flush();
        }
        catch (Exception e) {
            e.printStackTrace(); // handle/log the failure as appropriate
        }
        finally {
            try {
                if (oos != null)
                    oos.close(); // the application must always close the stream
            }
            catch (IOException e) {
                System.err.println(e);
            }
        }
    }
    ...

The API that initiates a state transfer at the JChannel level has the following methods:

    public boolean getState(Address target, long timeout)
        throws ChannelNotConnectedException, ChannelClosedException;

    public boolean getState(Address target, String state_id, long timeout)
        throws ChannelNotConnectedException, ChannelClosedException;

Introduction of STREAMING_STATE_TRANSFER does not change the current API.

7.10.2.3. Configuration

The choice of state transfer type is static and mutually exclusive: a JChannel configuration cannot include both STREAMING_STATE_TRANSFER and STATE_TRANSFER.

STREAMING_STATE_TRANSFER allows the following configuration parameters:

Table 7.9. Properties

bind_addr: The network interface to be used for receiving state requests, e.g. bind_addr="192.168.0.2"
start_port: Port on the bind_addr network interface to be used for receiving state requests, e.g. start_port="4444"
max_pool: Maximum number of threads in the pool serving state requests, default=5, e.g. max_pool="10"
pool_thread_keep_alive: Pool thread keep-alive time in ms, default=30000, e.g. pool_thread_keep_alive="60000"
use_reading_thread: Whether to use a separate thread for reading the state, default=false, e.g. use_reading_thread="true"
socket_buffer_size: Chunk size used for state transfer, default=8192, e.g. socket_buffer_size="32768"

7.10.2.4. Other considerations

The threading model used for writing the state on the member providing it, and for reading the state on the member receiving it, is tunable. On the state provider, a thread pool is used to spawn the threads that serve state requests. A member providing state in push mode can thus serve N concurrent state requests, where N is the max_pool configuration parameter of the thread pool. If there are no further state transfer requests, pool threads are automatically reaped after the configurable pool_thread_keep_alive timeout expires. On a channel operating in push mode, the state reader can read the state on the JGroups protocol stack thread or, optionally, on a separate thread. A separate thread should be used if reading the state is expensive (e.g. a large state, or costly serialization), as this could otherwise affect the liveness of the JGroups protocol thread. Since most state transfers are very short (<2-3 sec), a separate thread is not used by default.

7.11. Flow control

Flow control takes care of adjusting the rate of a message sender to the rate of the slowest receiver over time. If a sender continuously sends messages at a rate that is faster than the receiver(s), the receivers will either queue up messages, or the messages will get discarded by the receiver(s), triggering costly retransmissions. In addition, there is spurious traffic on the cluster, causing even more retransmissions.

Flow control throttles the sender so the receivers are not overrun with messages.

7.11.1. FC

FC uses a credit based system, where each sender has max_credits credits and decrements them whenever a message is sent. The sender blocks when the credits fall below 0, and only resumes sending messages when it receives a replenishment message from the receivers.

The receivers maintain a table of credits for all senders and decrement the given sender's credits as well, when a message is received.

When a sender's credits drop below a threshold, the receiver sends a replenishment message to the sender. The threshold is defined by min_credits or min_threshold.
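
The following self-contained sketch illustrates the sender-side credit accounting described above; the class and method names are illustrative, not the actual FC implementation:

    // Illustrative sketch of sender-side credit accounting; not the real FC code.
    class CreditPool {
        private long credits;          // bytes the sender may still send
        private final long maxCredits; // corresponds to max_credits

        CreditPool(long maxCredits) {
            this.maxCredits = maxCredits;
            this.credits = maxCredits;
        }

        // Called before sending a message of 'len' bytes; blocks when the
        // credits fall below 0, until replenished or maxBlockTimeMs elapses
        synchronized void decrement(long len, long maxBlockTimeMs) throws InterruptedException {
            credits -= len;
            if (credits < 0) {
                wait(maxBlockTimeMs);     // woken up by replenish(), or times out
                if (credits < 0)
                    credits = maxCredits; // self-replenish after max_block_time
            }
        }

        // Called when a replenishment message from the receivers arrives
        synchronized void replenish(long amount) {
            credits = Math.min(credits + amount, maxCredits);
            notifyAll();
        }
    }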

Table 7.10. Properties

max_credits: Max number of bytes the sender is allowed to send before blocking until replenishments from the receivers are received
min_credits: Min credits in bytes. If the available credits for a sender drop below this value, a receiver will send a replenishment message to the sender
min_threshold: Same as min_credits, but expressed as a percentage of max_credits, e.g. 0.1 (10% of max_credits)
max_block_time: The maximum time in milliseconds a sender can be blocked. After this time has elapsed without a replenishment being received, the sender replenishes itself and continues sending. Set it to 0 to prevent this
max_block_times: A list of KEY:VAL pairs, separated by commas, where KEY is a message length in bytes and VAL the max number of milliseconds to block. Compared to max_block_time, this property allows the max blocking time to depend on the message length. After that time, the message is sent regardless of whether credits were received in the meantime.

Example: max_block_times="50:2,500:5,1500:10,10000:20,100000:500"

This means that messages of 50 bytes or smaller will block for at most 2 ms (or not block at all if enough credits are available, of course). Messages between 51 and 500 bytes will block for at most 5 ms, and so on. All messages larger than 100,000 bytes will block for at most 500 ms.

7.11.2. SFC

A simplified version of FC. FC can actually still overrun receivers when the transport's latency is very small. SFC is a simple flow control protocol for group (= multipoint) messages.

Every sender has max_credits bytes for sending multicast messages to the group.

Every multicast message (we don't consider unicast messages) decrements the sender's credits by its size. When the credits fall below 0, the sender asks all receivers for new credits and blocks until credits have been received from *all* members.

When a receiver gets a credit request, it checks whether it has received max_credits bytes from the requester since the last credit request. If so, it sends new credits to the requester and resets the byte count for the requester. Otherwise, it takes note of the credit request from P and, when max_credits bytes have finally been received from P, sends the credits to P and resets the count for P.
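
The receiver-side bookkeeping can be sketched as follows; the names are hypothetical and this is not the actual SFC implementation:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Illustrative sketch of a receiver's credit bookkeeping per sender.
    class ReceiverCredits {
        private final long maxCredits; // corresponds to max_credits
        private final Map<Object,Long> received = new HashMap<Object,Long>();
        private final Set<Object> pendingRequests = new HashSet<Object>();

        ReceiverCredits(long maxCredits) {
            this.maxCredits = maxCredits;
        }

        // Called for every multicast message received from 'sender'
        synchronized void onMessage(Object sender, long len) {
            Long prev = received.get(sender);
            long sum = (prev == null ? 0 : prev.longValue()) + len;
            received.put(sender, sum);
            // deferred reply: the requester has now sent max_credits bytes
            if (pendingRequests.contains(sender) && sum >= maxCredits)
                sendCredits(sender);
        }

        // Called when a credit request from 'sender' arrives
        synchronized void onCreditRequest(Object sender) {
            Long sum = received.get(sender);
            if (sum != null && sum.longValue() >= maxCredits)
                sendCredits(sender);
            else
                pendingRequests.add(sender); // note the request, reply later
        }

        private void sendCredits(Object sender) {
            received.put(sender, 0L); // reset the byte count for this sender
            pendingRequests.remove(sender);
            // ... send a replenishment message to 'sender' (omitted)
        }
    }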

The maximum amount of memory for received messages is therefore <number of senders> * max_credits.

The relationship with STABLE is as follows: when a member Q is slow, it will prevent STABLE from collecting messages above the ones seen by Q (everybody else has seen more messages). However, because Q will *not* send credits back to the senders until it has processed all messages worth max_credits bytes, the senders will block. This in turn allows STABLE to progress and eventually garbage collect most messages from all senders. Therefore, SFC and STABLE complement each other, with SFC blocking senders so that STABLE can catch up.

Table 7.11. Properties

max_credits: Max number of bytes the sender is allowed to send before blocking until replenishments from the receivers are received

7.12. Message stability

To serve potential retransmission requests, a member has to store received messages until it is known that every member in the cluster has received them. Message stability for a given message M means that M has been seen by everyone in the cluster.

The stability protocol periodically (or after a certain number of bytes have been received) initiates a consensus protocol, which multicasts a stable message containing the highest message sequence numbers received from each member. This is called a digest.

When everyone has received everybody else's stable messages, a digest is computed which consists of the minimum sequence numbers across all received digests. This is the stability vector, and it contains only message sequence numbers that have been seen by everyone. For example, if A's digest records messages from P up to sequence number 25 and B's digest records up to 20, the stability vector entry for P is min(25, 20) = 20.

This stability vector is then broadcast to the group, and everyone can remove from their retransmission tables the messages whose sequence numbers are smaller than the ones in the stability vector. These messages can then be garbage collected.

7.12.1. STABLE

Table 7.12. Properties

desired_avg_gossip: Interval in milliseconds at which a stable message is broadcast to the cluster. This is randomized to prevent all members from sending the message at the same time. Setting it to 0 disables it.
max_bytes: Maximum number of bytes received after which a stable message is broadcast to the cluster. A high value means fewer stability rounds, each purging more messages; a smaller value means more frequent stability rounds, each purging fewer messages. This is similar to garbage collection in the JVM.
stability_delay: When sending a stability message, we wait a randomized time between 1 and stability_delay milliseconds before sending it. If a stability message from someone else is received while we are waiting, we cancel our own message. This is to prevent everyone from sending the message at the same time.

7.13. Diagnostics

7.13.1. PERF

7.13.2. SIZE

7.13.3. TRACE

7.13.4. PRINTOBJS

7.14. Misc

7.14.1. COMPRESS

7.14.2. pbcast.FLUSH

Flushing forces group members to send all their pending messages prior to a certain event. The process of flushing acquiesces the cluster so that a state transfer or a join can be done. It is also called the stop-the-world model, as nobody will be able to send messages while a flush is in progress. Flush is used in two cases:

  • State transfer

    When a member requests a state transfer, it tells everyone to stop sending messages and waits for everyone's ack. Then the state provider asks its application for the state and ships it back to the requester. After the requester has received and set the state successfully, it tells everyone to resume sending messages.

  • View changes (e.g. a join). Before installing a new view V2, flushing ensures that all messages *sent* in the current view V1 are indeed *delivered* in V1, rather than in V2 (in all non-faulty members). This is essentially Virtual Synchrony.

FLUSH is designed as another protocol, positioned just below the channel, e.g. above STATE_TRANSFER and FC. The STATE_TRANSFER and GMS protocols request a flush by sending a SUSPEND event up the stack, where it is handled by the FLUSH protocol. The SUSPEND_OK ack sent back by the FLUSH protocol lets the caller know that the flush has completed. When done (e.g. the view was installed or the state transferred), the requesting protocol sends up a RESUME event, which allows everyone in the cluster to resume sending.

The channel can be notified that a FLUSH phase has started by turning the channel block option on; by default it is turned off. If channel blocking is turned on, FLUSH notifies the application layer that the channel has been blocked by sending an Event.BLOCK event. The channel responds by sending an Event.BLOCK_OK event down to the FLUSH protocol. We recommend turning on channel block notification only if the channel is used in push mode. In push mode, an application that uses the channel can perform its blocking logic by implementing the MembershipListener.block() callback method.
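
In push mode, reacting to the block notification might look like the following sketch; the flag handling is illustrative, not prescribed by JGroups:

    import org.jgroups.Address;
    import org.jgroups.MembershipListener;
    import org.jgroups.View;

    // Illustrative listener: stop sending when a flush starts, resume when
    // the new view has been installed.
    public class FlushAwareListener implements MembershipListener {
        private volatile boolean blocked;

        public void block()                        { blocked = true;  } // flush started
        public void viewAccepted(View new_view)    { blocked = false; } // flush is over
        public void suspect(Address suspected_mbr) { /* not relevant here */ }

        public boolean isBlocked() { return blocked; }
    }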

Table 7.13. Properties

timeout: Maximum time that FLUSH.down() will block before being unblocked. Should be long enough to allow large state transfers. Default=8000 ms
block_timeout: Maximum amount of time that FLUSH will wait for an Event.BLOCK_OK once Event.BLOCK has been sent to the application level. Default=10000 ms


[12] Note that NAKACK can also be configured to send retransmission requests for M to anyone in the cluster, rather than only to the sender of M.