package org.jboss.cache;
import org.jboss.cache.eviction.LRUPolicy;
import org.jboss.cache.interceptors.Interceptor;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockStrategyFactory;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.w3c.dom.Attr;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.NodeList;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;
public class TreeCache extends ServiceMBeanSupport implements TreeCacheMBean, Cloneable, MembershipListener {
/** Root of the node tree; its name and Fqn are both built from SEPARATOR. */
protected Node root=new Node(SEPARATOR, Fqn.fromString(SEPARATOR), null, null, this);
/** Listeners registered through addTreeCacheListener(); cleared in stopService(). */
protected final Vector listeners=new Vector();
/** JGroups channel; may be supplied via the constructor, otherwise created in startService() for the replicated modes. */
protected JChannel channel=null;
/** Assigned from determineCoordinator() at the end of startService(). */
protected boolean coordinator=false;
/** Group name handed to channel.connect(). */
protected String cluster_name="TreeCache-Group";
/** Channel properties handed to the JChannel constructor; startService() substitutes getDefaultProperties() when null. */
protected String cluster_props=null;
/** Member list returned by getMembers(); emptied in stopService(). */
protected final Vector members=new Vector();
/** Dispatcher created in startService() for the replicated modes. */
protected RpcDispatcher disp=null;
/** Message listener passed to the RpcDispatcher and used by getStateBytes()/setStateBytes(). */
protected MessageListener ml=new MessageListenerAdaptor(this, log);
/** Timeout passed to channel.getState() in fetchStateOnStartup() (presumably milliseconds -- confirm against JGroups). */
protected long state_fetch_timeout=5000;
/** Exposed through getSyncReplTimeout()/setSyncReplTimeout(). */
protected long sync_repl_timeout=15000;
/** Replication queue settings; see setUseReplQueue(), setReplQueueInterval() and setReplQueueMaxElements(). */
protected boolean use_repl_queue=false;
protected int repl_queue_max_elements=1000;
protected long repl_queue_interval=5000;
/** Returned by getTransactionTable() and rendered by dumpTransactionTable(). */
private final TransactionTable tx_table=new TransactionTable();
/** Returned by getLockTable(). */
private final HashMap lock_table=new HashMap();
/** When true, startService() calls fetchStateOnStartup() after connecting the channel. */
protected boolean fetch_state_on_startup=true;
/** Exposed through getLockAcquisitionTimeout()/setLockAcquisitionTimeout(). */
protected long lock_acquisition_timeout=10000;
/** Class name and instance of the eviction policy; both assigned in setEvictionPolicyClass(). */
protected String eviction_policy_class=null;
protected TreeCacheListener eviction_policy_provider = null;
/** One of LOCAL, REPL_ASYNC or REPL_SYNC; validated by setCacheMode(int). */
protected int cache_mode=LOCAL;
// Reflective handles to TreeCache methods; all are resolved in the static initializer.
public static Method putDataMethodLocal=null;
public static Method putDataEraseMethodLocal=null;
public static Method putKeyValMethodLocal=null;
public static Method putFailFastKeyValueMethodLocal=null;
public static Method removeNodeMethodLocal=null;
public static Method removeKeyMethodLocal=null;
public static Method removeDataMethodLocal=null;
public static Method evictNodeMethodLocal=null;
public static Method prepareMethod=null;
public static Method commitMethod=null;
public static Method rollbackMethod=null;
public static Method replicateMethod=null;
public static Method replicateAllMethod=null;
public static Method addChildMethodLocal=null;
public static Method getKeyValueMethodLocal=null;
public static Method getNodeMethodLocal=null;
public static Method getKeysMethodLocal=null;
public static Method getChildrenNamesMethodLocal=null;
public static Method releaseAllLocksMethodLocal=null;
public static Method printMethodLocal=null;
public static Method lockMethodLocal=null;
/** Methods for which isCrudMethod() answers true; filled by the static initializer with the put and remove handles. */
static LinkedList crud_methods=new LinkedList();
/** fetchStateOnStartup() resets this flag and waits on stateLock until it becomes true. */
protected boolean isStateSet=false;
private final Object stateLock=new Object();
/** Current isolation level; setIsolationLevel() also forwards it to LockStrategyFactory. */
protected IsolationLevel isolationLevel=IsolationLevel.REPEATABLE_READ;
/** Eviction configuration element stored by setEvictionPolicyConfig(). */
protected Element evictConfig_ = null;
/**
 * Returns the message listener owned by this cache.
 *
 * @return the listener held in the ml field
 */
public MessageListener getMessageListener() {
    return this.ml;
}
/** Head of the interceptor chain assembled by createInterceptorChain(). */
protected Interceptor interceptor_chain=null;
/** Exposed through getReplicationHandler()/setReplicationHandler(). */
protected Replicatable replication_handler=null;
/** Transaction manager lookup; instantiated from tm_lookup_class in startService() when not set explicitly. */
protected TransactionManagerLookup tm_lookup=null;
protected String tm_lookup_class=null;
/** Transaction manager obtained from tm_lookup in startService(); stays null when the lookup is missing or fails. */
protected TransactionManager tm=null;
/** Cache loader class name; createCacheLoader() instantiates it when no loader instance was injected. */
protected String cache_loader_class=null;
protected CacheLoader cache_loader=null;
/** Configuration handed to CacheLoader.setConfig() in createCacheLoader(). */
protected Properties cache_loader_config=null;
/** Influences where the cache store interceptor is placed by createInterceptorChain(). */
protected boolean cache_loader_shared=true;
/** Fqns to preload at startup; populated by setCacheLoaderPreload(). */
protected List cache_loader_preload=null;
protected boolean cache_loader_fetch_transient_state=true;
protected boolean cache_loader_fetch_persistent_state=true;
protected boolean sync_commit_phase=false;
protected boolean sync_rollback_phase=false;
/** Forwarded to the RpcDispatcher by setDeadlockDetection() and by startService(). */
protected boolean deadlockDetection=false;
/** Created or discarded by setUseReplQueue(). */
protected ReplicationQueue repl_queue=null;
/** Separator string used to build the root node's name and Fqn. */
public static final String SEPARATOR="/";
// Valid values for cache_mode.
public static final int LOCAL=1;
public static final int REPL_ASYNC=2;
public static final int REPL_SYNC=3;
// NOTE(review): neither constant below is referenced in the visible part of the file -- confirm their use.
static public final String UNINITIALIZED="jboss:internal:uninitialized"; static final String JNDI_LOCATOR_URI="socket://localhost:6789";
static {
try {
putDataMethodLocal=TreeCache.class.getDeclaredMethod("_put",
new Class[]{GlobalTransaction.class,
Fqn.class,
Map.class,
boolean.class});
putDataEraseMethodLocal=TreeCache.class.getDeclaredMethod("_put",
new Class[]{GlobalTransaction.class,
Fqn.class,
Map.class,
boolean.class,
boolean.class});
putKeyValMethodLocal=TreeCache.class.getDeclaredMethod("_put",
new Class[]{GlobalTransaction.class,
Fqn.class,
Object.class,
Object.class,
boolean.class});
putFailFastKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_put",
new Class[]{GlobalTransaction.class,
Fqn.class,
Object.class,
Object.class,
boolean.class,
long.class});
removeNodeMethodLocal=TreeCache.class.getDeclaredMethod("_remove",
new Class[]{GlobalTransaction.class,
Fqn.class,
boolean.class});
removeKeyMethodLocal=TreeCache.class.getDeclaredMethod("_remove",
new Class[]{GlobalTransaction.class,
Fqn.class,
Object.class,
boolean.class});
removeDataMethodLocal=TreeCache.class.getDeclaredMethod("_removeData",
new Class[]{GlobalTransaction.class,
Fqn.class,
boolean.class});
evictNodeMethodLocal=TreeCache.class.getDeclaredMethod("_evict", new Class[] {Fqn.class});
prepareMethod=TreeCache.class.getDeclaredMethod("prepare",
new Class[]{GlobalTransaction.class,
List.class,
Address.class,
boolean.class});
commitMethod=TreeCache.class.getDeclaredMethod("commit",
new Class[]{GlobalTransaction.class});
rollbackMethod=TreeCache.class.getDeclaredMethod("rollback",
new Class[]{GlobalTransaction.class});
addChildMethodLocal=TreeCache.class.getDeclaredMethod("_addChild",
new Class[]{GlobalTransaction.class,
Fqn.class, Object.class, Node.class});
getKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_get",
new Class[]{Fqn.class, Object.class, boolean.class});
getNodeMethodLocal=TreeCache.class.getDeclaredMethod("_get", new Class[]{Fqn.class});
getKeysMethodLocal=TreeCache.class.getDeclaredMethod("_getKeys", new Class[]{Fqn.class});
getChildrenNamesMethodLocal=TreeCache.class.getDeclaredMethod("_getChildrenNames", new Class[]{Fqn.class});
replicateMethod=TreeCache.class.getDeclaredMethod("_replicate", new Class[]{MethodCall.class});
replicateAllMethod=TreeCache.class.getDeclaredMethod("_replicate", new Class[]{List.class});
releaseAllLocksMethodLocal=TreeCache.class.getDeclaredMethod("_releaseAllLocks", new Class[]{Fqn.class});
printMethodLocal=TreeCache.class.getDeclaredMethod("_print", new Class[]{Fqn.class});
lockMethodLocal=TreeCache.class.getDeclaredMethod("_lock", new Class[]{Fqn.class,
int.class,
boolean.class});
}
catch(NoSuchMethodException ex) {
ex.printStackTrace();
throw new ExceptionInInitializerError(ex.toString());
}
crud_methods.add(putDataMethodLocal);
crud_methods.add(putDataEraseMethodLocal);
crud_methods.add(putKeyValMethodLocal);
crud_methods.add(putFailFastKeyValueMethodLocal);
crud_methods.add(removeNodeMethodLocal);
crud_methods.add(removeKeyMethodLocal);
crud_methods.add(removeDataMethodLocal);
}
/**
 * Tells whether the given method is one of the registered CRUD methods.
 *
 * @param m the method to test; may be null
 * @return true if m is non-null and contained in crud_methods
 */
public static boolean isCrudMethod(Method m) {
    return m != null && crud_methods.contains(m);
}
/**
 * Creates a cache with an explicit cluster name, channel properties and
 * state fetch timeout. Null arguments leave the corresponding default in place.
 *
 * @param cluster_name        group name, or null to keep the default
 * @param props               channel properties, or null to keep the default
 * @param state_fetch_timeout timeout used when fetching the initial state
 */
public TreeCache(String cluster_name, String props, long state_fetch_timeout) throws Exception {
    this.state_fetch_timeout=state_fetch_timeout;
    if(props != null) {
        this.cluster_props=props;
    }
    if(cluster_name != null) {
        this.cluster_name=cluster_name;
    }
}
public TreeCache() throws Exception {
super();
}
/**
 * Creates a cache that uses the supplied channel.
 *
 * @param channel the channel to store in this cache
 */
public TreeCache(JChannel channel) throws Exception {
    this.channel=channel;
}
/**
 * Returns the root node of the tree.
 */
public Node getRoot() {
    return this.root;
}
/**
 * Returns the local address reported by the channel.
 *
 * @return the channel's local address, or null when there is no channel
 */
public Object getLocalAddress() {
    if(channel == null) {
        return null;
    }
    return channel.getLocalAddress();
}
/**
 * Returns the member list. The internal Vector itself is returned, not a copy.
 */
public Vector getMembers() {
    return this.members;
}
/**
 * Returns the current value of the coordinator flag.
 */
public boolean isCoordinator() {
    return this.coordinator;
}
/**
 * Returns the configured cluster (group) name.
 */
public String getClusterName() {
    return this.cluster_name;
}
/**
 * Sets the cluster (group) name.
 *
 * @param name the new name; stored as given
 */
public void setClusterName(String name) {
    this.cluster_name=name;
}
/**
 * Returns the channel properties string, or null if none has been set.
 */
public String getClusterProperties() {
    return this.cluster_props;
}
/**
 * Sets the channel properties string.
 *
 * @param cluster_props the new properties; stored as given
 */
public void setClusterProperties(String cluster_props) {
    this.cluster_props = cluster_props;
}
/**
 * Returns the transaction table of this cache.
 */
public TransactionTable getTransactionTable() {
    return this.tx_table;
}
/**
 * Returns the lock table. The internal map itself is returned, not a copy.
 */
public HashMap getLockTable() {
    return this.lock_table;
}
/**
 * Renders the transaction table by calling its toString(true).
 */
public String dumpTransactionTable() {
    final String dump=this.tx_table.toString(true);
    return dump;
}
/**
 * Returns the deadlock detection flag.
 */
public boolean getDeadlockDetection() {
    return this.deadlockDetection;
}
/**
 * Stores the deadlock detection flag and, when a dispatcher exists,
 * forwards the new value to it.
 *
 * @param dt the new flag value
 */
public void setDeadlockDetection(boolean dt) {
    this.deadlockDetection=dt;
    if(disp == null) {
        return;
    }
    disp.setDeadlockDetection(dt);
}
/**
 * Returns a printable form of the interceptor chain.
 *
 * @return the output of printInterceptorChain(), or "&lt;empty&gt;" when that
 *         output is null or has zero length
 */
public String getInterceptorChain() {
    final String chain=printInterceptorChain(interceptor_chain);
    final boolean nothingToShow=chain == null || chain.length() == 0;
    return nothingToShow ? "<empty>" : chain;
}
/**
 * Returns the interceptors of the chain, head first.
 *
 * @return a new list with one entry per interceptor, or null when no chain exists
 */
public List getInterceptors() {
    if(interceptor_chain == null) {
        return null;
    }
    // First pass: count the links so the list can be sized exactly.
    int count=0;
    for(Interceptor cur=interceptor_chain; cur != null; cur=cur.getNext()) {
        count++;
    }
    // Second pass: collect them in chain order.
    List result=new ArrayList(count);
    for(Interceptor cur=interceptor_chain; cur != null; cur=cur.getNext()) {
        result.add(cur);
    }
    return result;
}
/**
 * Returns the configured cache loader class name, or null if none.
 */
public String getCacheLoaderClass() {
    return this.cache_loader_class;
}
/**
 * Sets the cache loader class name.
 *
 * @param cache_loader_class the class name; stored as given
 */
public void setCacheLoaderClass(String cache_loader_class) {
    this.cache_loader_class = cache_loader_class;
}
/**
 * Returns the cache loader configuration, or null if none.
 */
public Properties getCacheLoaderConfig() {
    return this.cache_loader_config;
}
/**
 * Sets the cache loader configuration.
 *
 * @param cache_loader_config the configuration; stored as given
 */
public void setCacheLoaderConfig(Properties cache_loader_config) {
    this.cache_loader_config = cache_loader_config;
}
/**
 * Returns the cache loader instance, or null if none.
 */
public CacheLoader getCacheLoader() {
    return this.cache_loader;
}
/**
 * Sets the cache loader instance.
 *
 * @param cache_loader the loader; stored as given
 */
public void setCacheLoader(CacheLoader cache_loader) {
    this.cache_loader = cache_loader;
}
/**
 * Returns the cache-loader-shared flag.
 */
public boolean getCacheLoaderShared() {
    return this.cache_loader_shared;
}
/**
 * Sets the cache-loader-shared flag.
 *
 * @param shared the new flag value
 */
public void setCacheLoaderShared(boolean shared) {
    cache_loader_shared = shared;
}
/**
 * Parses a comma-separated list of Fqn strings and stores the resulting
 * Fqns as the preload list. Each token is trimmed before parsing.
 * A null argument, or a list that yields no tokens, leaves the current
 * preload list untouched.
 *
 * @param list comma-separated Fqn strings; may be null
 */
public void setCacheLoaderPreload(String list) {
    if(list == null) {
        return;
    }
    final ArrayList parsed=new ArrayList();
    for(StringTokenizer tokens=new StringTokenizer(list, ","); tokens.hasMoreTokens();) {
        final String token=tokens.nextToken();
        parsed.add(Fqn.fromString(token.trim()));
    }
    if(!parsed.isEmpty()) {
        this.cache_loader_preload=parsed;
    }
}
/**
 * Returns the preload list rendered with toString(), or null when no list is set.
 */
public String getCacheLoaderPreload() {
    if(cache_loader_preload == null) {
        return null;
    }
    return cache_loader_preload.toString();
}
/**
 * Sets the fetch-persistent-state flag.
 *
 * @param flag the new flag value
 */
public void setCacheLoaderFetchPersistentState(boolean flag) {
    this.cache_loader_fetch_persistent_state=flag;
}
/**
 * Returns the fetch-persistent-state flag.
 */
public boolean getCacheLoaderFetchPersistentState() {
    return this.cache_loader_fetch_persistent_state;
}
/**
 * Sets the fetch-transient-state flag.
 *
 * @param flag the new flag value
 */
public void setCacheLoaderFetchTransientState(boolean flag) {
    this.cache_loader_fetch_transient_state=flag;
}
/**
 * Returns the fetch-transient-state flag.
 */
public boolean getCacheLoaderFetchTransientState() {
    return this.cache_loader_fetch_transient_state;
}
/**
 * Returns the sync-commit-phase flag.
 */
public boolean getSyncCommitPhase() {
    return this.sync_commit_phase;
}
/**
 * Sets the sync-commit-phase flag.
 *
 * @param sync_commit_phase the new flag value
 */
public void setSyncCommitPhase(boolean sync_commit_phase) {
    this.sync_commit_phase = sync_commit_phase;
}
/**
 * Returns the sync-rollback-phase flag.
 */
public boolean getSyncRollbackPhase() {
    return this.sync_rollback_phase;
}
/**
 * Sets the sync-rollback-phase flag.
 *
 * @param sync_rollback_phase the new flag value
 */
public void setSyncRollbackPhase(boolean sync_rollback_phase) {
    this.sync_rollback_phase = sync_rollback_phase;
}
/**
 * Stores the eviction policy configuration element and logs it at INFO level.
 *
 * @param config the configuration element; stored as given
 */
public void setEvictionPolicyConfig(Element config) {
    this.evictConfig_=config;
    final String message="setEvictionPolicyConfig(): " + config;
    log.info(message);
}
/**
 * Returns the stored eviction policy configuration element, or null if none.
 */
public Element getEvictionPolicyConfig() {
    return this.evictConfig_;
}
/**
 * Converts an XML protocol stack description into a channel properties
 * string and stores it via setClusterProperties().
 * <p>
 * Every child element becomes one entry: the tag name, followed by its
 * attributes as <code>(name=value;name=value)</code> when it has any.
 * Entries are joined with ':'. Non-element children are ignored.
 * <p>
 * If the element has no child elements there is nothing to convert; the
 * current cluster properties are left unchanged and a warning is logged
 * (previously this case failed with an index-out-of-bounds exception while
 * trimming the trailing separator).
 *
 * @param config element whose child elements describe the protocols
 */
public void setClusterConfig(Element config) {
    StringBuffer buffer=new StringBuffer();
    NodeList stack=config.getChildNodes();
    int length=stack.getLength();
    for(int s=0; s < length; s++) {
        org.w3c.dom.Node node=stack.item(s);
        if(node.getNodeType() != org.w3c.dom.Node.ELEMENT_NODE)
            continue;
        // Separate entries up front so no trailing ':' has to be removed afterwards.
        if(buffer.length() > 0)
            buffer.append(':');
        Element tag=(Element)node;
        buffer.append(tag.getTagName());
        NamedNodeMap attrs=tag.getAttributes();
        int attrLength=attrs.getLength();
        if(attrLength > 0) {
            buffer.append('(');
            for(int a=0; a < attrLength; a++) {
                Attr attr=(Attr)attrs.item(a);
                if(a > 0)
                    buffer.append(';');
                buffer.append(attr.getName());
                buffer.append('=');
                buffer.append(attr.getValue());
            }
            buffer.append(')');
        }
    }
    if(buffer.length() == 0) {
        log.warn("setClusterConfig(): no protocol elements found, cluster properties left unchanged");
        return;
    }
    setClusterProperties(buffer.toString());
    log.info("setting cluster properties from xml to: " + cluster_props);
}
/**
 * Returns the timeout used when fetching the initial state.
 */
public long getInitialStateRetrievalTimeout() {
    return this.state_fetch_timeout;
}
/**
 * Sets the timeout used when fetching the initial state.
 *
 * @param timeout the new timeout
 */
public void setInitialStateRetrievalTimeout(long timeout) {
    this.state_fetch_timeout=timeout;
}
/**
 * Returns the cache mode as a string, as produced by mode2String().
 */
public String getCacheMode() {
    return mode2String(this.cache_mode);
}
/**
 * Returns the cache mode as its integer constant.
 */
public int getCacheModeInternal() {
    return this.cache_mode;
}
/**
 * Maps a cache mode constant to its name.
 *
 * @param mode one of LOCAL, REPL_ASYNC, REPL_SYNC
 * @return "LOCAL", "REPL_ASYNC" or "REPL_SYNC"
 * @throws RuntimeException if mode is none of the three constants
 */
private String mode2String(int mode) {
    if(mode == LOCAL) {
        return "LOCAL";
    }
    if(mode == REPL_ASYNC) {
        return "REPL_ASYNC";
    }
    if(mode == REPL_SYNC) {
        return "REPL_SYNC";
    }
    throw new RuntimeException("setCacheMode(): caching mode " + mode + " is invalid");
}
/**
 * Sets the cache mode from its string form; the string is converted with
 * string2Mode() and passed to setCacheMode(int).
 *
 * @param mode the mode name
 */
public void setCacheMode(String mode) throws Exception {
    setCacheMode(string2Mode(mode));
}
/**
 * Sets the cache mode.
 *
 * @param mode one of LOCAL, REPL_ASYNC, REPL_SYNC
 * @throws IllegalArgumentException if mode is none of the three constants
 */
public void setCacheMode(int mode) {
    final boolean known=mode == LOCAL || mode == REPL_ASYNC || mode == REPL_SYNC;
    if(!known) {
        throw new IllegalArgumentException("setCacheMode(): caching mode " + mode + " is invalid");
    }
    this.cache_mode=mode;
}
/**
 * Returns the synchronous replication timeout.
 */
public long getSyncReplTimeout() {
    return this.sync_repl_timeout;
}
/**
 * Sets the synchronous replication timeout.
 *
 * @param timeout the new timeout
 */
public void setSyncReplTimeout(long timeout) {
    this.sync_repl_timeout=timeout;
}
/**
 * Returns the use-replication-queue flag.
 */
public boolean getUseReplQueue() {
    return this.use_repl_queue;
}
/**
 * Enables or disables the replication queue.
 * <p>
 * Enabling creates a queue if none exists and starts it when the configured
 * interval is non-negative. Disabling stops and discards an existing queue.
 *
 * @param flag true to enable the queue, false to disable it
 */
public void setUseReplQueue(boolean flag) {
    use_repl_queue=flag;
    if(!flag) {
        if(repl_queue != null) {
            repl_queue.stop();
            repl_queue=null;
        }
        return;
    }
    if(repl_queue != null) {
        return; // already have a queue
    }
    repl_queue=new ReplicationQueue(this, repl_queue_interval, repl_queue_max_elements);
    if(repl_queue_interval >= 0) {
        repl_queue.start();
    }
}
/**
 * Returns the configured replication queue interval.
 */
public long getReplQueueInterval() {
    return this.repl_queue_interval;
}
/**
 * Stores the replication queue interval and, when a queue exists,
 * applies it to that queue.
 *
 * @param interval the new interval
 */
public void setReplQueueInterval(long interval) {
    repl_queue_interval=interval;
    if(repl_queue == null) {
        return;
    }
    repl_queue.setInterval(interval);
}
/**
 * Returns the configured maximum number of replication queue elements.
 */
public int getReplQueueMaxElements() {
    return this.repl_queue_max_elements;
}
/**
 * Stores the maximum number of replication queue elements and, when a
 * queue exists, applies it to that queue.
 *
 * @param max_elements the new maximum
 */
public void setReplQueueMaxElements(int max_elements) {
    repl_queue_max_elements=max_elements;
    if(repl_queue == null) {
        return;
    }
    repl_queue.setMax_elements(max_elements);
}
/**
 * Returns the replication queue, or null when none exists.
 */
public ReplicationQueue getReplQueue() {
    return this.repl_queue;
}
/**
 * Returns the isolation level rendered with toString().
 */
public String getIsolationLevel() {
    return this.isolationLevel.toString();
}
/**
 * Sets the isolation level from its string form.
 *
 * @param level the level name, converted with IsolationLevel.stringToIsolationLevel()
 * @throws IllegalArgumentException if the conversion yields null
 */
public void setIsolationLevel(String level) {
    final IsolationLevel parsed=IsolationLevel.stringToIsolationLevel(level);
    if(parsed != null) {
        setIsolationLevel(parsed);
        return;
    }
    throw new IllegalArgumentException("TreeCache.setIsolationLevel(): level \"" + level + "\" is invalid");
}
/**
 * Stores the isolation level and forwards it to LockStrategyFactory.
 *
 * @param level the new isolation level
 */
public void setIsolationLevel(IsolationLevel level) {
    this.isolationLevel=level;
    LockStrategyFactory.setIsolationLevel(level);
}
/**
 * Returns the isolation level object itself (not its string form).
 */
public IsolationLevel getIsolationLevelClass() {
    return this.isolationLevel;
}
/**
 * Returns the fetch-state-on-startup flag.
 */
public boolean getFetchStateOnStartup() {
    return this.fetch_state_on_startup;
}
/**
 * Sets the fetch-state-on-startup flag.
 *
 * @param flag the new flag value
 */
public void setFetchStateOnStartup(boolean flag) {
    this.fetch_state_on_startup=flag;
}
/**
 * Returns the lock acquisition timeout.
 */
public long getLockAcquisitionTimeout() {
    return this.lock_acquisition_timeout;
}
/**
 * Sets the lock acquisition timeout.
 *
 * @param timeout the new timeout
 */
public void setLockAcquisitionTimeout(long timeout) {
    lock_acquisition_timeout = timeout;
}
/**
 * Returns the eviction policy class name, or null if none has been set.
 */
public String getEvictionPolicyClass() {
    return this.eviction_policy_class;
}
/**
 * Records the eviction policy class name, instantiates the class through
 * this class's class loader and registers the instance as a listener.
 * A null or empty name is ignored. Any failure is logged at ERROR level
 * and not propagated.
 *
 * @param eviction_policy_class fully qualified class name of a TreeCacheListener
 */
public void setEvictionPolicyClass(String eviction_policy_class) {
    final boolean blank=eviction_policy_class == null || eviction_policy_class.length() == 0;
    if(blank) {
        return;
    }
    try {
        this.eviction_policy_class=eviction_policy_class;
        final Class policyClass=getClass().getClassLoader().loadClass(eviction_policy_class);
        eviction_policy_provider=(TreeCacheListener)policyClass.newInstance();
        addTreeCacheListener(eviction_policy_provider);
    }
    catch(Throwable t) {
        log.error("setEvictionPolicyClass(): failed creating instance of " + eviction_policy_class, t);
    }
}
/**
 * Returns the wake-up interval reported by the eviction policy.
 * <p>
 * The provider field is typed as TreeCacheListener, so it is not guaranteed
 * to be an LRUPolicy; any other provider (or none) yields -1 instead of a
 * ClassCastException.
 *
 * @return the LRUPolicy's wake-up interval in seconds, or -1 when no
 *         LRUPolicy provider is set
 */
public int getEvictionThreadWakeupIntervalSeconds() {
    // instanceof is false for null, so this also covers "no provider".
    if(!(eviction_policy_provider instanceof LRUPolicy))
        return -1;
    return ((LRUPolicy)eviction_policy_provider).getWakeupIntervalSeconds();
}
/**
 * Sets the transaction manager lookup instance.
 *
 * @param l the lookup; stored as given
 */
public void setTransactionManagerLookup(TransactionManagerLookup l) {
    tm_lookup = l;
}
/**
 * Returns the transaction manager lookup class name, or null if none.
 */
public String getTransactionManagerLookupClass() {
    return this.tm_lookup_class;
}
/**
 * Sets the transaction manager lookup class name. The class is not loaded
 * here; only the name is stored.
 *
 * @param cl the class name
 */
public void setTransactionManagerLookupClass(String cl) throws Exception {
    tm_lookup_class = cl;
}
/**
 * Returns the transaction manager, or null when none has been obtained.
 */
public TransactionManager getTransactionManager() {
    return this.tm;
}
/**
 * Returns this cache instance itself.
 */
public TreeCache getInstance() {
return this;
}
/**
 * Sets the replication handler.
 *
 * @param handler the handler; stored as given
 */
public void setReplicationHandler(Replicatable handler) {
    this.replication_handler=handler;
}
/**
 * Returns the replication handler, or null if none.
 */
public Replicatable getReplicationHandler() {
    return this.replication_handler;
}
/**
 * Asks the channel for the group state and logs whether it was retrieved.
 *
 * @param timeout timeout passed to channel.getState()
 * @throws ChannelNotConnectedException if there is no channel
 */
public void fetchState(long timeout) throws ChannelClosedException, ChannelNotConnectedException {
    if(channel == null) {
        throw new ChannelNotConnectedException();
    }
    final boolean retrieved=channel.getState(null, timeout);
    if(!retrieved) {
        log.info("fetchState(): state could not be retrieved (first member)");
        return;
    }
    log.info("fetchState(): state was retrieved successfully");
}
/**
 * Registers a listener unless it is already registered.
 *
 * @param listener the listener to add
 */
public void addTreeCacheListener(TreeCacheListener listener) {
    if(listeners.contains(listener)) {
        return;
    }
    listeners.addElement(listener);
}
/**
 * Removes a previously registered listener.
 *
 * @param listener the listener to remove
 */
public void removeTreeCacheListener(TreeCacheListener listener) {
    this.listeners.removeElement(listener);
}
/**
 * Lifecycle hook; intentionally does nothing.
 */
public void createService() throws Exception {
}
/**
 * Lifecycle hook; intentionally does nothing.
 */
public void destroyService() {
}
/**
 * Starts the cache: resolves the transaction manager, creates the cache
 * loader, the interceptor chain and the eviction policy, and - for the
 * replicated modes - creates and connects the channel and optionally
 * fetches the initial state. Finally preloads from the cache loader,
 * determines the coordinator flag and notifies listeners.
 *
 * @throws IllegalArgumentException if cache_mode is not a known mode
 * @throws Exception if a lookup class cannot be loaded/instantiated or a
 *                   called start-up step fails
 */
public void startService() throws Exception {
// Instantiate the lookup from its class name only if none was injected.
if(this.tm_lookup == null && this.tm_lookup_class != null) {
Class clazz=Thread.currentThread().getContextClassLoader().loadClass(this.tm_lookup_class);
this.tm_lookup=(TransactionManagerLookup)clazz.newInstance();
}
try {
if(tm_lookup != null)
tm=tm_lookup.getTransactionManager();
else
log.warn("No transaction manager lookup class has been defined. Transactions cannot be used");
}
catch(Exception e) {
// A failed lookup is not fatal: tm stays as it was and start-up continues.
log.debug("failed looking up TransactionManager, will not use transactions", e);
}
createCacheLoader();
createInterceptorChain();
createEvictionPolicy();
switch(cache_mode) {
case LOCAL:
log.info("cache mode is local, will not create the channel");
break;
case REPL_SYNC:
case REPL_ASYNC:
log.info("cache mode is " + mode2String(cache_mode));
// NOTE(review): this early return also skips cacheLoaderPreload(),
// determineCoordinator() and notifyCacheStarted() below, and no
// dispatcher is created for a pre-supplied channel - confirm intended.
if(channel != null) { log.info("channel is already running");
return;
}
if(cluster_props == null) {
cluster_props=getDefaultProperties();
log.debug("setting cluster properties to default value");
}
channel=new JChannel(cluster_props);
channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
if(log.isTraceEnabled())
log.trace("cache properties: " + cluster_props);
channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
// The dispatcher is created before connect().
disp=new RpcDispatcher(channel, ml, this, this);
disp.setDeadlockDetection(deadlockDetection);
channel.connect(cluster_name);
if(fetch_state_on_startup) {
fetchStateOnStartup();
}
break;
default:
throw new IllegalArgumentException("cache mode " + cache_mode + " is invalid");
}
cacheLoaderPreload();
coordinator=determineCoordinator();
notifyCacheStarted();
}
/**
 * Configures the eviction policy provider with this cache, if one is set.
 * The provider is cast to LRUPolicy.
 */
private void createEvictionPolicy() {
    if(eviction_policy_provider == null) {
        return;
    }
    final LRUPolicy policy=(LRUPolicy)eviction_policy_provider;
    policy.configure(this);
}
/**
 * Builds the interceptor chain and stores its head in interceptor_chain.
 * <p>
 * Because addInterceptor() appends to the tail, the resulting order from
 * the head is: CacheStoreInterceptor (only with a shared cache loader),
 * ReplicationInterceptor (only when not LOCAL), UnlockInterceptor,
 * CacheLoaderInterceptor (only with a cache loader), CacheStoreInterceptor
 * (only with a non-shared cache loader), LockInterceptor, CallInterceptor.
 *
 * @throws ClassNotFoundException if an interceptor class cannot be loaded
 * @throws InstantiationException if an interceptor cannot be instantiated
 * @throws IllegalAccessException if an interceptor constructor is not accessible
 */
protected void createInterceptorChain() throws IllegalAccessException, InstantiationException, ClassNotFoundException {
Interceptor call_interceptor=null;
Interceptor lock_interceptor=null;
Interceptor repl_interceptor=null;
Interceptor cache_loader_interceptor=null;
Interceptor cache_store_interceptor=null;
Interceptor unlock_interceptor=null;
Interceptor first=null;
// Always-present interceptors.
call_interceptor=createInterceptor("org.jboss.cache.interceptors.CallInterceptor");
call_interceptor.setCache(this);
lock_interceptor=createInterceptor("org.jboss.cache.interceptors.LockInterceptor");
lock_interceptor.setCache(this);
unlock_interceptor=createInterceptor("org.jboss.cache.interceptors.UnlockInterceptor");
unlock_interceptor.setCache(this);
// Replication is only needed for the replicated modes.
if(cache_mode != LOCAL) {
repl_interceptor=createInterceptor("org.jboss.cache.interceptors.ReplicationInterceptor");
repl_interceptor.setCache(this);
}
// Loader and store interceptors are created together when a cache loader is configured.
if(cache_loader_class != null || cache_loader != null) {
cache_loader_interceptor=createInterceptor("org.jboss.cache.interceptors.CacheLoaderInterceptor");
cache_loader_interceptor.setCache(this);
cache_store_interceptor=createInterceptor("org.jboss.cache.interceptors.CacheStoreInterceptor");
cache_store_interceptor.setCache(this);
}
// Shared loader: the store interceptor goes first, ahead of replication.
if(cache_loader_interceptor != null) {
if(cache_loader_shared == true) {
if(first == null)
first=cache_store_interceptor;
else
addInterceptor(first, cache_store_interceptor);
}
}
if(repl_interceptor != null) {
if(first == null)
first=repl_interceptor;
else
addInterceptor(first, repl_interceptor);
}
// unlock_interceptor is always non-null here, so 'first' is set from this point on.
if(unlock_interceptor != null) {
if(first == null)
first=unlock_interceptor;
else
addInterceptor(first, unlock_interceptor);
}
if(cache_loader_interceptor != null) {
if(cache_loader_shared == true) {
if(first == null)
first=cache_loader_interceptor;
else
addInterceptor(first, cache_loader_interceptor);
}
else {
// Non-shared loader: the store interceptor follows the loader interceptor.
if(first == null)
first=cache_loader_interceptor;
else
addInterceptor(first, cache_loader_interceptor);
if(first == null)
first=cache_store_interceptor;
else
addInterceptor(first, cache_store_interceptor);
}
}
if(first == null)
first=lock_interceptor;
else
addInterceptor(first, lock_interceptor);
// The call interceptor is always the tail of the chain.
if(first == null)
first=call_interceptor;
else
addInterceptor(first, call_interceptor);
interceptor_chain=first;
if(log.isInfoEnabled())
log.info("interceptor chain is:\n" + printInterceptorChain(first));
}
/**
 * Renders the chain starting at the given interceptor, one class per line,
 * with the LAST interceptor of the chain printed first and the given
 * interceptor printed last.
 *
 * @param i head of the (sub)chain; may be null
 * @return the rendered chain, or an empty string when i is null
 */
private String printInterceptorChain(Interceptor i) {
    final StringBuffer out=new StringBuffer();
    // Walk head-to-tail and prepend each entry, which yields tail-first output.
    for(Interceptor cur=i; cur != null; cur=cur.getNext()) {
        if(out.length() > 0) {
            out.insert(0, "\n");
        }
        out.insert(0, cur.getClass());
    }
    return out.toString();
}
/**
 * Appends an interceptor to the end of the chain that starts at first.
 * Does nothing when first is null.
 *
 * @param first head of the chain
 * @param i     interceptor to append after the current tail
 */
private void addInterceptor(Interceptor first, Interceptor i) {
    if(first == null) {
        return;
    }
    Interceptor tail=first;
    while(tail.getNext() != null) {
        tail=tail.getNext();
    }
    tail.setNext(i);
}
/**
 * Loads the named class through this class's class loader and creates an
 * instance with its no-argument constructor.
 *
 * @param classname fully qualified interceptor class name
 * @return the new instance, cast to Interceptor
 */
private Interceptor createInterceptor(String classname) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
    final ClassLoader loader=getClass().getClassLoader();
    final Object instance=loader.loadClass(classname).newInstance();
    return (Interceptor)instance;
}
/**
 * Creates and starts the cache loader from cache_loader_class, unless a
 * loader instance already exists or no class name is configured. The class
 * is loaded through the thread context class loader; the new loader is
 * given its config and this cache, then created and started.
 */
protected void createCacheLoader() throws Exception {
    if(cache_loader != null || cache_loader_class == null) {
        return;
    }
    final ClassLoader contextLoader=Thread.currentThread().getContextClassLoader();
    final Class loaderClass=contextLoader.loadClass(cache_loader_class);
    cache_loader=(CacheLoader)loaderClass.newInstance();
    cache_loader.setConfig(cache_loader_config);
    cache_loader.setCache(this);
    cache_loader.create();
    cache_loader.start();
}
/**
 * Preloads every Fqn of the preload list, including parents and children.
 * Does nothing without a cache loader or without a preload list.
 */
protected void cacheLoaderPreload() throws Exception {
    if(cache_loader == null) {
        return;
    }
    if(log.isTraceEnabled()) {
        log.trace("preloading " + cache_loader_preload);
    }
    if(cache_loader_preload == null) {
        return;
    }
    final Iterator entries=cache_loader_preload.iterator();
    while(entries.hasNext()) {
        preload((Fqn)entries.next(), true, true);
    }
}
/**
 * Preloads the given Fqn with its parents and children. Does nothing when
 * no cache loader is present.
 *
 * @param fqn string form of the Fqn to load
 */
public void load(String fqn) throws Exception {
    if(cache_loader == null) {
        return;
    }
    preload(Fqn.fromString(fqn), true, true);
}
/**
 * Touches the given node with a get() call, optionally does the same for
 * each of its ancestors, and optionally recurses into all children reported
 * by the cache loader.
 *
 * @param fqn              the node to touch
 * @param preload_parents  if true, get() is also called for every proper prefix of fqn
 * @param preload_children if true, recurse into the children names returned by the cache loader
 */
void preload(Fqn fqn, boolean preload_parents, boolean preload_children) throws Exception {
    // The key is arbitrary; only the get() call itself matters here.
    this.get(fqn, "bla");
    if(preload_parents) {
        Fqn ancestor=new Fqn();
        int level=0;
        while(level < fqn.size()-1) {
            ancestor=new Fqn(ancestor, fqn.get(level));
            this.get(ancestor, "bla");
            level++;
        }
    }
    if(!preload_children) {
        return;
    }
    final Set names=cache_loader.getChildrenNames(fqn);
    if(names == null) {
        return;
    }
    final Iterator it=names.iterator();
    while(it.hasNext()) {
        String name=(String)it.next();
        // Parents were handled at the top level; only descend from here.
        preload(new Fqn(fqn, name), false, true);
    }
}
/**
 * Stops, destroys and discards the cache loader, if there is one.
 */
void destroyCacheLoader() {
    if(cache_loader == null) {
        return;
    }
    cache_loader.stop();
    cache_loader.destroy();
    cache_loader=null;
}
/**
 * Tells whether the local address equals the coordinator address of the
 * channel's current view.
 *
 * @return false when the channel, the local address, the view, the view id
 *         or the coordinator address is missing; otherwise the result of
 *         comparing the local address with the coordinator address
 */
protected boolean determineCoordinator() {
    if(channel == null) {
        return false;
    }
    final Object self=getLocalAddress();
    if(self == null) {
        return false;
    }
    final View view=channel.getView();
    final ViewId vid=view != null ? view.getVid() : null;
    final Object coord=vid != null ? vid.getCoordAddress() : null;
    return coord != null && self.equals(coord);
}
/**
 * Returns the coordinator address taken from the channel's current view.
 *
 * @return the coordinator address, or null when the channel, the view or
 *         the view id is missing
 */
public Address getCoordinator() {
    if(channel == null) {
        return null;
    }
    final View view=channel.getView();
    final ViewId vid=view == null ? null : view.getVid();
    return vid == null ? null : vid.getCoordAddress();
}
/**
 * Returns the state as produced by the message listener's getState().
 */
public byte[] getStateBytes() {
    final MessageListener listener=this.getMessageListener();
    return listener.getState();
}
/**
 * Hands the given state to the message listener's setState().
 *
 * @param state the state bytes
 */
public void setStateBytes(byte[] state) {
    final MessageListener listener=this.getMessageListener();
    listener.setState(state);
}
/**
 * Requests the group state from the channel and, if the request succeeds,
 * blocks until isStateSet has been set to true (waiting on stateLock).
 * <p>
 * An interrupt received while waiting does not abort the wait, but it is
 * no longer lost: the thread's interrupt status is restored before this
 * method returns.
 *
 * @throws Exception if the channel's getState() call fails
 */
protected void fetchStateOnStartup() throws Exception {
    boolean interrupted=false;
    try {
        synchronized(stateLock) {
            isStateSet=false;
            long start=System.currentTimeMillis();
            boolean rc=channel.getState(null, state_fetch_timeout);
            if(!rc) {
                log.info("state could not be retrieved (must be first member in group)");
                return;
            }
            while(!isStateSet) {
                try {
                    stateLock.wait();
                }
                catch(InterruptedException iex) {
                    // Keep waiting for the state, but remember the interrupt.
                    // Re-interrupting here would make the next wait() throw immediately.
                    interrupted=true;
                }
            }
            long stop=System.currentTimeMillis();
            log.info("state was retrieved successfully (in " + (stop-start) + " milliseconds)");
        }
    }
    finally {
        if(interrupted)
            Thread.currentThread().interrupt();
    }
}
/**
 * Stops the cache: closes the channel, stops the dispatcher, clears the
 * member list, stops the replication queue, destroys the cache loader,
 * notifies listeners and finally removes all listeners.
 */
public void stopService() {
if(channel != null) {
log.info("stopService(): closing the channel");
channel.close();
channel=null;
}
if(disp != null) {
log.info("stopService(): stopping the dispatcher");
disp.stop();
disp=null;
}
if(members != null && members.size() > 0)
members.clear();
// The queue is stopped but the reference is kept (unlike setUseReplQueue(false)).
if(repl_queue != null)
repl_queue.stop();
destroyCacheLoader();
// Listeners are notified before the listener list is emptied.
notifyCacheStopped();
listeners.clear();
}
/**
 * Returns the node for the given Fqn string; delegates to get(Fqn).
 *
 * @param fqn string form of the Fqn
 */
public Node get(String fqn) throws CacheException {
    final Fqn parsed=Fqn.fromString(fqn);
    return get(parsed);
}
/**
 * Returns the node for the given Fqn by dispatching a call to _get(Fqn)
 * through invokeMethod().
 *
 * @param fqn the node's Fqn
 */
public Node get(Fqn fqn) throws CacheException {
    final Object[] args=new Object[]{fqn};
    final Object result=invokeMethod(new MethodCall(getNodeMethodLocal, args));
    return (Node)result;
}
/**
 * Looks the node up with findNode().
 *
 * @param fqn the node's Fqn
 * @return whatever findNode() returns for fqn
 */
public Node _get(Fqn fqn) throws CacheException {
    final Node found=findNode(fqn);
    return found;
}
/**
 * Returns the keys of the node with the given Fqn string; delegates to getKeys(Fqn).
 *
 * @param fqn string form of the Fqn
 */
public Set getKeys(String fqn) throws CacheException {
    final Fqn parsed=Fqn.fromString(fqn);
    return getKeys(parsed);
}
/**
 * Returns the keys of the given node by dispatching a call to _getKeys(Fqn)
 * through invokeMethod().
 *
 * @param fqn the node's Fqn
 */
public Set getKeys(Fqn fqn) throws CacheException {
    final Object[] args=new Object[]{fqn};
    final Object result=invokeMethod(new MethodCall(getKeysMethodLocal, args));
    return (Set)result;
}
/**
 * Returns a copy of the data keys of the given node.
 *
 * @param fqn the node's Fqn
 * @return a new LinkedHashSet with the node's keys, or null when the node
 *         is not found or reports no key set
 */
public Set _getKeys(Fqn fqn) throws CacheException {
    final Node node=findNode(fqn);
    if(node == null) {
        return null;
    }
    final Set keys=node.getDataKeys();
    if(keys == null) {
        return null;
    }
    return new LinkedHashSet(keys);
}
/**
 * Returns the value stored under key in the node with the given Fqn string;
 * delegates to get(Fqn, Object).
 *
 * @param fqn string form of the Fqn
 * @param key the key to look up
 */
public Object get(String fqn, Object key) throws CacheException {
    final Fqn parsed=Fqn.fromString(fqn);
    return get(parsed, key);
}
/**
 * Returns the value stored under key in the given node, with the
 * node-visited event enabled.
 *
 * @param fqn the node's Fqn
 * @param key the key to look up
 */
public Object get(Fqn fqn, Object key) throws CacheException {
    final boolean sendNodeEvent=true;
    return get(fqn, key, sendNodeEvent);
}
/**
 * Looks up a value in a node.
 *
 * @param fqn           the node's Fqn
 * @param key           the key to look up
 * @param sendNodeEvent if true, a node-visited notification is sent when the node exists
 * @return the value the node holds for key, or null when the node is not found
 */
public Object _get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException {
    if(log.isTraceEnabled()) {
        // Message fixed: the old format began with a stray ", " right after "_get(".
        log.trace("_get(\"" + fqn + "\", " + key + ", \"" + sendNodeEvent + "\")");
    }
    Node n=findNode(fqn);
    if(n == null)
        return null;
    if(sendNodeEvent)
        notifyNodeVisisted(fqn);
    return n.get(key);
}
/**
 * Dispatches a call to _get(Fqn, Object, boolean) through invokeMethod().
 *
 * @param fqn           the node's Fqn
 * @param key           the key to look up
 * @param sendNodeEvent passed on to _get(); controls the node-visited notification
 * @return the result of the dispatched call
 */
protected Object get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException {
    // Boolean.valueOf reuses the canonical TRUE/FALSE instances instead of allocating.
    MethodCall m=new MethodCall(getKeyValueMethodLocal,
            new Object[]{fqn, key, Boolean.valueOf(sendNodeEvent)});
    return invokeMethod(m);
}
/**
 * Like get(Fqn, Object), but with the node-visited event disabled.
 *
 * @param fqn the node's Fqn
 * @param key the key to look up
 */
public Object peek(Fqn fqn, Object key) throws CacheException {
    final boolean sendNodeEvent=false;
    return get(fqn, key, sendNodeEvent);
}
/**
 * Tells whether a node exists for the given Fqn string; delegates to exists(Fqn).
 *
 * @param fqn string form of the Fqn
 */
public boolean exists(String fqn) {
    final Fqn parsed=Fqn.fromString(fqn);
    return exists(parsed);
}
/**
 * Tells whether a node exists for the given Fqn. The lookup walks the tree
 * directly via findInternal() and does not go through invokeMethod().
 *
 * @param fqn the node's Fqn
 */
public boolean exists(Fqn fqn) {
    return findInternal(fqn) != null;
}
/**
 * Walks the tree from the root along the elements of fqn.
 *
 * @param fqn the path to follow; null or empty means the root
 * @return the node at the end of the path, or null as soon as a child is missing
 */
private Node findInternal(Fqn fqn) {
    if(fqn == null || fqn.size() == 0) {
        return root;
    }
    Node current=root;
    int depth=0;
    while(depth < fqn.size()) {
        current=current.getChild(fqn.get(depth));
        if(current == null) {
            return null;
        }
        depth++;
    }
    return current;
}
/**
 * Tells whether the node with the given Fqn string contains key;
 * delegates to exists(Fqn, Object).
 *
 * @param fqn string form of the Fqn
 * @param key the key to test
 */
public boolean exists(String fqn, Object key) {
    final Fqn parsed=Fqn.fromString(fqn);
    return exists(parsed, key);
}
/**
 * Tells whether the given node exists and contains key.
 *
 * @param fqn the node's Fqn
 * @param key the key to test
 * @return false when the node is not found; otherwise the node's containsKey(key)
 */
public boolean exists(Fqn fqn, Object key) {
    final Node node=findInternal(fqn);
    return node != null && node.containsKey(key);
}
/**
 * Stores a map of data in the node with the given Fqn string;
 * delegates to put(Fqn, Map).
 *
 * @param fqn  string form of the Fqn
 * @param data the data to store
 */
public void put(String fqn, Map data) throws CacheException {
    final Fqn parsed=Fqn.fromString(fqn);
    put(parsed, data);
}
/**
 * Stores a map of data in the given node by dispatching the four-argument
 * _put (putDataMethodLocal) through invokeMethod(), with the current
 * transaction and Boolean.TRUE as the last argument.
 *
 * @param fqn  the node's Fqn
 * @param data the data to store
 */
public void put(Fqn fqn, Map data) throws CacheException {
    final GlobalTransaction tx=getCurrentTransaction();
    final Object[] args=new Object[]{tx, fqn, data, Boolean.TRUE};
    invokeMethod(new MethodCall(putDataMethodLocal, args));
}
/**
 * Stores a key/value pair in the node with the given Fqn string;
 * delegates to put(Fqn, Object, Object).
 *
 * @param fqn   string form of the Fqn
 * @param key   the key
 * @param value the value
 * @return the result of the delegated call
 */
public Object put(String fqn, Object key, Object value) throws CacheException {
    final Fqn parsed=Fqn.fromString(fqn);
    return put(parsed, key, value);
}
/**
 * Stores a key/value pair by dispatching the six-argument _put
 * (putFailFastKeyValueMethodLocal) through invokeMethod(), passing the
 * current transaction, Boolean.TRUE and the given timeout.
 *
 * @param fqn     the node's Fqn
 * @param key     the key
 * @param value   the value
 * @param timeout timeout forwarded to the dispatched method
 * @return the result of the dispatched call
 */
public Object putFailFast(Fqn fqn, Object key, Object value, long timeout) throws CacheException {
    final GlobalTransaction tx=getCurrentTransaction();
    final Object[] args=new Object[]{tx, fqn, key, value, Boolean.TRUE, new Long(timeout)};
    return invokeMethod(new MethodCall(putFailFastKeyValueMethodLocal, args));
}
/**
 * String-Fqn variant of putFailFast(Fqn, Object, Object, long). Parses the
 * Fqn and delegates, in line with the other String overloads of this class,
 * instead of duplicating the dispatch logic.
 *
 * @param fqn     string form of the Fqn
 * @param key     the key
 * @param value   the value
 * @param timeout timeout forwarded to the Fqn overload
 * @return the result of the delegated call
 */
public Object putFailFast(String fqn, Object key, Object value, long timeout) throws CacheException {
    return putFailFast(Fqn.fromString(fqn), key, value, timeout);
}
/**
 * Stores a key/value pair by dispatching the five-argument _put
 * (putKeyValMethodLocal) through invokeMethod(), with the current
 * transaction and Boolean.TRUE as the last argument.
 *
 * @param fqn   the node's Fqn
 * @param key   the key
 * @param value the value
 * @return the result of the dispatched call
 */
public Object put(Fqn fqn, Object key, Object value) throws CacheException {
    final GlobalTransaction tx=getCurrentTransaction();
    final Object[] args=new Object[]{tx, fqn, key, value, Boolean.TRUE};
    return invokeMethod(new MethodCall(putKeyValMethodLocal, args));
}
/**
 * Removes the node with the given Fqn string; delegates to remove(Fqn).
 *
 * @param fqn string form of the Fqn
 */
public void remove(String fqn) throws CacheException {
    final Fqn parsed=Fqn.fromString(fqn);
    remove(parsed);
}
/**
 * Removes the given node by dispatching the three-argument _remove
 * (removeNodeMethodLocal) through invokeMethod(), with the current
 * transaction and Boolean.TRUE as the last argument.
 *
 * @param fqn the node's Fqn
 */
public void remove(Fqn fqn) throws CacheException {
    final GlobalTransaction tx=getCurrentTransaction();
    final Object[] args=new Object[]{tx, fqn, Boolean.TRUE};
    invokeMethod(new MethodCall(removeNodeMethodLocal, args));
}
/**
 * Evicts the given node by dispatching _evict(Fqn) through invokeMethod().
 * No transaction is passed along.
 *
 * @param fqn the node's Fqn
 */
public void evict(Fqn fqn) throws CacheException {
    final Object[] args=new Object[]{fqn};
    invokeMethod(new MethodCall(evictNodeMethodLocal, args));
}