/*
 * JBoss, the OpenSource J2EE webOS
 *
 * Distributable under LGPL license.
 * See terms of license at gnu.org.
 */
package org.jboss.cache;

import org.jboss.cache.eviction.LRUPolicy;
import org.jboss.cache.interceptors.Interceptor;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockStrategyFactory;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.logging.Logger;
import org.jboss.system.ServiceMBeanSupport;
import org.jboss.util.NestedRuntimeException;
import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MethodCall;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
import org.jgroups.util.Util;
import org.w3c.dom.Attr;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.NodeList;

import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.lang.reflect.Method;
import java.util.*;

/**
 * A tree-like structure that is replicated across several members. Updates will
 * be multicast to all group members reliably and in the same order. User has the
 * option to set transaction isolation level now (e.g., <code>SERIALIZABLE</code>, or
 * <code>REPEATABLE_READ</code>.
 *
 * @author Bela Ban
 * @author Ben Wang
 * @version $Id: TreeCache.java,v 1.158.2.6 2005/04/04 05:44:13 bwang00 Exp $
 * @jmx.mbean extends="org.jboss.system.ServiceMBean"
 * <p/>
 */
public class TreeCache extends ServiceMBeanSupport implements TreeCacheMBean, Cloneable, MembershipListener {
   protected Node root=new Node(SEPARATOR, Fqn.fromString(SEPARATOR), null, null, this);
   protected final Vector listeners=new Vector();
   protected JChannel channel=null;

   /** Am I the coordinator ? */
   protected boolean coordinator=false;

   protected String cluster_name="TreeCache-Group";
   protected String cluster_props=null;
   protected final Vector members=new Vector();
   protected RpcDispatcher disp=null;
   protected MessageListener ml=new MessageListenerAdaptor(this, log);
   protected long state_fetch_timeout=5000;
   protected long sync_repl_timeout=15000;
   protected boolean use_repl_queue=false;
   protected int repl_queue_max_elements=1000;
   protected long repl_queue_interval=5000;

   /** Maintains mapping of transactions (keys) and Modifications/Undo-Operations */
   private final TransactionTable tx_table=new TransactionTable();

   /** HashMap<Thread, List<Lock>, maintains locks acquired by threads (used when no TXs are used) */
   private final HashMap lock_table=new HashMap();

   protected boolean fetch_state_on_startup=true;
   protected long lock_acquisition_timeout=10000;
   protected String eviction_policy_class=null;
   protected TreeCacheListener eviction_policy_provider = null;
   protected int cache_mode=LOCAL;


   public static Method putDataMethodLocal=null;
   public static Method putDataEraseMethodLocal=null;
   public static Method putKeyValMethodLocal=null;
   public static Method putFailFastKeyValueMethodLocal=null;
   public static Method removeNodeMethodLocal=null;
   public static Method removeKeyMethodLocal=null;
   public static Method removeDataMethodLocal=null;
   public static Method evictNodeMethodLocal=null;
   // public static Method evictKeyValueMethodLocal=null;
   public static Method prepareMethod=null;
   public static Method commitMethod=null;
   public static Method rollbackMethod=null;
   public static Method replicateMethod=null;
   public static Method replicateAllMethod=null;
   // public static Method addChildMethod=null;
   public static Method addChildMethodLocal=null;
   public static Method getKeyValueMethodLocal=null;
   public static Method getNodeMethodLocal=null;
   public static Method getKeysMethodLocal=null;
   public static Method getChildrenNamesMethodLocal=null;
   public static Method releaseAllLocksMethodLocal=null;
   public static Method printMethodLocal=null;
   public static Method lockMethodLocal=null;

   static LinkedList    crud_methods=new LinkedList();
   protected boolean    isStateSet=false;
   private final Object stateLock=new Object();
   protected IsolationLevel isolationLevel=IsolationLevel.REPEATABLE_READ;

   /** Eviction policy configuration in xml Element */
   protected Element evictConfig_ = null;



   public MessageListener getMessageListener() {
      return ml;
   }

   /** {@link #invokeMethod(MethodCall)} will dispatch to this chain of interceptors.
    * In the future, this will be replaced with JBossAop. This is a first step towards refactoring JBossCache.
    */
   protected Interceptor interceptor_chain=null;


   /**
    * Interceptor which handles invocations of {@link #_replicate(MethodCall)}. Any such method
    * invocation is forwarded to the invoke_handler.<br/>
    * This will go away in the future, as we're moving replication functionality into the
    * ReplicationInterceptor itself
    */
   protected Replicatable replication_handler=null;


   /** Method to acquire a TransactionManager. By default we use JBossTransactionManagerLookup. Has
    * to be set before calling {@link #start()} */
   protected TransactionManagerLookup tm_lookup=null;

   /** Class of the implementation of TransactionManagerLookup */
   protected String tm_lookup_class=null;

   /** Used to get the Transaction associated with the current thread */
   protected TransactionManager tm=null;

   /** The fully qualified name of the CacheLoader (has to implement the CacheLoader interface) */
   protected String cache_loader_class=null;

   /** A reference to the CacheLoader. If null, we don't have a CachedLoader */
   protected CacheLoader cache_loader=null;

   /** The properties from which to configure the CacheLoader */
   protected Properties cache_loader_config=null;

   /** Are the CacheLoaders sharing the same resource or not ? */
   protected boolean cache_loader_shared=true;

   /** List<Fqn> of nodes to preload (if cache loader is enabled) */
   protected List cache_loader_preload=null;

   /** Fetches the transient state. Attribute fetch_cache_on_startup has to be true */
   protected boolean cache_loader_fetch_transient_state=true;

   /** Fetches the entire persistent state from the underlying CacheLoader. Only used if cache_loader_shared=false.
    * Attribute fetch_cache_on_startup has to be true */
   protected boolean cache_loader_fetch_persistent_state=true;

   /** synchronous or asynchrous commit phase ? */
   protected boolean sync_commit_phase=false;

   /** synchronous or asynchrous rollback phase ? */
   protected boolean sync_rollback_phase=false;

   protected boolean deadlockDetection=false;

   /** Queue used to replicate updates when mode is repl-async */
   protected ReplicationQueue repl_queue=null;

   public static final String SEPARATOR="/";

   /** Entries in the cache are by default local; ie. not replicated */
   public static final int LOCAL=1;

   /** Entries in the cache are by default replicated (asynchronously) */
   public static final int REPL_ASYNC=2;

   /** Entries in the cache are by default replicated (synchronously) */
   public static final int REPL_SYNC=3;

   static public final String UNINITIALIZED="jboss:internal:uninitialized"; // todo: move to CacheLoaderInterceptor
   static final String JNDI_LOCATOR_URI="socket://localhost:6789";


   static {
      try {
         putDataMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                                                   new Class[]{GlobalTransaction.class,
                                                               Fqn.class,
                                                               Map.class,
                                                               boolean.class});
         putDataEraseMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                                                         new Class[]{GlobalTransaction.class,
                                                                     Fqn.class,
                                                                     Map.class,
                                                                     boolean.class,
                                                                     boolean.class});
         putKeyValMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                                                      new Class[]{GlobalTransaction.class,
                                                                  Fqn.class,
                                                                  Object.class,
                                                                  Object.class,
                                                                  boolean.class});
         putFailFastKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_put",
                                                                          new Class[]{GlobalTransaction.class,
                                                                          Fqn.class,
                                                                          Object.class,
                                                                          Object.class,
                                                                          boolean.class,
                                                                          long.class});
         removeNodeMethodLocal=TreeCache.class.getDeclaredMethod("_remove",
                                                      new Class[]{GlobalTransaction.class,
                                                                  Fqn.class,
                                                                  boolean.class});
         removeKeyMethodLocal=TreeCache.class.getDeclaredMethod("_remove",
                                                     new Class[]{GlobalTransaction.class,
                                                                 Fqn.class,
                                                                 Object.class,
                                                                 boolean.class});
         removeDataMethodLocal=TreeCache.class.getDeclaredMethod("_removeData",
                                                      new Class[]{GlobalTransaction.class,
                                                                  Fqn.class,
                                                                  boolean.class});
         evictNodeMethodLocal=TreeCache.class.getDeclaredMethod("_evict", new Class[] {Fqn.class});
         // evictKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_evict", new Class[]{Fqn.class, Object.class});
         prepareMethod=TreeCache.class.getDeclaredMethod("prepare",
                                                  new Class[]{GlobalTransaction.class,
                                                              List.class,
                                                              Address.class,
                                                              boolean.class});
         commitMethod=TreeCache.class.getDeclaredMethod("commit",
                                                 new Class[]{GlobalTransaction.class});
         rollbackMethod=TreeCache.class.getDeclaredMethod("rollback",
                                                   new Class[]{GlobalTransaction.class});
         addChildMethodLocal=TreeCache.class.getDeclaredMethod("_addChild",
                                                   new Class[]{GlobalTransaction.class,
                                                               Fqn.class, Object.class, Node.class});
         getKeyValueMethodLocal=TreeCache.class.getDeclaredMethod("_get",
                                                                  new Class[]{Fqn.class, Object.class, boolean.class});
         getNodeMethodLocal=TreeCache.class.getDeclaredMethod("_get", new Class[]{Fqn.class});
         getKeysMethodLocal=TreeCache.class.getDeclaredMethod("_getKeys", new Class[]{Fqn.class});
         getChildrenNamesMethodLocal=TreeCache.class.getDeclaredMethod("_getChildrenNames", new Class[]{Fqn.class});
         replicateMethod=TreeCache.class.getDeclaredMethod("_replicate", new Class[]{MethodCall.class});
         replicateAllMethod=TreeCache.class.getDeclaredMethod("_replicate", new Class[]{List.class});
         releaseAllLocksMethodLocal=TreeCache.class.getDeclaredMethod("_releaseAllLocks", new Class[]{Fqn.class});
         printMethodLocal=TreeCache.class.getDeclaredMethod("_print", new Class[]{Fqn.class});
         lockMethodLocal=TreeCache.class.getDeclaredMethod("_lock", new Class[]{Fqn.class,
                                                                                int.class,
                                                                                boolean.class});
      }
      catch(NoSuchMethodException ex) {
         ex.printStackTrace();
         throw new ExceptionInInitializerError(ex.toString());
      }

      crud_methods.add(putDataMethodLocal);
      crud_methods.add(putDataEraseMethodLocal);
      crud_methods.add(putKeyValMethodLocal);
      crud_methods.add(putFailFastKeyValueMethodLocal);
      crud_methods.add(removeNodeMethodLocal);
      crud_methods.add(removeKeyMethodLocal);
      crud_methods.add(removeDataMethodLocal);
   }




   public static boolean isCrudMethod(Method m) {
      return m == null? false : crud_methods.contains(m);
   }



   /**
    * Creates a channel with the given properties. Connects to the channel, then creates a PullPushAdapter
    * and starts it
    */
   public TreeCache(String cluster_name, String props, long state_fetch_timeout) throws Exception {
      super();
      if(cluster_name != null)
         this.cluster_name=cluster_name;
      if(props != null)
         this.cluster_props=props;
      this.state_fetch_timeout=state_fetch_timeout;
   }

   public TreeCache() throws Exception {
      super();
//      try {
//         Naming.rebind("//localhost:1098/" + this.getClusterName(), new RemoteTreeCacheImpl(this));
//      }
//      catch(Throwable t) {
//         log.error("Unable to bind remote tree cache implementation to '" + this.getClusterName() + "'.", t);
//      }
   }

   /**
    * Expects an already connected channel. Creates a PullPushAdapter and starts it
    */
   public TreeCache(JChannel channel) throws Exception {
      super();
      this.channel=channel;
   }

   /**
    * Used by interceptors. Don't use as client, will go away: interceptors will use TreeCacheImpl and clients
    * will only be able to use TreeCache (which will become an interface)
    * @return
    */
   public Node getRoot() {
      return root;
   }

   /**
    * @return
    * @jmx.managed-attribute access="read-only"
    */
   public Object getLocalAddress() {
      return channel != null ? channel.getLocalAddress() : null;
   }

   /**
    * @return
    * @jmx.managed-attribute access="read-only"
    */
   public Vector getMembers() {
      return members;
   }

   /**
    *
    * @return
    * @jmx.managed-attribute
    */
   public boolean isCoordinator() {
      return coordinator;
   }

   /**
    * Get the name of the replication group
    * @jmx.managed-attribute
    */
   public String getClusterName() {
      return cluster_name;
   }

   /**
    * Set the name of the replication group
    * @jmx.managed-attribute
    */
   public void setClusterName(String name) {
      cluster_name=name;
   }

   /**
    * Get the cluster properties (e.g. the protocol stack specification in case of JGroups)
    * @jmx.managed-attribute
    */
   public String getClusterProperties() {
      return cluster_props;
   }

   /**
    * Set the cluster properties. If the cache is to use the new properties, it has to be redeployed
    * @param cluster_props The properties for the cluster (JGroups)
    * @jmx.managed-attribute
    */
   public void setClusterProperties(String cluster_props) {
      this.cluster_props=cluster_props;
   }


   public TransactionTable getTransactionTable() {
      return tx_table;
   }


   public HashMap getLockTable() {
      return lock_table;
   }

   /**
    * Dumps the contents of the TransactionTable
    * @return
    * @jmx.managed-attribute
    */
   public String dumpTransactionTable() {
      return tx_table.toString(true);
   }

   /**
    *
    * @return
    * @jmx.managed-attribute
    */
   public boolean getDeadlockDetection() {
      return deadlockDetection;
   }

   /**
    *
    * @param dt
    * @jmx.managed-attribute
    */
   public void setDeadlockDetection(boolean dt) {
      deadlockDetection=dt;
      if(disp != null)
         disp.setDeadlockDetection(dt);
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public String getInterceptorChain() {
      String retval=printInterceptorChain(interceptor_chain);
      if(retval == null || retval.length() == 0)
         return "<empty>";
      else
         return retval;
   }


   /**
    * @return List<Interceptor>
    * @jmx.managed-attribute
    */
   public List getInterceptors() {
      if(interceptor_chain == null)
         return null;
      int num=1;
      Interceptor tmp=interceptor_chain;
      while((tmp=tmp.getNext()) != null) {
         num++;
      }
      List retval=new ArrayList(num);
      tmp=interceptor_chain;
      num=0;
      do {
         retval.add(tmp);
         tmp=tmp.getNext();
      }
      while(tmp != null);
      return retval;
   }




   /**
    * @return
    * @jmx.managed-attribute
    */
   public String getCacheLoaderClass() {
      return cache_loader_class;
   }

   /**
    *
    * @param cache_loader_class
    * @jmx.managed-attribute
    */
   public void setCacheLoaderClass(String cache_loader_class) {
      this.cache_loader_class=cache_loader_class;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public Properties getCacheLoaderConfig() {
      return cache_loader_config;
   }

   /**
    * @param cache_loader_config
    * @jmx.managed-attribute
    */
   public void setCacheLoaderConfig(Properties cache_loader_config) {
      this.cache_loader_config=cache_loader_config;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public CacheLoader getCacheLoader() {
      return cache_loader;
   }

   /**
    * @param cache_loader
    * @jmx.managed-attribute
    */
   public void setCacheLoader(CacheLoader cache_loader) {
      this.cache_loader=cache_loader;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getCacheLoaderShared() {
      return cache_loader_shared;
   }

   /**
    * @param shared
    * @jmx.managed-attribute
    */
   public void setCacheLoaderShared(boolean shared) {
      this.cache_loader_shared=shared;
   }


   /**
    * @param list
    * @jmx.managed-attribute
    */
   public void setCacheLoaderPreload(String list) {
      if(list == null) return;
      ArrayList l;
      StringTokenizer st=new StringTokenizer(list, ",");
      String tok;
      Fqn    fqn;
      l=new ArrayList();
      while(st.hasMoreTokens()) {
         tok=st.nextToken();
         fqn=Fqn.fromString(tok.trim());
         l.add(fqn);
      }
      if(l.size() > 0)
         this.cache_loader_preload=l;
   }



   /**
    * @return
    * @jmx.managed-attribute
    */
   public String getCacheLoaderPreload() {
      return cache_loader_preload != null? cache_loader_preload.toString() : null;
   }

   /**
    * @param flag
    * @jmx.managed-attribute
    */
   public void setCacheLoaderFetchPersistentState(boolean flag) {
      cache_loader_fetch_persistent_state=flag;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getCacheLoaderFetchPersistentState() {
      return cache_loader_fetch_persistent_state;
   }

   /**
    * @param flag
    * @jmx.managed-attribute
    */
   public void setCacheLoaderFetchTransientState(boolean flag) {
      cache_loader_fetch_transient_state=flag;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getCacheLoaderFetchTransientState() {
      return cache_loader_fetch_transient_state;
   }


   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getSyncCommitPhase() {
      return sync_commit_phase;
   }

   /**
    * @param sync_commit_phase
    * @jmx.managed-attribute
    */
   public void setSyncCommitPhase(boolean sync_commit_phase) {
      this.sync_commit_phase=sync_commit_phase;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getSyncRollbackPhase() {
      return sync_rollback_phase;
   }

   /**
    * @param sync_rollback_phase
    * @jmx.managed-attribute
    */
   public void setSyncRollbackPhase(boolean sync_rollback_phase) {
      this.sync_rollback_phase=sync_rollback_phase;
   }


   /**
    * Setup eviction policy configuration
    * @jmx.managed-attribute access="write-only"
    */
   public void setEvictionPolicyConfig(Element config) {
      evictConfig_ = config;
      log.info("setEvictionPolicyConfig(): " +config);
   }

   public Element getEvictionPolicyConfig() {
      return evictConfig_;
   }

   /**
    * Convert a list of elements to the JG property string
    * @jmx.managed-attribute access="write-only"
    */
   public void setClusterConfig(Element config) {
      StringBuffer buffer=new StringBuffer();
      NodeList stack=config.getChildNodes();
      int length=stack.getLength();

      for(int s=0; s < length; s++) {
         org.w3c.dom.Node node=stack.item(s);
         if(node.getNodeType() != org.w3c.dom.Node.ELEMENT_NODE)
            continue;

         Element tag=(Element)node;
         String protocol=tag.getTagName();
         buffer.append(protocol);
         NamedNodeMap attrs=tag.getAttributes();
         int attrLength=attrs.getLength();
         if(attrLength > 0)
            buffer.append('(');
         for(int a=0; a < attrLength; a++) {
            Attr attr=(Attr)attrs.item(a);
            String name=attr.getName();
            String value=attr.getValue();
            buffer.append(name);
            buffer.append('=');
            buffer.append(value);
            if(a < attrLength - 1)
               buffer.append(';');
         }
         if(attrLength > 0)
            buffer.append(')');
         buffer.append(':');
      }
      // Remove the trailing ':'
      buffer.setLength(buffer.length() - 1);
      setClusterProperties(buffer.toString());
      log.info("setting cluster properties from xml to: " + cluster_props);
   }




   /**
    * Get the max time to wait until the initial state is retrieved.
    * This is used in a replicating cache: when a new cache joins the cluster,
    * it needs to acquire the (replicated) state of the other members to
    * initialize itself. If no state has been received within <tt>timeout</tt>
    * milliseconds, the map will be empty.
    *
    * @return long Number of milliseconds to wait for the state. 0 means to wait forever.
    * @jmx.managed-attribute
    */
   public long getInitialStateRetrievalTimeout() {
      return state_fetch_timeout;
   }

   /**
    * Set the initial state transfer timeout
    * (see {@link #getInitialStateRetrievalTimeout()})
    * @jmx.managed-attribute
    */
   public void setInitialStateRetrievalTimeout(long timeout) {
      state_fetch_timeout=timeout;
   }

   /**
    * Returns the current caching mode. Valid values are
    * <ul>
    * <li>LOCAL
    * <li>REPL_ASYNC
    * <li>REPL_SYNC
    * <ul>
    * @return String The caching mode
    * @jmx.managed-attribute
    */
   public String getCacheMode() {
      return mode2String(cache_mode);
   }

   public int getCacheModeInternal() {
      return cache_mode;
   }

   private String mode2String(int mode) {
      switch(mode) {
         case LOCAL:
            return "LOCAL";
         case REPL_ASYNC:
            return "REPL_ASYNC";
         case REPL_SYNC:
            return "REPL_SYNC";
         default:
            throw new RuntimeException("setCacheMode(): caching mode " + mode + " is invalid");
      }
   }

   /**
    * Sets the default caching mode)
    * @jmx.managed-attribute
    */
   public void setCacheMode(String mode) throws Exception {
      int m=string2Mode(mode);
      setCacheMode(m);
   }


   /**
    * Sets the default cache mode. Valid arguments are
    * <ol>
    * <li>TreeCache.LOCAL
    * <li>TreeCache.REPL_ASYNC
    * <li>TreeCache.REPL_SYNC
    * </ol>
    * @param mode
    */
   public void setCacheMode(int mode) {
      if(mode == LOCAL || mode == REPL_ASYNC || mode == REPL_SYNC)
         this.cache_mode=mode;
      else
         throw new IllegalArgumentException("setCacheMode(): caching mode " + mode + " is invalid");
   }


   /**
    * Returns the default max timeout after which synchronous replication calls return.
    * @return long Number of milliseconds after which a sync repl call must return. 0 means to wait forever
    * @jmx.managed-attribute
    */
   public long getSyncReplTimeout() {
      return sync_repl_timeout;
   }

   /**
    * Sets the default maximum wait time for synchronous replication to receive all results
    * @jmx.managed-attribute
    */
   public void setSyncReplTimeout(long timeout) {
      sync_repl_timeout=timeout;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getUseReplQueue() {
      return use_repl_queue;
   }

   /**
    * @param flag
    * @jmx.managed-attribute
    */
   public void setUseReplQueue(boolean flag) {
      use_repl_queue=flag;
      if(flag) {
         if(repl_queue == null) {
            repl_queue=new ReplicationQueue(this, repl_queue_interval, repl_queue_max_elements);
            if(repl_queue_interval >= 0)
               repl_queue.start();
         }
      }
      else {
         if(repl_queue != null) {
            repl_queue.stop();
            repl_queue=null;
         }
      }
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public long getReplQueueInterval() {
      return repl_queue_interval;
   }

   /**
    * @param interval
    * @jmx.managed-attribute
    */
   public void setReplQueueInterval(long interval) {
      this.repl_queue_interval=interval;
      if(repl_queue != null)
         repl_queue.setInterval(interval);
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public int getReplQueueMaxElements() {
      return repl_queue_max_elements;
   }

   /**
    * @param max_elements
    * @jmx.managed-attribute
    */
   public void setReplQueueMaxElements(int max_elements) {
      this.repl_queue_max_elements=max_elements;
      if(repl_queue != null)
         repl_queue.setMax_elements(max_elements);
   }


   public ReplicationQueue getReplQueue() {
      return repl_queue;
   }

   /**
    * Returns the transaction isolation level.
    * @jmx.managed-attribute
    */
   public String getIsolationLevel() {
      return isolationLevel.toString();
   }

   /**
    * Set the transaction isolation level. This determines the locking strategy to be used
    * @jmx.managed-attribute
    */
   public void setIsolationLevel(String level) {
      IsolationLevel tmp_level=IsolationLevel.stringToIsolationLevel(level);

      if(tmp_level == null) {
         throw new IllegalArgumentException("TreeCache.setIsolationLevel(): level \"" + level + "\" is invalid");
      }
      setIsolationLevel(tmp_level);
   }

   /**
    * @param level
    */
   public void setIsolationLevel(IsolationLevel level) {
      isolationLevel=level;
      LockStrategyFactory.setIsolationLevel(level);
   }

   public IsolationLevel getIsolationLevelClass() {
      return isolationLevel;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public boolean getFetchStateOnStartup() {
      return fetch_state_on_startup;
   }


   /**
    * @param flag
    * @jmx.managed-attribute
    */
   public void setFetchStateOnStartup(boolean flag) {
      fetch_state_on_startup=flag;
   }


   /**
    * Default max time to wait for a lock. If the lock cannot be acquired within this time, a LockingException will be thrown.
    * @return long Max number of milliseconds to wait for a lock to be acquired
    * @jmx.managed-attribute
    */
   public long getLockAcquisitionTimeout() {
      return lock_acquisition_timeout;
   }

   /**
    * Set the max time for lock acquisition. A value of 0 means to wait forever (not recomended).
    * Note that lock acquisition timeouts may be removed in the future when we have deadlock detection.
    * @param timeout
    * @jmx.managed-attribute
    */
   public void setLockAcquisitionTimeout(long timeout) {
      this.lock_acquisition_timeout=timeout;
   }



   /**
    * Returns the name of the cache eviction policy (must be an implementation of EvictionPolicy)
    * @return Fully qualified name of a class implementing the EvictionPolicy interface
    * @jmx.managed-attribute
    */
   public String getEvictionPolicyClass() {
      return eviction_policy_class;
   }

   /**
    * Sets the classname of the eviction policy
    * @jmx.managed-attribute
    */
   public void setEvictionPolicyClass(String eviction_policy_class) {
      if(eviction_policy_class == null || eviction_policy_class.length() ==0)
         return;
      try {
         this.eviction_policy_class=eviction_policy_class;
         eviction_policy_provider =(TreeCacheListener)
               getClass().getClassLoader().loadClass(eviction_policy_class).newInstance();
         this.addTreeCacheListener(eviction_policy_provider );
      }
      catch(Throwable t) {
         log.error("setEvictionPolicyClass(): failed creating instance of  " + eviction_policy_class, t);
      }
   }

   /**
    * Obtain eviction thread (if any) wake up interval in seconds
    * @jmx.managed-attribute
    */
   public int getEvictionThreadWakeupIntervalSeconds() {
      if( eviction_policy_provider == null ) return -1;
      else
         return ((LRUPolicy)eviction_policy_provider).getWakeupIntervalSeconds();
   }


   /**
    * Sets the TransactionManagerLookup object
    * @param l
    * @jmx.managed-attribute
    */
   public void setTransactionManagerLookup(TransactionManagerLookup l) {
      this.tm_lookup=l;
   }


   /**
    * @return
    * @jmx.managed-attribute
    */
   public String getTransactionManagerLookupClass() {
      return tm_lookup_class;
   }

   /**
    * Sets the class of the TransactionManagerLookup impl. This will attempt to create an
    * instance, and will throw an exception if this fails.
    * @param cl
    * @throws Exception
    * @jmx.managed-attribute
    */
   public void setTransactionManagerLookupClass(String cl) throws Exception {
      this.tm_lookup_class=cl;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public TransactionManager getTransactionManager() {
      return tm;
   }

   /**
    * @return
    * @jmx.managed-attribute
    */
   public TreeCache getInstance() {
      return this;
   }



   public void setReplicationHandler(Replicatable handler) {
      replication_handler=handler;
   }

   public Replicatable getReplicationHandler() {
      return replication_handler;
   }

   /**
    * Fetch the group state from the current coordinator. If successful, this will trigger setState().
    * @jmx.managed-operation
    */
   public void fetchState(long timeout) throws ChannelClosedException, ChannelNotConnectedException {
      if(channel == null)
         throw new ChannelNotConnectedException();
      boolean rc=channel.getState(null, timeout);
      if(rc)
         log.info("fetchState(): state was retrieved successfully");
      else
         log.info("fetchState(): state could not be retrieved (first member)");
   }

   /**
    * @param listener
    * @jmx.managed-operation
    */
   public void addTreeCacheListener(TreeCacheListener listener) {
      if(!listeners.contains(listener))
         listeners.addElement(listener);
   }

   /**
    * @param listener
    * @jmx.managed-operation
    */
   public void removeTreeCacheListener(TreeCacheListener listener) {
      listeners.removeElement(listener);
   }

   /* --------------------------- MBeanSupport ------------------------- */

   /**
    *
    * @throws Exception
    * @jmx.managed-operation
    */
   public void createService() throws Exception {
   }

   /**
    * @jmx.managed-operation
    */
   public void destroyService() {
   }


   /**
    *
    * @throws Exception
    * @jmx.managed-operation
    */
   public void startService() throws Exception {
      if(this.tm_lookup == null && this.tm_lookup_class != null) {
         Class clazz=Thread.currentThread().getContextClassLoader().loadClass(this.tm_lookup_class);
         this.tm_lookup=(TransactionManagerLookup)clazz.newInstance();
      }

      try {
         if(tm_lookup != null)
            tm=tm_lookup.getTransactionManager();
         else
            log.warn("No transaction manager lookup class has been defined. Transactions cannot be used");
      }
      catch(Exception e) {
         log.debug("failed looking up TransactionManager, will not use transactions", e);
      }

      // Create the cache loader (needs to be created regardless of mode (local/replicated). This method will
      // be a no-op if no cache loader has been configured
      createCacheLoader();

      // Create the interceptors in the correct order (later to be defined in XML file)
      createInterceptorChain();

      createEvictionPolicy();

      switch(cache_mode) {
         case LOCAL:
            log.info("cache mode is local, will not create the channel");
            break;
         case REPL_SYNC:
         case REPL_ASYNC:
            log.info("cache mode is " + mode2String(cache_mode));
            if(channel != null) { // already started
               log.info("channel is already running");
               return;
            }
            if(cluster_props == null) {
               cluster_props=getDefaultProperties();
               log.debug("setting cluster properties to default value");
            }

            channel=new JChannel(cluster_props);
            channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
            channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
            if(log.isTraceEnabled())
               log.trace("cache properties: " + cluster_props);
            channel.setOpt(Channel.GET_STATE_EVENTS, Boolean.TRUE);
            disp=new RpcDispatcher(channel, ml, this, this);
            disp.setDeadlockDetection(deadlockDetection);
            channel.connect(cluster_name);
            if(fetch_state_on_startup) {
               fetchStateOnStartup();
            }
            break;
         default:
            throw new IllegalArgumentException("cache mode " + cache_mode + " is invalid");
      }

      // does the preloading
      cacheLoaderPreload();

      coordinator=determineCoordinator();
      notifyCacheStarted();
   }

   private void createEvictionPolicy() {
      // Configure if eviction policy is set
      if(eviction_policy_provider != null)
         ((LRUPolicy)eviction_policy_provider).configure(this);
   }

   /**
    * Assembles the interceptor stack. Presence and order of interceptors is determined by looking at
    * the cache configuration. In the future, this will be accessible through XML. See refactoring.txt for
    * details. An alternative might be to use a simple rools engine to assemble the stack.
    * Creates either:
    * <pre>
    *
    * CallInterceptor
    * LockInterceptor
    * [CacheLoaderInterceptor]
    * [ReplicationInterceptor]
    * [CacheStoreInterceptor]
    *
    * or
    *
    * CallInterceptor
    * LockInterceptor
    * [CacheStoreInterceptor]
    * [CacheLoaderInterceptor]
    * [ReplicationInterceptor]
    *
    * </pre>
    * CallInterceptor is always present at the top, the others may or may not be present
    */
   protected void createInterceptorChain() throws IllegalAccessException, InstantiationException, ClassNotFoundException {
      Interceptor call_interceptor=null;
      Interceptor lock_interceptor=null;
      // Interceptor create_if_not_exists_interceptor=null;
      Interceptor repl_interceptor=null;
      Interceptor cache_loader_interceptor=null;
      Interceptor cache_store_interceptor=null;
      Interceptor unlock_interceptor=null;
      Interceptor first=null;

      call_interceptor=createInterceptor("org.jboss.cache.interceptors.CallInterceptor");
      call_interceptor.setCache(this);

      lock_interceptor=createInterceptor("org.jboss.cache.interceptors.LockInterceptor");
      lock_interceptor.setCache(this);

      //create_if_not_exists_interceptor=createInterceptor("org.jboss.cache.interceptors.CreateIfNotExistsInterceptor");
      //create_if_not_exists_interceptor.setCache(this);

      unlock_interceptor=createInterceptor("org.jboss.cache.interceptors.UnlockInterceptor");
      unlock_interceptor.setCache(this);

      if(cache_mode != LOCAL) {
         repl_interceptor=createInterceptor("org.jboss.cache.interceptors.ReplicationInterceptor");
         repl_interceptor.setCache(this);
      }

      if(cache_loader_class != null || cache_loader != null) {
         cache_loader_interceptor=createInterceptor("org.jboss.cache.interceptors.CacheLoaderInterceptor");
         cache_loader_interceptor.setCache(this);
         cache_store_interceptor=createInterceptor("org.jboss.cache.interceptors.CacheStoreInterceptor");
         cache_store_interceptor.setCache(this);
      }


      // create the stack from the bottom up
      if(cache_loader_interceptor != null) {
         if(cache_loader_shared == true) {
            if(first == null)
               first=cache_store_interceptor;
            else
               addInterceptor(first, cache_store_interceptor);
         }
      }

      if(repl_interceptor != null) {
         if(first == null)
            first=repl_interceptor;
         else
            addInterceptor(first, repl_interceptor);
      }

      if(unlock_interceptor != null) {
         if(first == null)
            first=unlock_interceptor;
         else
            addInterceptor(first, unlock_interceptor);
      }

      if(cache_loader_interceptor != null) {
         if(cache_loader_shared == true) {
            if(first == null)
               first=cache_loader_interceptor;
            else
               addInterceptor(first, cache_loader_interceptor);
         }
         else {
            if(first == null)
               first=cache_loader_interceptor;
            else
               addInterceptor(first, cache_loader_interceptor);
            if(first == null)
               first=cache_store_interceptor;
            else
               addInterceptor(first, cache_store_interceptor);
         }
      }

      //if(first == null)
        // first=create_if_not_exists_interceptor;
      //else
        // addInterceptor(first, create_if_not_exists_interceptor);

      if(first == null)
         first=lock_interceptor;
      else
         addInterceptor(first, lock_interceptor);

      if(first == null)
         first=call_interceptor;
      else
         addInterceptor(first, call_interceptor);

      interceptor_chain=first;
      if(log.isInfoEnabled())
         log.info("interceptor chain is:\n" + printInterceptorChain(first));
   }




   private String printInterceptorChain(Interceptor i) {
      StringBuffer sb=new StringBuffer();
      if(i != null) {
         if(i.getNext() != null) {
            sb.append(printInterceptorChain(i.getNext())).append("\n");
         }
         sb.append(i.getClass());
      }
      return sb.toString();
   }


   /** Adds an interceptor at the end of the chain */
   private void addInterceptor(Interceptor first, Interceptor i) {
      if(first == null) {
         return;
      }
      do {
         if(first.getNext() != null)
            first=first.getNext();
         else
            break;
      }
      while(first != null); // findbugs has a false positive for an NPE here...
      first.setNext(i);
   }


   private Interceptor createInterceptor(String classname) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
      Class clazz=getClass().getClassLoader().loadClass(classname);
      return (Interceptor)clazz.newInstance();
   }


//

   /** Creates an instance of a CacheLoader if and only if
    * <ul>
    * <li>The CacheLoader has not yet been created
    * <li>cache_loader_class is set
    * <li>The CacheLoader is shared and we are <em>not</em> the coordinator (only the coordinator
    *     is supposed to have a CacheLoader)
    * </ul>
    * @throws Exception
    */
   protected void createCacheLoader() throws Exception {
      if(cache_loader == null && cache_loader_class != null) {
         Class cl=Thread.currentThread().getContextClassLoader().loadClass(cache_loader_class);
         cache_loader=(CacheLoader)cl.newInstance();
         cache_loader.setConfig(cache_loader_config);
         cache_loader.setCache(this);
         cache_loader.create();
         cache_loader.start();
      }
   }

   protected void cacheLoaderPreload() throws Exception {
      if(cache_loader != null) {
         if(log.isTraceEnabled())
            log.trace("preloading " + cache_loader_preload);
         if(cache_loader_preload != null) {
            for(Iterator it=cache_loader_preload.iterator(); it.hasNext();) {
               Fqn fqn=(Fqn)it.next();
               preload(fqn, true, true);
            }
         }
      }
   }


   /**
    * Loads the indicated Fqn, plus all parents recursively from the CacheLoader. If no CacheLoader is present,
    * this is a no-op
    * @param fqn
    * @throws Exception
    * @jmx.managed-operation
    */
   public void load(String fqn) throws Exception {
      if(cache_loader != null)
         preload(Fqn.fromString(fqn), true, true);
   }

   void preload(Fqn fqn, boolean preload_parents, boolean preload_children) throws Exception {

      // 1. Load the attributes first
      this.get(fqn, "bla");

      // 2. Then load the parents
      if(preload_parents) {
         Fqn tmp_fqn=new Fqn();
         for(int i=0; i < fqn.size()-1; i++) {
            tmp_fqn=new Fqn(tmp_fqn, fqn.get(i));
            this.get(tmp_fqn, "bla");
         }
      }

      if(preload_children == false)
         return;

      // 3. Then recursively for all child nodes, preload them as well
      Set children=cache_loader.getChildrenNames(fqn);
      if(children == null)
         return;
      for(Iterator it=children.iterator(); it.hasNext();) {
         String child_name=(String)it.next();
         Fqn child_fqn=new Fqn(fqn, child_name);
         preload(child_fqn, false, true);
      }
   }

   void destroyCacheLoader() {
      if(cache_loader != null) {
         cache_loader.stop();
         cache_loader.destroy();
         cache_loader=null;
      }
   }

   protected boolean determineCoordinator() {
      if(channel == null)
         return false;
      Object local_addr=getLocalAddress();
      if(local_addr == null)
         return false;
      View view=channel.getView();
      if(view == null) return false;
      ViewId vid=view.getVid();
      if(vid == null) return false;
      Object coord=vid.getCoordAddress();
      if(coord == null) return false;
      return local_addr.equals(coord);
   }

   public Address getCoordinator() {
      if(channel == null) return null;
      View view=channel.getView();
      if(view == null) return null;
      ViewId vid=view.getVid();
      if(vid == null) return null;
      Address coord=vid.getCoordAddress();
      return coord;
   }

   public byte[] getStateBytes() {
      return this.getMessageListener().getState();
   }

   public void setStateBytes(byte[] state) {
      this.getMessageListener().setState(state);
   }

   protected void fetchStateOnStartup() throws Exception {
      long start, stop;
      synchronized(stateLock) {
         isStateSet=false;
         start=System.currentTimeMillis();
         boolean rc=channel.getState(null, state_fetch_timeout);
         if(rc) {
            while(!isStateSet) {
               try {
                  stateLock.wait();
               }
               catch(InterruptedException iex) {
               }
            }
            stop=System.currentTimeMillis();
            log.info("state was retrieved successfully (in " + (stop-start) + " milliseconds)");
         }
         else
            log.info("state could not be retrieved (must be first member in group)");
      }
   }

   /**
    * @jmx.managed-operation
    */
   public void stopService() {
      if(channel != null) {
         log.info("stopService(): closing the channel");
         channel.close();
         channel=null;
      }
      if(disp != null) {
         log.info("stopService(): stopping the dispatcher");
         disp.stop();
         disp=null;
      }
      if(members != null && members.size() > 0)
         members.clear();

      if(repl_queue != null)
         repl_queue.stop();

      destroyCacheLoader();

      notifyCacheStopped();

      // Need to clean up listeners as well
      listeners.clear();
   }

   /* ----------------------- End of MBeanSupport ----------------------- */

   /**
    * @param fqn fqn String name to retrieve from cache
    * @return Node corresponding to the fqn. Null if does not exist. No guarantees wrt replication,
    * cache loading are given if the underlying node is modified
    */
   public Node get(String fqn) throws CacheException {
      return get(Fqn.fromString(fqn));
   }

   /**
    * @param fqn fqn instance to retrieve from cache
    * @return Node corresponding to the fqn. Null if does not exist. No guarantees wrt replication,
    * cache loading are given if the underlying node is modified
    */
   public Node get(Fqn fqn) throws CacheException {
      MethodCall m=new MethodCall(getNodeMethodLocal, new Object[]{fqn});
      return (Node)invokeMethod(m);
   }

   public Node _get(Fqn fqn) throws CacheException {
      return findNode(fqn);
   }


   /**
    * @param fqn
    * @return
    * @jmx.managed-operation
    */
   public Set getKeys(String fqn) throws CacheException {
      return getKeys(Fqn.fromString(fqn));
   }

   /**
    * @param fqn
    * @return A Set<String> of keys. This is a copy of the key set, modifications will not be written to the original.
    * Returns null if the node is not found, or the node has no attributes
    * @jmx.managed-operation
    */
   public Set getKeys(Fqn fqn) throws CacheException {
      MethodCall m=new MethodCall(getKeysMethodLocal, new Object[]{fqn});
      return (Set)invokeMethod(m);
   }


   public Set _getKeys(Fqn fqn) throws CacheException {
      Set retval=null;
      Node n=findNode(fqn);
      if(n == null)
         return null;
      retval=n.getDataKeys();
      return retval != null? new LinkedHashSet(retval) : null;
   }

   /**
    * Finds a node given its name and returns the value associated with a given key in its <code>data</code>
    * map. Returns null if the node was not found in the tree or the key was not found in the hashmap.
    *
    * @param fqn The fully qualified name of the node.
    * @param key The key.
    * @jmx.managed-operation
    */
   public Object get(String fqn, Object key) throws CacheException {
      return get(Fqn.fromString(fqn), key);
   }


   /**
    * Finds a node given its name and returns the value associated with a given key in its <code>data</code>
    * map. Returns null if the node was not found in the tree or the key was not found in the hashmap.
    *
    * @param fqn The fully qualified name of the node.
    * @param key The key.
    * @jmx.managed-operation
    */
   public Object get(Fqn fqn, Object key) throws CacheException {
      return get(fqn, key, true);
   }

   public Object _get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException {
      if(log.isTraceEnabled())
         log.trace("_get(" + ", \"" + fqn + "\", " + key + ", \"" +sendNodeEvent +"\")");
      Node n=findNode(fqn);
      if(n == null) return null;
      if(sendNodeEvent)
         notifyNodeVisisted(fqn);
      return n.get(key);
   }


   protected Object get(Fqn fqn, Object key, boolean sendNodeEvent) throws CacheException {
      MethodCall m=new MethodCall(getKeyValueMethodLocal, new Object[]{fqn, key, new Boolean(sendNodeEvent)});
      return invokeMethod(m);
   }

   /**
    * Like <code>get()</code> method but without triggering a node visit event. This is used
    * to prevent refresh of the cache data in the eviction policy.
    * @param fqn
    * @param key
    * @return
    */
   public Object peek(Fqn fqn, Object key) throws CacheException {
      return get(fqn, key, false);
   }



   /**
    * Checks whether a given node exists in the tree
    *
    * @param fqn The fully qualified name of the node
    * @return boolean Whether or not the node exists
    * @jmx.managed-operation
    */
   public boolean exists(String fqn) {
      return exists(Fqn.fromString(fqn));
   }


   /**
    * Checks whether a given node exists in the tree. Does not acquire any locks in doing so (result may be dirty read)
    * @param fqn The fully qualified name of the node
    * @return boolean Whether or not the node exists
    * @jmx.managed-operation
    */
   public boolean exists(Fqn fqn) {
      Node n=findInternal(fqn);
      return n != null;
   }

   /**
    * Gets node without attempt to load it from CacheLoader if not present
    * @param fqn
    * @return
    */
   private Node findInternal(Fqn fqn) {
      if(fqn == null || fqn.size() == 0) return root;
      Node n=root, retval=null;
      Object obj;
      for(int i=0; i < fqn.size(); i++) {
         obj=fqn.get(i);
         n=n.getChild(obj);
         if(n == null)
            return null;
         else
            retval=n;
      }
      return retval;
   }


   /**
    *
    * @param fqn
    * @param key
    * @return
    * @jmx.managed-operation
    */
   public boolean exists(String fqn, Object key) {
      return exists(Fqn.fromString(fqn), key);
   }


   /**
    * Checks whether a given key exists in the given node. Does not interact with CacheLoader, so the behavior is
    * different from {@link #get(Fqn,Object)}
    * @param fqn The fully qualified name of the node
    * @param key
    * @return boolean Whether or not the node exists
    * @jmx.managed-operation
    */
   public boolean exists(Fqn fqn, Object key) {
      Node n=findInternal(fqn);
      if(n == null)
         return false;
      else
         return n.containsKey(key);
    }


   /**
    * Adds a new node to the tree and sets its data. If the node doesn not yet exist, it will be created.
    * Also, parent nodes will be created if not existent. If the node already has data, then the new data
    * will override the old one. If the node already existed, a nodeModified() notification will be generated.
    * Otherwise a nodeCreated() motification will be emitted.
    *
    * @param fqn  The fully qualified name of the new node
    * @param data The new data. May be null if no data should be set in the node.
    * @jmx.managed-operation
    */
   public void put(String fqn, Map data) throws CacheException {
      put(Fqn.fromString(fqn), data);
   }

   /**
    * Adds a new node to the tree and sets its data. If the node doesn not yet exist, it will be created.
    * Also, parent nodes will be created if not existent. If the node already has data, then the new data
    * will override the old one. If the node already existed, a nodeModified() notification will be generated.
    * Otherwise a nodeCreated() motification will be emitted.
    *
    * @param fqn  The fully qualified name of the new node
    * @param data The new data. May be null if no data should be set in the node.
    * @jmx.managed-operation
    */
   public void put(Fqn fqn, Map data) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(putDataMethodLocal, new Object[]{tx, fqn, data, Boolean.TRUE});
      invokeMethod(m);
   }

   /**
    * Adds a key and value to a given node. If the node doesn't exist, it will be created. If the node
    * already existed, a nodeModified() notification will be generated. Otherwise a
    * nodeCreated() motification will be emitted.
    *
    * @param fqn   The fully qualified name of the node
    * @param key   The key
    * @param value The value
    * @return Object The previous value (if any), if node was present
    * @jmx.managed-operation
    */
   public Object put(String fqn, Object key, Object value) throws CacheException {
      return put(Fqn.fromString(fqn), key, value);
   }


   /**
    * Put with the following properties:
    * <ol>
    * <li>Fails fast (after timeout milliseconds)
    * <li>If replication is used: replicates <em>asynchronously</em>, overriding a potential synchronous mode
    * </ol>
    * This method should be used without running in a transaction (suspend()/resume() before calling it)
    * @param fqn The fully qualified name of the node
    * @param key
    * @param value
    * @param timeout Number of milliseconds to wait until a lock has been acquired. A TimeoutException will
    * be thrown if not successful. 0 means to wait forever
    * @return
    * @throws CacheException
    * @deprecated This is a kludge created specifically form the Hibernate 3.0 release. This method should
    * <em>not</em> be used by any application. The methodV will likely be removed in a future release
    */
   public Object putFailFast(Fqn fqn, Object key, Object value, long timeout) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(putFailFastKeyValueMethodLocal,
                                  new Object[]{tx, fqn, key, value, Boolean.TRUE, new Long(timeout)});
      return invokeMethod(m);
   }

   /**
    *
    * @param fqn
    * @param key
    * @param value
    * @param timeout
    * @return
    * @throws CacheException
    * @deprecated
    */
   public Object putFailFast(String fqn, Object key, Object value, long timeout) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      Fqn fqntmp=Fqn.fromString(fqn);
      MethodCall m=new MethodCall(putFailFastKeyValueMethodLocal,
                                  new Object[]{tx, fqntmp, key, value, Boolean.TRUE, new Long(timeout)});
      return invokeMethod(m);
   }

   /**
    * Adds a key and value to a given node. If the node doesn't exist, it will be created. If the node
    * already existed, a nodeModified() notification will be generated. Otherwise a
    * nodeCreated() motification will be emitted.
    *
    * @param fqn   The fully qualified name of the node
    * @param key   The key
    * @param value The value
    * @return Object The previous value (if any), if node was present
    * @jmx.managed-operation
    */
   public Object put(Fqn fqn, Object key, Object value) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(putKeyValMethodLocal, new Object[]{tx, fqn, key, value, Boolean.TRUE});
      return invokeMethod(m);
   }

   /**
    * Removes the node from the tree.
    *
    * @param fqn The fully qualified name of the node.
    * @jmx.managed-operation
    */
   public void remove(String fqn) throws CacheException {
      remove(Fqn.fromString(fqn));
   }

   /**
    * Removes the node from the tree.
    *
    * @param fqn The fully qualified name of the node.
    * @jmx.managed-operation
    */
   public void remove(Fqn fqn) throws CacheException {
      GlobalTransaction tx=getCurrentTransaction();
      MethodCall m=new MethodCall(removeNodeMethodLocal, new Object[]{tx, fqn, Boolean.TRUE});
      invokeMethod(m);
   }

   /**
    * Called by eviction policy provider. Note that eviction is done only in local mode,
    * that is, it doesn't replicate the node removal. This is will cause the replcation nodes
    * not synchronizing, but it is ok since user is supposed to add the node again when get is
    * null. After that, the contents will be in sync.
    * @param fqn Will remove everythign assoicated with this fqn.
    * @throws CacheException
    * @jmx.managed-operation
    */
   public void evict(Fqn fqn) throws CacheException {
      MethodCall m=new MethodCall(evictNodeMethodLocal, new Object[]{fqn});
      invokeMethod(m);
   }


   /**
    * Evicts a key/value pair from a node's attributes. Note that this is <em>local</em>, will not be replicated.
    * @param fqn
    * @param key
    * @throws CacheException
    * @jmx.managed-operation
    */
//   public void evict(Fqn fqn, Object key) throws CacheException {