GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
coherent_dht.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_COHERENT_DHT_HPP
25 #define GRAPHLAB_COHERENT_DHT_HPP
26 
27 #include <boost/unordered_map.hpp>
28 #include <boost/intrusive/list.hpp>
29 #include <boost/function.hpp>
30 
31 #include <graphlab/rpc/dc.hpp>
32 #include <graphlab/parallel/pthread_tools.hpp>
33 #include <graphlab/util/dense_bitset.hpp>
34 #include <graphlab/util/timer.hpp>
35 namespace graphlab {
36 
37  namespace dc_impl {
38 
39 #define COHERENT_DHT_COMPRESSED_HASH 32768
40 #define COHERENT_DHT_SUBSCRIBE_IF_ACCESSES_PER_INVALIDATE 10
  /**
   * \internal
   * \ingroup rpc
   * A cache entry for the coherent_dht.
   * Boost intrusive is used to provide the LRU capabilities here.
   */
  template<typename KeyType, typename ValueType>
  class coherent_lru_list{
   public:

    KeyType key;        ///< the key associated with this cache entry
    ValueType value;    ///< the value associated with this cache entry
    uint32_t accesses;  ///< reads of this entry; compared against
                        ///< COHERENT_DHT_SUBSCRIBE_IF_ACCESSES_PER_INVALIDATE
                        ///< to decide whether to subscribe instead of invalidate
    // auto_unlink mode lets an entry unlink itself from the LRU list when it
    // is destroyed, without a reference to the containing list.
    typedef boost::intrusive::list_member_hook<
        boost::intrusive::link_mode<boost::intrusive::auto_unlink> >
        lru_member_hook_type;

    lru_member_hook_type member_hook_;  ///< hook embedding this entry in the intrusive LRU list
    ~coherent_lru_list() { }
    explicit coherent_lru_list(const KeyType& k = KeyType(), const ValueType &v = ValueType())
      : key(k), value(v), accesses(0) { }
  };
63 
64  } // namespace dc_impl
65 
66  /**
67  * \ingroup rpc
68  This implements a cache coherent distributed hash table.
69 
70  Each machine has a part of the hash table as well as a cache. The system
71  implements automatic cache invalidation as well as automatic cache subscription
72  (currently through a rather poor heuristic).
73  \warning The implementation is extremely experimental. Use at your own risk
74 
75  */
76  template<typename KeyType, typename ValueType>
77  class coherent_dht{
78  public:
79 
  /// \cond GRAPHLAB_INTERNAL

  typedef dc_impl::coherent_lru_list<KeyType, ValueType> lru_entry_type;
  /** datatype of the data map. maps from key to the value */
  typedef boost::unordered_map<KeyType, ValueType> map_type;
  /// datatype of the local cache map: key -> heap-allocated LRU entry
  typedef boost::unordered_map<KeyType, lru_entry_type* > cache_type;

  // binds the intrusive list below to coherent_lru_list::member_hook_
  typedef boost::intrusive::member_hook<lru_entry_type,
                                        typename lru_entry_type::lru_member_hook_type,
                                        &lru_entry_type::member_hook_> MemberOption;
  /// datatype of the intrusive LRU list embedded in the cache map.
  /// constant_time_size<false> is required by auto_unlink hooks.
  typedef boost::intrusive::list<lru_entry_type,
                                 MemberOption,
                                 boost::intrusive::constant_time_size<false> > lru_list_type;
  /// \endcond
97 
  private:

  // NOTE(review): an RPC object member named `rpc` (used throughout as
  // rpc.dc(), rpc.remote_call(...), rpc.remote_request(...)) appears to have
  // been declared here but was lost in extraction — confirm against the
  // upstream GraphLab source.

  map_type data;   ///< the authoritative slice of the table stored on this machine
  mutex datalock;  ///< guards structural access (find / insert) on `data`

  // Keys are folded into COHERENT_DHT_COMPRESSED_HASH buckets; these arrays
  // are therefore per-bucket, not per-key.
  mutex finegrained_lock[COHERENT_DHT_COMPRESSED_HASH];   ///< per-bucket value-update locks
  dense_bitset subscription[COHERENT_DHT_COMPRESSED_HASH]; ///< bit i set => machine i subscribes to this bucket

  mutex cachelock;               ///< lock for the cache datastructures
  mutable cache_type cache;      ///< the cache table
  mutable lru_list_type lruage;  ///< the LRU linked list associated with the cache

  size_t maxcache;  ///< maximum cache size allowed

  mutable size_t reqs;    ///< number of cached get() requests issued
  mutable size_t misses;  ///< number of those requests that missed the cache

  // used both to pick the owning machine (hash % numprocs) and the
  // compressed bucket (hash % COHERENT_DHT_COMPRESSED_HASH)
  boost::hash<KeyType> hasher;
126 
  /** Sets the key to the value
   * if the key belongs to a remote machine.
   * It is guaranteed that if the current machine sets a key to a new value
   * subsequent reads will never return the previous value. (i.e. it will
   * return the new value or later values set by other processors).
   * \param source the machine that originated the write; it is excluded
   *        from the coherency broadcast since it already has the value.
   */
  void set_impl(const KeyType& key, const ValueType &newval,
                procid_t source) {
    size_t hashvalue = hasher(key);
    size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
    size_t owningmachine = hashvalue % rpc.dc().numprocs();
    if (owningmachine == rpc.dc().procid()) {
      // use the full lock to get the iterator
      datalock.lock();
      typename map_type::iterator iter = data.find(key);
      // if the key does not exist, create the key.
      if (iter == data.end()) {
        finegrained_lock[compressedhash].lock();
        data[key] = newval;
        finegrained_lock[compressedhash].unlock();
        datalock.unlock();
      }
      else {
        // the key exists! switch to the fine grained lock and set the
        // value
        datalock.unlock();
        finegrained_lock[compressedhash].lock();
        iter->second = newval;
        finegrained_lock[compressedhash].unlock();
      }
      // true => asynchronous (fire-and-forget) coherency broadcast
      push_changes(key, true, source);
    }
    else {
      // not the owner: forward the write to the owning machine.
      // NOTE(review): the member-function-pointer argument of this
      // remote_call (presumably &coherent_dht::set_impl) was lost in
      // extraction — confirm against the upstream source.
      rpc.remote_call(owningmachine,
                      key,
                      newval,
                      source);
      // optimistically refresh our own cache with the value we just wrote
      update_cache(key, newval);
    }
  }
168 
169 
  /**
   * Forces synchronization of this key.
   * This operation is synchronous. When this function returns
   * all machines are guaranteed to have the updated value.
   * \param source the machine that originated the write; excluded from
   *        the coherency broadcast since it already holds the value.
   */
  void set_synchronous_impl(const KeyType& key, const ValueType &newval,
                            procid_t source) {
    size_t hashvalue = hasher(key);
    size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
    size_t owningmachine = hashvalue % rpc.dc().numprocs();
    if (owningmachine == rpc.dc().procid()) {
      // use the full lock to get the iterator
      datalock.lock();
      typename map_type::iterator iter = data.find(key);
      // if the key does not exist, create the key.
      if (iter == data.end()) {
        finegrained_lock[compressedhash].lock();
        data[key] = newval;
        finegrained_lock[compressedhash].unlock();
        datalock.unlock();
      }
      else {
        // the key exists! switch to the fine grained lock and set the
        // value
        datalock.unlock();
        finegrained_lock[compressedhash].lock();
        iter->second = newval;
        finegrained_lock[compressedhash].unlock();
      }
      // false => synchronous push: blocks until all machines acknowledge
      push_changes(key, false, source);
    }
    else {
      // not the owner: forward the write and block until it completes.
      // NOTE(review): the member-function-pointer argument of this
      // remote_request (presumably &coherent_dht::set_synchronous_impl)
      // was lost in extraction — confirm against the upstream source.
      rpc.remote_request(owningmachine,
                         key,
                         newval,
                         source);
      update_cache(key, newval);
    }
  }
210 
211 
212  public:
  /**
   * \brief Creates a coherent distributed hash table
   *
   * \param dc distributed control to use for communication
   * \param max_cache_size Size of cache on local machine
   */
  // NOTE(review): the opening of the constructor signature (a line of the
  // form `coherent_dht(distributed_control &dc,`) was lost in extraction —
  // confirm against the upstream source. The fragment below is kept as-is.
  size_t max_cache_size = 1024):rpc(dc, this),data(11) {

    // pre-size the cache's bucket table to the cache capacity
    cache.rehash(max_cache_size);
    maxcache = max_cache_size;
    logger(LOG_INFO, "%d Creating distributed_hash_table. Cache Limit = %d",
           dc.procid(), maxcache);
    reqs = 0;
    misses = 0;

    // one subscription bit per machine, per compressed-hash bucket
    for (size_t i = 0;i < COHERENT_DHT_COMPRESSED_HASH; ++i) {
      subscription[i].resize(dc.numprocs());
      subscription[i].clear();
    }
    // synchronize: all machines complete construction before returning
    dc.barrier();
  }
235 
236 
237  ~coherent_dht() {
238  data.clear();
239  typename cache_type::iterator i = cache.begin();
240  while (i != cache.end()) {
241  delete i->second;
242  ++i;
243  }
244  cache.clear();
245  }
246 
247  /**
248  * \brief Sets the value of a key in the background.
249  *
250  * This function sets the value of a key, but uses background communication
251  * to change the key value. When this function returns, it is not guaranteed
252  * that all machines have the updated value.
253  */
254  void set(const KeyType& key, const ValueType &newval) {
255  set_impl(key, newval, rpc.procid());
256  }
257 
258  /**
259  * \brief Sets the value of a key.
260  */
261  void set_synchronous(const KeyType& key, const ValueType &newval) {
262  set_synchronous_impl(key, newval, rpc.procid());
263  }
264 
265  /** Gets the value associated with the key. returns true on success.
266  * get will read from the cache if data is already available in the cache.
267  * If not, get will obtain the data from across the network
268  */
269  std::pair<bool, ValueType> get(const KeyType &key) const {
270  // if this is to my current machine, just get it and don't go to cache
271  procid_t owningmachine = owning_machine(key);
272  if (owningmachine == rpc.dc().procid()) return get_non_cached(key);
273 
274  reqs++;
275  cachelock.lock();
276  // check if it is in the cache
277  typename cache_type::iterator i = cache.find(key);
278  if (i == cache.end()) {
279  // nope. not in cache. Call the regular get
280  cachelock.unlock();
281  misses++;
282  return get_non_cached(key);
283  }
284  else {
285  // yup. in cache. return the value
286  std::pair<bool, ValueType> ret;
287  ret.first = true;
288  ret.second = i->second->value;
289  i->second->accesses++;
290  // shift the cache entry to the head of the LRU list
291  lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
292  lruage.push_front(*(i->second));
293  cachelock.unlock();
294  return ret;
295  }
296  }
297 
298  /**
299  Returns the machine responsible for storing the key
300  */
301  procid_t owning_machine(const KeyType &key) const {
302  size_t hashvalue = hasher(key);
303  size_t owningmachine = hashvalue % rpc.dc().numprocs();
304  return owningmachine;
305  }
306 
307  /**
308  Returns true of the key is currently in the cache
309  */
310  bool in_cache(const KeyType &key) const {
311  // if this is to my current machine, just get it and don't go to cache
312  procid_t owningmachine = owning_machine(key);
313  if (owningmachine == rpc.dc().procid()) return true;
314  cachelock.lock();
315  // check if it is in the cache
316  typename cache_type::iterator i = cache.find(key);
317  if (i != cache.end()) {
318  cachelock.unlock();
319  return true;
320  }
321  cachelock.unlock();
322  return false;
323  }
324 
  /**
   * Puts out a prefetch request for this key.
   * Returns true if the key is already cached (no request is sent);
   * returns false after issuing the background fetch.
   */
  bool asynchronous_get(const KeyType &key) const {
    if (in_cache(key)) return true;

    procid_t owningmachine = owning_machine(key);
    // NOTE(review): the member-function-pointer argument of this
    // remote_call (presumably &coherent_dht::asychronous_get_handler,
    // whose (key, source) signature matches these arguments) was lost in
    // extraction — confirm against the upstream source.
    rpc.remote_call(owningmachine,
                    key,
                    rpc.procid());
    return false;
  }
338 
339  /// Returns the number of misses divided by the number of requests
340  double cache_miss_rate() {
341  return double(misses) / double(reqs);
342  }
  /// Returns the number of cached get() requests issued so far
  size_t num_gets() const {
    return reqs;
  }
  /// Returns the number of those requests that missed the cache
  size_t num_misses() const {
    return misses;
  }
  /// Returns the current number of entries in the cache
  size_t cache_size() const {
    return cache.size();
  }
355 
  /**
   * Subscribes to this key. This key will be a permanent entry
   * in the cache and can not be invalidated. Key modifications
   * are automatically sent to this machine.
   * \param async if true the subscription request is fire-and-forget;
   *        otherwise this call blocks until the owner acknowledges.
   */
  void subscribe(const KeyType &key, bool async = false) const{
    procid_t owningmachine = owning_machine(key);
    // do not subscribe if this is my machine
    if (owningmachine == rpc.dc().procid()) return;
    if (async) {
      // NOTE(review): the member-function-pointer argument (presumably
      // &coherent_dht::register_subscription) was lost in extraction —
      // confirm against the upstream source.
      rpc.remote_call(owningmachine,
                      key,
                      rpc.dc().procid());
    }
    else {
      // NOTE(review): likewise, presumably
      // &coherent_dht::register_subscription_synchronous — confirm.
      rpc.remote_request(owningmachine,
                         key,
                         rpc.dc().procid());
    }
  }
378 
379  /// Invalidates the cache entry associated with this key
380  void invalidate(const KeyType &key) const{
381  bool haschanges = false;
382  bool isincache = false;
383  cachelock.lock();
384  // is the key I am invalidating in the cache?
385  typename cache_type::iterator i = cache.find(key);
386  if (i != cache.end()) {
387  // drop it from the lru list
388  // if it is frequently accessed, don't invalidate it but subscribe.
389  if (i->second->accesses >= COHERENT_DHT_SUBSCRIBE_IF_ACCESSES_PER_INVALIDATE) {
390  subscribe(key, false);
391 
392  isincache = true;
393  haschanges = true;
394  }
395  else {
396  delete i->second;
397  haschanges = true;
398  cache.erase(i);
399  }
400  }
401  cachelock.unlock();
402  }
403 
404 
405 
406  private:
407 
  /**
   * Pushes the current value of the key to all machines.
   * If async=false, when this call returns all machines are guaranteed to
   * have the most up to date value of the key; if async=true the push is
   * fire-and-forget. (The original comment stated this guarantee for
   * async=true, which contradicts the synchronous path taken below when
   * async is false.)
   * \param ignoreproc machine excluded from the broadcast (the writer)
   */
  void push_changes(const KeyType& key, bool async, procid_t ignoreproc) {
    size_t hashvalue = hasher(key);
    size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
    size_t owningmachine = hashvalue % rpc.dc().numprocs();

    if (owningmachine == rpc.procid()) {
      // switch to finegrained lock
      finegrained_lock[compressedhash].lock();
      typename map_type::iterator iter = data.find(key);
      finegrained_lock[compressedhash].unlock();
      // NOTE(review): `iter` is dereferenced below after the lock is
      // released; this is safe only if no concurrent insert can rehash
      // `data` in the meantime — confirm.
      assert(iter != data.end());
      if (async) {
        update_cache_coherency_set(compressedhash, key, iter->second, ignoreproc);
      }
      else {
        update_cache_coherency_set_synchronous(compressedhash, key, iter->second, ignoreproc);
      }
    }
    else {
      // key is not on this machine. Get the owning machine to do it
      if (async) {
        rpc.remote_call(owningmachine,
                        &coherent_dht<KeyType, ValueType>::push_changes,
                        key,
                        async,
                        ignoreproc);
      }
      else {
        rpc.remote_request(owningmachine,
                           &coherent_dht<KeyType, ValueType>::push_changes,
                           key,
                           async,
                           ignoreproc);
      }
    }
  }
449 
450 
451  void update_cache_from_remote(const KeyType &key, const ValueType &val) const {
452  return update_cache(key, val);
453  }
454  /** Updates the internal cache with this new value. The cache
455  * entry is also moved to the head of the LRU list
456  */
457  void update_cache(const KeyType &key, const ValueType &val) const{
458 
459  cachelock.lock();
460  typename cache_type::iterator i = cache.find(key);
461  // create a new entry
462  if (i == cache.end()) {
463  cachelock.unlock();
464  // if we are out of room, remove the lru entry
465  if (cache.size() >= maxcache) remove_lru();
466  cachelock.lock();
467  // insert the element, remember the iterator so we can push it
468  // straight to the LRU list
469  std::pair<typename cache_type::iterator, bool> ret = cache.insert(std::make_pair(key, new lru_entry_type(key, val)));
470  if (ret.second) lruage.push_front(*(ret.first->second));
471  }
472  else {
473  // modify entry in place
474  i->second->value = val;
475  // swap to front of list
476  //boost::swap_nodes(lru_list_type::s_iterator_to(i->second), lruage.begin());
477  lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
478  lruage.push_front(*(i->second));
479  }
480  cachelock.unlock();
481  }
482 
483 
484  /// Removes the least recently used element from the cache
485  void remove_lru() const{
486  cachelock.lock();
487  KeyType keytoerase = lruage.back().key;
488  // is the key I am invalidating in the cache?
489  typename cache_type::iterator i = cache.find(keytoerase);
490  if (i != cache.end()) {
491  // drop it from the lru list
492  delete i->second;
493  cache.erase(i);
494  }
495  cachelock.unlock();
496  }
497 
498 
499 
500 
501 
502 
503  /**
504  * Gets the true value of this key
505  */
506  std::pair<bool, ValueType> get_non_cached(const KeyType &key) const {
507  // figure out who owns the key
508  procid_t owningmachine = owning_machine(key);
509  std::pair<bool, ValueType> ret;
510  // if I own the key, get it from the map table
511  if (owningmachine == rpc.dc().procid()) {
512  datalock.lock();
513  typename map_type::const_iterator iter = data.find(key);
514  datalock.unlock();
515  if (iter == data.end()) {
516  ret.first = false;
517  }
518  else {
519  ret.first = true;
520  ret.second = iter->second;
521  }
522  }
523  else {
524  ret = rpc.remote_request(owningmachine,
525  &coherent_dht<KeyType,ValueType>::get_non_cached,
526  key);
527  if (ret.first) update_cache(key, ret.second);
528  else invalidate(key);
529  }
530  return ret;
531  }
532 
533 
534  /**
535  * Called when the value changes. This updates the cache of all machines.
536  * All machines subscribed to this key get updated, while all
537  * machines not subscribed to this key get invalidated
538  */
539  void update_cache_coherency_set(size_t compressedhash,
540  const KeyType &key,
541  const ValueType &value,
542  procid_t except = procid_t(-1)) {
543  // broadcast invalidate
544  for (procid_t i = 0;i < rpc.dc().numprocs(); ++i) {
545  if (i != rpc.dc().procid() && i != except) {
546  if (subscription[compressedhash].get(i)) {
547  rpc.remote_call(i,
548  &coherent_dht<KeyType,ValueType>::update_cache_from_remote,
549  key, value);
550  }
551  else {
552  rpc.remote_call(i, &coherent_dht<KeyType,ValueType>::invalidate, key);
553  }
554  }
555  }
556  }
557 
558  void invalidate_reply(const KeyType &key,
559  procid_t source, size_t reply) const {
560  // logstream(LOG_INFO) << "Invalidate of " << key << std::endl;
561  invalidate(key);
562  if (source != procid_t(-1)) {
563  rpc.dc().remote_call(source, reply_increment_counter,
564  reply, dc_impl::blob());
565  }
566  }
567 
568  void update_cache_reply(const KeyType &key, const ValueType &value,
569  procid_t source, size_t reply) const {
570  // logstream(LOG_INFO) << "Update of " << key << std::endl;
571  update_cache(key, value);
572  if (source != procid_t(-1)) {
573  rpc.dc().remote_call(source, reply_increment_counter,
574  reply, dc_impl::blob());
575  }
576  }
577 
  /**
   * Synchronous counterpart of update_cache_coherency_set(): broadcasts
   * the new value to subscribers of this bucket and an invalidation to
   * everyone else, then blocks until all targeted machines acknowledge.
   */
  void update_cache_coherency_set_synchronous(size_t compressedhash,
                                              const KeyType &key,
                                              const ValueType &value,
                                              procid_t except = procid_t(-1)) {
    // expect one acknowledgement from every other machine...
    dc_impl::reply_ret_type repret(true, rpc.numprocs() - 1);
    // ...minus one when a valid `except` machine (other than self) is skipped
    if (except < rpc.numprocs() && except != rpc.procid()) repret.flag.dec();

    // the address of the stack-allocated reply object serves as the reply
    // handle that remote *_reply handlers pass back to us
    size_t r = reinterpret_cast<size_t>(&repret);
    for (procid_t i = 0;i < rpc.numprocs(); ++i) {
      if (i != rpc.procid() && i != except) {
        if (subscription[compressedhash].get(i)) {
          // subscriber: send the new value and request an ack
          rpc.remote_call(i,
                          &coherent_dht<KeyType,ValueType>::update_cache_reply,
                          key, value,
                          rpc.procid(), r);
        }
        else {
          // non-subscriber: invalidate its cache and request an ack
          rpc.remote_call(i, &coherent_dht<KeyType,ValueType>::invalidate_reply,
                          key, rpc.procid(), r);
        }
      }
    }
    // block until the expected number of acknowledgements arrive
    repret.wait();
  }
603 
604  void asychronous_get_handler(const KeyType &key, procid_t source) {
605  std::pair<bool, ValueType> ret = get_non_cached(key);
606  if (ret.first) {
607  rpc.remote_call(source,
608  &coherent_dht<KeyType, ValueType>::update_cache_reply,
609  key, ret.second,
610  procid_t(-1),
611  0);
612  }
613  }
614 
615  /**
616  * Subscribes the source machine to this key.
617  * Naturally this key must exist.
618  * We send an update to the source machine upon subscription
619  */
620  void register_subscription(const KeyType &key, procid_t source) {
621  size_t hashvalue = hasher(key);
622  size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
623  subscription[compressedhash].set_bit(source);
624 
625  std::pair<bool, ValueType> val = get_non_cached(key);
626  ASSERT_TRUE(val.first);
627  rpc.remote_call(source,
628  &coherent_dht<KeyType,ValueType>::update_cache_from_remote,
629  key, val.second);
630  }
631 
632  /**
633  * Subscribes the source machine to this key.
634  * Naturally this key must exist.
635  * We send an update to the source machine upon subscription
636  */
637  void register_subscription_synchronous(const KeyType &key, procid_t source) {
638  size_t hashvalue = hasher(key);
639  size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
640  subscription[compressedhash].set_bit(source);
641 
642  std::pair<bool, ValueType> val = get_non_cached(key);
643  ASSERT_TRUE(val.first);
644  rpc.remote_request(source,
645  &coherent_dht<KeyType,ValueType>::update_cache_from_remote,
646  key, val.second);
647  }
648  };
649 
650 }
651 #endif
652