GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
lazy_dht.hpp
1 /*
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 /*
25  \author Yucheng Low (ylow)
26  An implementation of a distributed key -> value map with caching
27  capabilities.
28 
29 */
30 
31 #ifndef GRAPHLAB_LAZY_DHT_HPP
32 #define GRAPHLAB_LAZY_DHT_HPP
33 
34 #include <boost/unordered_map.hpp>
35 #include <boost/intrusive/list.hpp>
36 
37 #include <graphlab/rpc/dc.hpp>
38 #include <graphlab/parallel/pthread_tools.hpp>
39 #include <graphlab/util/synchronized_unordered_map.hpp>
40 #include <graphlab/util/dense_bitset.hpp>
41 
42 
43 
44 namespace graphlab {
45 
46  /**
47  \internal
48  \ingroup rpc
49 
50 
51  This implements a distributed key -> value map with caching
52  capabilities. It is up to the user to determine cache
53  invalidation policies. User explicitly calls the invalidate()
54  function to clear local cache entries. This is an extremely lazy
55  DHT in that it is up to the user to guarantee that the keys are
56  unique. Any machine can call set on any key, and the result of
57  the key will be stored locally. Reads on any unknown keys will be
58  resolved using a broadcast operation.
59  */
60 
61  template<typename KeyType, typename ValueType>
62  class lazy_dht{
63  public:
64 
65  typedef dc_impl::lru_list<KeyType, ValueType> lru_entry_type;
66  /// datatype of the data map
67  typedef boost::unordered_map<KeyType, ValueType> map_type;
68  /// datatype of the local cache map
69  typedef boost::unordered_map<KeyType, lru_entry_type* > cache_type;
70 
71  struct wait_struct {
72  mutex mut;
73  conditional cond;
74  ValueType val;
75  size_t numreplies;
76  bool hasvalue;
77  };
78 
79  typedef boost::intrusive::member_hook<lru_entry_type,
80  typename lru_entry_type::lru_member_hook_type,
81  &lru_entry_type::member_hook_> MemberOption;
82  /// datatype of the intrusive LRU list embedded in the cache map
83  typedef boost::intrusive::list<lru_entry_type,
84  MemberOption,
85  boost::intrusive::constant_time_size<false> > lru_list_type;
86 
87  /// Constructor. Creates the integer map.
88  lazy_dht(distributed_control &dc,
89  size_t max_cache_size = 65536):rmi(dc, this),data(11) {
90  cache.rehash(max_cache_size);
91  maxcache = max_cache_size;
92  logger(LOG_INFO, "%d Creating distributed_hash_table. Cache Limit = %d",
93  dc.procid(), maxcache);
94  reqs = 0;
95  misses = 0;
96  dc.barrier();
97  }
98 
99 
100  ~lazy_dht() {
101  data.clear();
102  typename cache_type::iterator i = cache.begin();
103  while (i != cache.end()) {
104  delete i->second;
105  ++i;
106  }
107  cache.clear();
108  }
109 
110 
111  /// Sets the key to the value
112  void set(const KeyType& key, const ValueType &newval) {
113  datalock.lock();
114  data[key] = newval;
115  datalock.unlock();
116  }
117 
118 
119  std::pair<bool, ValueType> get_owned(const KeyType &key) const {
120  std::pair<bool, ValueType> ret;
121  datalock.lock();
122  typename map_type::const_iterator iter = data.find(key);
123  if (iter == data.end()) {
124  ret.first = false;
125  }
126  else {
127  ret.first = true;
128  ret.second = iter->second;
129  }
130  datalock.unlock();
131  return ret;
132  }
133 
134  void remote_get_owned(const KeyType &key, procid_t source, size_t ptr) const {
135  std::pair<bool, ValueType> ret;
136  datalock.lock();
137  typename map_type::const_iterator iter = data.find(key);
138  if (iter == data.end()) {
139  ret.first = false;
140  }
141  else {
142  ret.first = true;
143  ret.second = iter->second;
144  }
145  datalock.unlock();
146  rmi.remote_call(source, &lazy_dht<KeyType,ValueType>::get_reply, ptr, ret.second, ret.first);
147  }
148 
149  void get_reply(size_t ptr, ValueType& val, bool hasvalue) {
150  wait_struct* w = reinterpret_cast<wait_struct*>(ptr);
151  w->mut.lock();
152  if (hasvalue) {
153  w->val = val;
154  w->hasvalue = true;
155  }
156  w->numreplies--;
157  if (w->numreplies == 0) w->cond.signal();
158  w->mut.unlock();
159 
160  }
161 
162  /** Gets the value associated with the key. returns true on success.. */
163  std::pair<bool, ValueType> get(const KeyType &key) const {
164  std::pair<bool, ValueType> ret = get_owned(key);
165  if (ret.first) return ret;
166 
167  wait_struct w;
168  w.numreplies = rmi.numprocs() - 1;
169  size_t ptr = reinterpret_cast<size_t>(&w);
170  // otherwise I need to find someone with the key
171  for (size_t i = 0;i < rmi.numprocs(); ++i) {
172  if (i != rmi.procid()) {
173  rmi.remote_call(i, &lazy_dht<KeyType,ValueType>::remote_get_owned, key, rmi.procid(), ptr);
174  }
175  }
176  w.mut.lock();
177  while (w.numreplies > 0) w.cond.wait(w.mut);
178  w.mut.unlock();
179  ret.first = w.hasvalue;
180  ret.second = w.val;
181  if (ret.first) update_cache(key, ret.second);
182  return ret;
183  }
184 
185 
186  /** Gets the value associated with the key, reading from cache if available
187  Note that the cache may be out of date. */
188  std::pair<bool, ValueType> get_cached(const KeyType &key) const {
189  std::pair<bool, ValueType> ret = get_owned(key);
190  if (ret.first) return ret;
191 
192  reqs++;
193  cachelock.lock();
194  // check if it is in the cache
195  typename cache_type::iterator i = cache.find(key);
196  if (i == cache.end()) {
197  // nope. not in cache. Call the regular get
198  cachelock.unlock();
199  misses++;
200  return get(key);
201  }
202  else {
203  // yup. in cache. return the value
204  ret.first = true;
205  ret.second = i->second->value;
206  // shift the cache entry to the head of the LRU list
207  lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
208  lruage.push_front(*(i->second));
209  cachelock.unlock();
210  return ret;
211  }
212  }
213 
214  /// Invalidates the cache entry associated with this key
215  void invalidate(const KeyType &key) const{
216  cachelock.lock();
217  // is the key I am invalidating in the cache?
218  typename cache_type::iterator i = cache.find(key);
219  if (i != cache.end()) {
220  // drop it from the lru list
221  delete i->second;
222  cache.erase(i);
223  }
224  cachelock.unlock();
225  }
226 
227 
228  double cache_miss_rate() {
229  return double(misses) / double(reqs);
230  }
231 
232  size_t num_gets() const {
233  return reqs;
234  }
235  size_t num_misses() const {
236  return misses;
237  }
238 
239  size_t cache_size() const {
240  return cache.size();
241  }
242 
243  private:
244 
245  mutable dc_dist_object<lazy_dht<KeyType, ValueType> > rmi;
246 
247  mutex datalock;
248  map_type data; /// The actual table data that is distributed
249 
250 
251  mutex cachelock; /// lock for the cache datastructures
252  mutable cache_type cache; /// The cache table
253  mutable lru_list_type lruage; /// THe LRU linked list associated with the cache
254 
255 
256  procid_t numprocs; /// NUmber of processors
257  size_t maxcache; /// Maximum cache size allowed
258 
259  mutable size_t reqs;
260  mutable size_t misses;
261 
262 
263 
264 
265 
266  /// Updates the cache with this new value
267  void update_cache(const KeyType &key, const ValueType &val) const{
268  cachelock.lock();
269  typename cache_type::iterator i = cache.find(key);
270  // create a new entry
271  if (i == cache.end()) {
272  cachelock.unlock();
273  // if we are out of room, remove the lru entry
274  if (cache.size() >= maxcache) remove_lru();
275  cachelock.lock();
276  // insert the element, remember the iterator so we can push it
277  // straight to the LRU list
278  std::pair<typename cache_type::iterator, bool> ret = cache.insert(std::make_pair(key, new lru_entry_type(key, val)));
279  if (ret.second) lruage.push_front(*(ret.first->second));
280  }
281  else {
282  // modify entry in place
283  i->second->value = val;
284  // swap to front of list
285  //boost::swap_nodes(lru_list_type::s_iterator_to(i->second), lruage.begin());
286  lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
287  lruage.push_front(*(i->second));
288  }
289  cachelock.unlock();
290  }
291 
292  /// Removes the least recently used element from the cache
293  void remove_lru() const{
294  cachelock.lock();
295  KeyType keytoerase = lruage.back().key;
296  // is the key I am invalidating in the cache?
297  typename cache_type::iterator i = cache.find(keytoerase);
298  if (i != cache.end()) {
299  // drop it from the lru list
300  delete i->second;
301  cache.erase(i);
302  }
303  cachelock.unlock();
304  }
305 
306  };
307 
308 }
309 #endif
310