24 #ifndef GRAPHLAB_COHERENT_DHT_HPP
25 #define GRAPHLAB_COHERENT_DHT_HPP
27 #include <boost/unordered_map.hpp>
28 #include <boost/intrusive/list.hpp>
29 #include <boost/function.hpp>
31 #include <graphlab/rpc/dc.hpp>
32 #include <graphlab/parallel/pthread_tools.hpp>
33 #include <graphlab/util/dense_bitset.hpp>
34 #include <graphlab/util/timer.hpp>
// Number of "compressed hash" buckets. A key's bucket is
// hasher(key) % COHERENT_DHT_COMPRESSED_HASH; the bucket index selects both the
// mutex in finegrained_lock[] and the per-bucket subscriber bitset in
// subscription[] (both arrays are sized by this constant).
39 #define COHERENT_DHT_COMPRESSED_HASH 32768
// Access-count threshold used by subscribe(): a cached entry whose `accesses`
// counter is >= this value passes the subscription check. What happens after the
// check, and whether `accesses` is reset on invalidation (as the name suggests),
// is on lines not visible here.
// NOTE(review): confirm the reset-on-invalidate semantics.
40 #define COHERENT_DHT_SUBSCRIBE_IF_ACCESSES_PER_INVALIDATE 10
47 template<
typename KeyType,
typename ValueType>
48 class coherent_lru_list{
54 typedef boost::intrusive::list_member_hook<
55 boost::intrusive::link_mode<boost::intrusive::auto_unlink> >
58 lru_member_hook_type member_hook_;
59 ~coherent_lru_list() { }
60 explicit coherent_lru_list(
const KeyType& k = KeyType(),
const ValueType &v = ValueType())
61 : key(k), value(v),accesses(0) { }
76 template<
typename KeyType,
typename ValueType>
// Node type of the local cache. Each cached key owns one heap-allocated entry
// (see update_cache) that is also linked into the `lruage` list.
82 typedef dc_impl::coherent_lru_list<KeyType, ValueType> lru_entry_type;
// Map type used for the authoritative store `data` (set_impl/get_non_cached
// look keys up in it when this machine owns the key).
84 typedef boost::unordered_map<KeyType, ValueType> map_type;
// Local cache: key -> raw pointer to a heap entry created with `new` in
// update_cache.
// NOTE(review): deletion of these entries is on lines not visible here
// (presumably remove_lru and the destructor); confirm no path leaks them.
86 typedef boost::unordered_map<KeyType, lru_entry_type* > cache_type;
// Binds the intrusive list to lru_entry_type::member_hook_.
89 typedef boost::intrusive::member_hook<lru_entry_type,
90 typename lru_entry_type::lru_member_hook_type,
91 &lru_entry_type::member_hook_> MemberOption;
// LRU ordering of cached entries. The front is the most recently used entry
// (get/update_cache push_front on access); the back is the eviction candidate
// (remove_lru reads back()). constant_time_size<false> is required because
// the hook uses auto_unlink mode.
93 typedef boost::intrusive::list<lru_entry_type,
95 boost::intrusive::constant_time_size<false> > lru_list_type;
// One mutex per compressed-hash bucket, indexed by hasher(key) % COHERENT_DHT_COMPRESSED_HASH.
107 mutex finegrained_lock[COHERENT_DHT_COMPRESSED_HASH];
// Per-bucket set of subscribed processes (set_bit(source) on registration,
// get(i) when pushing changes). Granularity is the bucket, not the key: a
// process subscribed to one key also receives pushes for every other key
// that hashes to the same bucket.
108 dense_bitset subscription[COHERENT_DHT_COMPRESSED_HASH];
// Cache state is `mutable` because const members (get, update_cache,
// remove_lru, ...) modify it.
113 mutable cache_type cache;
114 mutable lru_list_type lruage;
// Miss counter, used in the misses / reqs ratio.
120 mutable size_t misses;
// Hash used both for the bucket index and for the owning machine (hash % numprocs).
124 boost::hash<KeyType> hasher;
// Asynchronous write of `newval` under `key`, originating from `source`
// (the third parameter is declared on an elided line; it is forwarded to
// push_changes as `ignoreproc`).
// The owning machine is hasher(key) % numprocs. On the owner, the local `data`
// store is updated and push_changes(key, /*async=*/true, source) propagates the
// new value to other processes, skipping `source`.
133 void set_impl(
const KeyType& key,
const ValueType &newval,
// Bucket index (for the per-bucket lock) and owning machine both derive
// from the same hash.
135 size_t hashvalue = hasher(key);
136 size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
137 size_t owningmachine = hashvalue % rpc.
dc().
numprocs();
138 if (owningmachine == rpc.
dc().
procid()) {
// NOTE(review): this lookup runs without holding finegrained_lock, while the
// locked region below (elided line) presumably inserts into `data`. Concurrent
// writers can race on the map. Also, different buckets use different mutexes
// but share a single `data` map, so even the locked inserts are not mutually
// excluded. Confirm the threading model of the RPC handlers.
141 typename map_type::iterator iter = data.find(key);
143 if (iter == data.end()) {
// Key not yet stored: the body under this lock is elided.
// NOTE(review): manual lock()/unlock() is not exception-safe; a scoped
// guard would release the mutex if the guarded code throws.
144 finegrained_lock[compressedhash].
lock();
146 finegrained_lock[compressedhash].
unlock();
// Existing key: overwrite the stored value under the bucket lock.
153 finegrained_lock[compressedhash].
lock();
154 iter->second = newval;
155 finegrained_lock[compressedhash].
unlock();
// Propagate to subscribers/invalidate others; `true` is push_changes' `async` flag.
157 push_changes(key,
true, source);
// Refresh the local cache. The condition guarding this call is on elided lines.
165 update_cache(key, newval);
// Synchronous counterpart of set_impl: the same owner/local-store logic, but
// push_changes is called with async == false. That selects
// update_cache_coherency_set_synchronous, which collects a reply from each
// contacted process. The third parameter (`source`, declared on an elided line)
// is forwarded as `ignoreproc`.
175 void set_synchronous_impl(
const KeyType& key,
const ValueType &newval,
177 size_t hashvalue = hasher(key);
178 size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
179 size_t owningmachine = hashvalue % rpc.
dc().
numprocs();
180 if (owningmachine == rpc.
dc().
procid()) {
// NOTE(review): unlocked find on `data`, with the same race concerns as in
// set_impl (concurrent inserts under other bucket mutexes share this map).
183 typename map_type::iterator iter = data.find(key);
185 if (iter == data.end()) {
// New key: the body under this lock is elided.
186 finegrained_lock[compressedhash].
lock();
188 finegrained_lock[compressedhash].
unlock();
// Existing key: overwrite the stored value under the bucket lock.
195 finegrained_lock[compressedhash].
lock();
196 iter->second = newval;
197 finegrained_lock[compressedhash].
unlock();
// `false` is push_changes' `async` flag: wait for acknowledgements.
199 push_changes(key,
false, source);
// Refresh the local cache. The condition guarding this call is on elided lines.
207 update_cache(key, newval);
220 size_t max_cache_size = 1024):rpc(dc, this),data(11) {
222 cache.rehash(max_cache_size);
223 maxcache = max_cache_size;
224 logger(
LOG_INFO,
"%d Creating distributed_hash_table. Cache Limit = %d",
229 for (
size_t i = 0;i < COHERENT_DHT_COMPRESSED_HASH; ++i) {
231 subscription[i].
clear();
239 typename cache_type::iterator i = cache.begin();
240 while (i != cache.end()) {
// Public asynchronous setter: forwards to set_impl, passing this process's id
// as the originating `source`.
254 void set(
const KeyType& key,
const ValueType &newval) {
255 set_impl(key, newval, rpc.
procid());
262 set_synchronous_impl(key, newval, rpc.
procid());
// Looks up `key`, returning (found, value).
// On the owning machine the local store is read directly via get_non_cached.
// Elsewhere the local cache is consulted first. A miss falls back to
// get_non_cached, which fetches from the owner and caches a found value.
// A hit copies the cached value, bumps the entry's access counter and moves
// the entry to the front (most-recently-used end) of lruage.
// Although declared const, this mutates the `mutable` cache, lruage and entries.
// NOTE(review): any locking around the cache accesses is on elided lines; confirm.
269 std::pair<bool, ValueType>
get(
const KeyType &key)
const {
272 if (owningmachine == rpc.
dc().
procid())
return get_non_cached(key);
277 typename cache_type::iterator i = cache.find(key);
278 if (i == cache.end()) {
282 return get_non_cached(key);
// Cache hit. ret.first is set on an elided line.
286 std::pair<bool, ValueType> ret;
288 ret.second = i->second->value;
// Access count feeds the COHERENT_DHT_SUBSCRIBE_IF_ACCESSES_PER_INVALIDATE check in subscribe().
289 i->second->accesses++;
// Move the entry to the MRU end of the LRU list.
291 lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
292 lruage.push_front(*(i->second));
302 size_t hashvalue = hasher(key);
303 size_t owningmachine = hashvalue % rpc.
dc().
numprocs();
304 return owningmachine;
313 if (owningmachine == rpc.
dc().
procid())
return true;
316 typename cache_type::iterator i = cache.find(key);
317 if (i != cache.end()) {
341 return double(misses) / double(reqs);
// Requests change notifications for `key` from its owner. This is a no-op on the
// owning machine. For a key present in the local cache, the entry's `accesses`
// counter is compared against COHERENT_DHT_SUBSCRIBE_IF_ACCESSES_PER_INVALIDATE.
// The actual registration (and how `async` selects between variants) is on
// elided lines.
// NOTE(review): presumably it calls register_subscription /
// register_subscription_synchronous on the owner; confirm.
361 void subscribe(
const KeyType &key,
bool async =
false)
const{
364 if (owningmachine == rpc.
dc().
procid())
return;
// Local flags; how they are set and used is on elided lines.
381 bool haschanges =
false;
382 bool isincache =
false;
385 typename cache_type::iterator i = cache.find(key);
386 if (i != cache.end()) {
389 if (i->second->accesses >= COHERENT_DHT_SUBSCRIBE_IF_ACCESSES_PER_INVALIDATE) {
// Propagates the current stored value of `key` to other processes, skipping
// `ignoreproc`. On the owner, the value is read from `data` and handed to
// update_cache_coherency_set (async) or update_cache_coherency_set_synchronous;
// the selection on `async` is on elided lines. On a non-owner, push_changes is
// re-issued as a remote call (the target machine is on elided lines; presumably
// the owner).
413 void push_changes(
const KeyType& key,
bool async,
procid_t ignoreproc) {
414 size_t hashvalue = hasher(key);
415 size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
416 size_t owningmachine = hashvalue % rpc.
dc().
numprocs();
418 if (owningmachine == rpc.
procid()) {
420 finegrained_lock[compressedhash].
lock();
421 typename map_type::iterator iter = data.find(key);
422 finegrained_lock[compressedhash].
unlock();
// NOTE(review): `iter` and iter->second are used after the bucket lock has
// been released. A concurrent insert into `data` that triggers a rehash
// invalidates the iterator, and a concurrent set can modify the value while it
// is being copied out. In addition, assert() is compiled out under NDEBUG,
// after which a missing key dereferences end().
423 assert(iter != data.end());
425 update_cache_coherency_set(compressedhash, key, iter->second, ignoreproc);
428 update_cache_coherency_set_synchronous(compressedhash, key, iter->second, ignoreproc);
// Non-owner paths: forward push_changes over RPC.
435 &coherent_dht<KeyType, ValueType>::push_changes,
442 &coherent_dht<KeyType, ValueType>::push_changes,
// RPC-facing entry point that inserts or refreshes `key` in this process's local
// cache. It is a thin forwarder to update_cache, used as the remote-call target by
// update_cache_coherency_set and register_subscription.
451 void update_cache_from_remote(
const KeyType &key,
const ValueType &val)
const {
452 return update_cache(key, val);
// Inserts or refreshes `key` -> `val` in the local LRU cache. New keys get a
// heap-allocated entry linked at the front of lruage; when the cache already
// holds `maxcache` entries, the least-recently-used one is evicted first.
// Existing keys have their value overwritten and move to the front.
457 void update_cache(
const KeyType &key,
const ValueType &val)
const{
460 typename cache_type::iterator i = cache.find(key);
462 if (i == cache.end()) {
// NOTE(review): if maxcache is 0 (max_cache_size is a constructor argument),
// this calls remove_lru() on an empty cache, and remove_lru reads
// lruage.back() on an empty list (undefined behaviour).
465 if (cache.size() >= maxcache) remove_lru();
// NOTE(review): if insert() reports the key as already present
// (ret.second == false), the entry just allocated with `new` is neither
// stored nor deleted on any visible line, so it leaks. After the find()
// above returned end(), this can only happen if the key was inserted in
// between (e.g. a lock dropped on the elided lines); consider deleting
// the entry on failure or allocating only after a successful insert.
469 std::pair<typename cache_type::iterator, bool> ret = cache.insert(std::make_pair(key,
new lru_entry_type(key, val)));
470 if (ret.second) lruage.push_front(*(ret.first->second));
// Existing entry: overwrite the value and mark it most recently used.
474 i->second->value = val;
477 lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
478 lruage.push_front(*(i->second));
// Evicts the least-recently-used cache entry: the back of lruage, looked up in
// `cache` by its key. The removal and freeing of the entry are on elided lines.
// Precondition: lruage must be non-empty, because back() on an empty
// boost::intrusive::list is undefined behaviour.
485 void remove_lru()
const{
487 KeyType keytoerase = lruage.back().key;
489 typename cache_type::iterator i = cache.find(keytoerase);
490 if (i != cache.end()) {
// Reads `key` bypassing the local cache, returning (found, value).
// On the owning machine the authoritative `data` store is read directly.
// Otherwise get_non_cached is invoked remotely (the target is on elided lines;
// presumably the owner), and a found result is written into the local cache.
// NOTE(review): any locking around the `data` lookup is on elided lines; confirm.
506 std::pair<bool, ValueType> get_non_cached(
const KeyType &key)
const {
509 std::pair<bool, ValueType> ret;
511 if (owningmachine == rpc.
dc().
procid()) {
513 typename map_type::const_iterator iter = data.find(key);
515 if (iter == data.end()) {
520 ret.second = iter->second;
// Remote fetch of the value from another machine.
525 &coherent_dht<KeyType,ValueType>::get_non_cached,
// Cache only values that actually exist.
527 if (ret.first) update_cache(key, ret.second);
// Asynchronously brings every other process (except this one and `except`)
// up to date for `key`. Processes subscribed to the key's bucket receive the new
// value via update_cache_from_remote; all others receive an `invalidate`.
// NOTE(review): subscription is tracked per bucket, and update_cache inserts
// keys that are not already cached. A subscriber therefore also gets values
// pushed into its cache for keys it never requested that share the bucket.
539 void update_cache_coherency_set(
size_t compressedhash,
541 const ValueType &value,
545 if (i != rpc.
dc().
procid() && i != except) {
546 if (subscription[compressedhash].
get(i)) {
548 &coherent_dht<KeyType,ValueType>::update_cache_from_remote,
552 rpc.
remote_call(i, &coherent_dht<KeyType,ValueType>::invalidate, key);
// Acknowledged variant of `invalidate`, used by the synchronous coherency path.
// The invalidation itself is on elided lines. The visible tail sends a reply to
// `source`, carrying back the opaque `reply` handle with an empty dc_impl::blob.
558 void invalidate_reply(
const KeyType &key,
559 procid_t source,
size_t reply)
const {
564 reply, dc_impl::blob());
// Acknowledged cache update: stores `key` -> `value` in the local cache, then
// replies to `source` with the opaque `reply` handle and an empty dc_impl::blob.
// It is used by the synchronous coherency path and asychronous_get_handler.
568 void update_cache_reply(
const KeyType &key,
const ValueType &value,
569 procid_t source,
size_t reply)
const {
571 update_cache(key, value);
574 reply, dc_impl::blob());
// Synchronous counterpart of update_cache_coherency_set. Subscribed processes
// get update_cache_reply and the rest get invalidate_reply, and each of them
// acknowledges through the reply handle `r`.
578 void update_cache_coherency_set_synchronous(
size_t compressedhash,
580 const ValueType &value,
// Expected reply count: every process except this one...
583 dc_impl::reply_ret_type repret(
true, rpc.
numprocs() - 1);
// ...minus `except` when it is a real, different process (it is skipped below).
584 if (except < rpc.
numprocs() && except != rpc.
procid()) repret.flag.dec();
// The address of the stack-local `repret` is passed to the remote side as an
// integer handle.
// NOTE(review): `repret` must stay alive until every reply has arrived;
// the wait is on elided lines. Confirm it cannot return early.
586 size_t r =
reinterpret_cast<size_t>(&repret);
588 if (i != rpc.
procid() && i != except) {
589 if (subscription[compressedhash].
get(i)) {
591 &coherent_dht<KeyType,ValueType>::update_cache_reply,
596 rpc.
remote_call(i, &coherent_dht<KeyType,ValueType>::invalidate_reply,
// RPC handler that reads `key` without the cache and sends the result back via
// update_cache_reply (the destination and arguments are on elided lines,
// presumably `source`). The misspelt name is kept because remote callers may
// reference it.
604 void asychronous_get_handler(
const KeyType &key,
procid_t source) {
605 std::pair<bool, ValueType> ret = get_non_cached(key);
608 &coherent_dht<KeyType, ValueType>::update_cache_reply,
// Marks `source` as subscribed to the bucket containing `key`, then sends it
// the key's current value through update_cache_from_remote (the destination is
// on elided lines, presumably `source`).
// NOTE(review): the subscription bit covers the whole bucket, not just `key`.
// Any synchronization between this set_bit and concurrent
// subscription[...].get() readers is not visible here; confirm.
620 void register_subscription(
const KeyType &key,
procid_t source) {
621 size_t hashvalue = hasher(key);
622 size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
623 subscription[compressedhash].
set_bit(source);
// The key is required to exist: a subscription to a missing key fails the assertion.
625 std::pair<bool, ValueType> val = get_non_cached(key);
626 ASSERT_TRUE(val.first);
628 &coherent_dht<KeyType,ValueType>::update_cache_from_remote,
637 void register_subscription_synchronous(
const KeyType &key,
procid_t source) {
638 size_t hashvalue = hasher(key);
639 size_t compressedhash = hashvalue % COHERENT_DHT_COMPRESSED_HASH;
640 subscription[compressedhash].
set_bit(source);
642 std::pair<bool, ValueType> val = get_non_cached(key);
643 ASSERT_TRUE(val.first);
645 &coherent_dht<KeyType,ValueType>::update_cache_from_remote,