GraphLab: Distributed Graph-Parallel API  2.1
caching_dht.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */

/*
  \author Yucheng Low (ylow), Joseph Gonzalez (jegonzal)

  An implementation of a distributed key -> value map with caching
  capabilities.
*/

#ifndef GRAPHLAB_CACHING_DHT_HPP
#define GRAPHLAB_CACHING_DHT_HPP
#include <boost/unordered_map.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/functional/hash.hpp>

#include <graphlab/rpc/dc.hpp>
#include <graphlab/parallel/pthread_tools.hpp>
#include <graphlab/util/synchronized_unordered_map.hpp>
#include <graphlab/util/dense_bitset.hpp>

namespace graphlab {

  namespace dc_impl {
    /**
     * \internal
     * \ingroup rpc
     * A cache entry for the caching_dht.
     * Boost.Intrusive is used to provide the LRU capability here.
     */
    template<typename KeyType, typename ValueType>
    class lru_list {
    public:

      KeyType key;     ///< the key associated with this cache entry
      ValueType value; ///< the value associated with this cache entry
      typedef boost::intrusive::list_member_hook<
          boost::intrusive::link_mode<boost::intrusive::auto_unlink> >
        lru_member_hook_type;

      lru_member_hook_type member_hook_;
      ~lru_list() { }
      explicit lru_list(const KeyType& k = KeyType(),
                        const ValueType& v = ValueType()) : key(k), value(v) { }
    };
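
    /*
      How the pieces fit together (a minimal sketch, not part of the public
      API): caching_dht owns the entries through its cache hash map, while an
      intrusive list threaded through member_hook_ keeps them in LRU order.
      Because the hook is intrusive and auto-unlinking, moving an entry to the
      head of the LRU order is O(1) and destroying an entry removes it from
      the list automatically.

      \code
      typedef graphlab::dc_impl::lru_list<size_t, size_t> entry_type;
      typedef boost::intrusive::list<
          entry_type,
          boost::intrusive::member_hook<entry_type,
                                        entry_type::lru_member_hook_type,
                                        &entry_type::member_hook_>,
          boost::intrusive::constant_time_size<false> > example_lru_type;

      entry_type* e = new entry_type(10, 42);
      example_lru_type lru;
      lru.push_front(*e);                              // newest entry at the head
      lru.erase(example_lru_type::s_iterator_to(*e));  // unlink in O(1) on access ...
      lru.push_front(*e);                              // ... and move back to the head
      delete e;                                        // auto_unlink drops it from the list
      \endcode
    */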

  } // namespace dc_impl

  /**
   * \internal
   * \ingroup rpc
   * This implements a limited distributed key -> value map with caching
   * capabilities. Cache invalidation is left entirely to the user, who must
   * explicitly call invalidate() to clear local cache entries.
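   *
   * Example usage (a minimal sketch; assumes an initialized
   * distributed_control object named dc):
   *
   * \code
   * caching_dht<size_t, size_t> dht(dc, 1024);
   * if (dc.procid() == 0) dht.set(10, 42);          // stored on the owning machine
   * dc.full_barrier();                              // make sure the set() has completed
   * std::pair<bool, size_t> r = dht.get_cached(10); // may be served from the local cache
   * if (r.first) std::cout << r.second << std::endl;
   * dht.invalidate(10);                             // drop the (possibly stale) local copy
   * \endcode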
   */
  template<typename KeyType, typename ValueType>
  class caching_dht {
  public:

    typedef dc_impl::lru_list<KeyType, ValueType> lru_entry_type;
    /// datatype of the data map
    typedef boost::unordered_map<KeyType, ValueType> map_type;
    /// datatype of the local cache map
    typedef boost::unordered_map<KeyType, lru_entry_type*> cache_type;

    typedef boost::intrusive::member_hook<lru_entry_type,
                                          typename lru_entry_type::lru_member_hook_type,
                                          &lru_entry_type::member_hook_> MemberOption;
    /// datatype of the intrusive LRU list embedded in the cache map
    typedef boost::intrusive::list<lru_entry_type,
                                   MemberOption,
                                   boost::intrusive::constant_time_size<false> > lru_list_type;

  private:
    mutable dc_dist_object<caching_dht<KeyType, ValueType> > rpc;

    mutex datalock;
    map_type data;                ///< The actual table data that is distributed

    mutex cachelock;              ///< lock for the cache data structures
    mutable cache_type cache;     ///< The cache table
    mutable lru_list_type lruage; ///< The LRU linked list associated with the cache

    procid_t numprocs;            ///< Number of processors
    size_t maxcache;              ///< Maximum cache size allowed

    mutable size_t reqs;
    mutable size_t misses;

    boost::hash<KeyType> hasher;

  public:

    /// Constructor. Creates the distributed map.
    caching_dht(distributed_control &dc,
                size_t max_cache_size = 1024) : rpc(dc, this), data(11) {
      cache.rehash(max_cache_size);
      maxcache = max_cache_size;
      logger(LOG_INFO, "%d Creating distributed_hash_table. Cache Limit = %lu",
             dc.procid(), (unsigned long)maxcache);
      reqs = 0;
      misses = 0;
    }


    ~caching_dht() {
      data.clear();
      typename cache_type::iterator i = cache.begin();
      while (i != cache.end()) {
        delete i->second;
        ++i;
      }
      cache.clear();
    }


    /// Sets the key to the given value
    void set(const KeyType& key, const ValueType& newval) {
      size_t hashvalue = hasher(key);
      size_t owningmachine = hashvalue % rpc.dc().numprocs();
      if (owningmachine == rpc.dc().procid()) {
        datalock.lock();
        data[key] = newval;
        datalock.unlock();
      } else {
        rpc.remote_call(owningmachine,
                        &caching_dht<KeyType, ValueType>::set,
                        key,
                        newval);
        update_cache(key, newval);
      }
    }
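
    /* Key ownership (a sketch of the scheme set() and get() use above, not an
       additional API): the machine that owns a key is its boost::hash value
       modulo the number of processes, so every process computes the same
       owner without any communication.

       \code
       boost::hash<size_t> hasher;
       size_t key = 10;
       size_t owner = hasher(key) % dc.numprocs();  // assumes a distributed_control dc
       \endcode
    */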


    /** Gets the value associated with the key. The first element of the
        returned pair is true on success. */
    std::pair<bool, ValueType> get(const KeyType& key) const {
      // figure out who owns the key
      size_t hashvalue = hasher(key);
      size_t owningmachine = hashvalue % rpc.dc().numprocs();

      std::pair<bool, ValueType> ret;
      // if I own the key, get it from the local map
      if (owningmachine == rpc.dc().procid()) {
        datalock.lock();
        typename map_type::const_iterator iter = data.find(key);
        if (iter == data.end()) {
          ret.first = false;
        } else {
          ret.first = true;
          ret.second = iter->second;
        }
        datalock.unlock();
      } else {
        ret = rpc.remote_request(owningmachine,
                                 &caching_dht<KeyType, ValueType>::get,
                                 key);
        if (ret.first) update_cache(key, ret.second);
        else invalidate(key);
      }
      return ret;
    }


    /** Gets the value associated with the key, reading from the local cache
        if possible. Note that the cached value may be out of date. */
    std::pair<bool, ValueType> get_cached(const KeyType& key) const {
      // if the key is owned by this machine, read it directly and skip the cache
      size_t hashvalue = hasher(key);
      size_t owningmachine = hashvalue % rpc.dc().numprocs();
      if (owningmachine == rpc.dc().procid()) return get(key);

      reqs++;
      cachelock.lock();
      // check if it is in the cache
      typename cache_type::iterator i = cache.find(key);
      if (i == cache.end()) {
        // not in the cache; fall back to the regular get
        cachelock.unlock();
        misses++;
        return get(key);
      } else {
        // in the cache; return the cached value
        std::pair<bool, ValueType> ret;
        ret.first = true;
        ret.second = i->second->value;
        // shift the cache entry to the head of the LRU list
        lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
        lruage.push_front(*(i->second));
        cachelock.unlock();
        return ret;
      }
    }
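
    /* Staleness caveat (a usage sketch, not an additional API; dht is as in
       the class example above): get_cached() never re-validates the local
       copy against the owning machine, so a value written elsewhere with
       set() may be observed late. When a fresh value is required, either
       call get() directly or invalidate the local copy first:

       \code
       dht.invalidate(10);                                  // drop any stale local copy
       std::pair<bool, size_t> fresh = dht.get_cached(10);  // misses and refetches from the owner
       \endcode
    */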

    /// Invalidates the cache entry associated with this key
    void invalidate(const KeyType& key) const {
      cachelock.lock();
      // is the key being invalidated in the cache?
      typename cache_type::iterator i = cache.find(key);
      if (i != cache.end()) {
        // deleting the entry auto-unlinks it from the LRU list
        delete i->second;
        cache.erase(i);
      }
      cachelock.unlock();
    }


    /// Fraction of get_cached() calls that missed the local cache
    double cache_miss_rate() {
      if (reqs == 0) return 0.0;
      return double(misses) / double(reqs);
    }

    size_t num_gets() const {
      return reqs;
    }
    size_t num_misses() const {
      return misses;
    }

    size_t cache_size() const {
      return cache.size();
    }
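
    /* Monitoring sketch (assumes a caching_dht named dht, as in the class
       example above, that has already served some get_cached() calls):

       \code
       std::cout << "gets: "        << dht.num_gets()
                 << ", misses: "    << dht.num_misses()
                 << ", miss rate: " << dht.cache_miss_rate()
                 << ", cached: "    << dht.cache_size() << std::endl;
       \endcode
    */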

  private:

    /// Updates the cache with this new value
    void update_cache(const KeyType& key, const ValueType& val) const {
      cachelock.lock();
      typename cache_type::iterator i = cache.find(key);
      if (i == cache.end()) {
        // not cached yet: create a new entry.
        // remove_lru() acquires cachelock itself, so release it first.
        cachelock.unlock();
        // if we are out of room, remove the least recently used entry
        if (cache.size() >= maxcache) remove_lru();
        cachelock.lock();
        // insert the element, keeping the iterator so we can push the entry
        // straight onto the LRU list
        std::pair<typename cache_type::iterator, bool> ret =
            cache.insert(std::make_pair(key, new lru_entry_type(key, val)));
        if (ret.second) lruage.push_front(*(ret.first->second));
      } else {
        // modify the entry in place
        i->second->value = val;
        // and move it to the front of the LRU list
        lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
        lruage.push_front(*(i->second));
      }
      cachelock.unlock();
    }

    /// Removes the least recently used element from the cache
    void remove_lru() const {
      cachelock.lock();
      if (lruage.empty()) {
        cachelock.unlock();
        return;
      }
      KeyType keytoerase = lruage.back().key;
      // look up the LRU entry in the cache map
      typename cache_type::iterator i = cache.find(keytoerase);
      if (i != cache.end()) {
        // deleting the entry auto-unlinks it from the LRU list
        delete i->second;
        cache.erase(i);
      }
      cachelock.unlock();
    }

  };

} // namespace graphlab
#endif // GRAPHLAB_CACHING_DHT_HPP