31 #ifndef GRAPHLAB_LAZY_DHT_HPP
32 #define GRAPHLAB_LAZY_DHT_HPP
34 #include <boost/unordered_map.hpp>
35 #include <boost/intrusive/list.hpp>
37 #include <graphlab/rpc/dc.hpp>
38 #include <graphlab/parallel/pthread_tools.hpp>
39 #include <graphlab/util/synchronized_unordered_map.hpp>
40 #include <graphlab/util/dense_bitset.hpp>
61 template<
typename KeyType,
typename ValueType>
65 typedef dc_impl::lru_list<KeyType, ValueType> lru_entry_type;
67 typedef boost::unordered_map<KeyType, ValueType> map_type;
69 typedef boost::unordered_map<KeyType, lru_entry_type* > cache_type;
79 typedef boost::intrusive::member_hook<lru_entry_type,
80 typename lru_entry_type::lru_member_hook_type,
81 &lru_entry_type::member_hook_> MemberOption;
83 typedef boost::intrusive::list<lru_entry_type,
85 boost::intrusive::constant_time_size<false> > lru_list_type;
88 lazy_dht(distributed_control &dc,
89 size_t max_cache_size = 65536):rmi(dc, this),data(11) {
90 cache.rehash(max_cache_size);
91 maxcache = max_cache_size;
92 logger(
LOG_INFO,
"%d Creating distributed_hash_table. Cache Limit = %d",
93 dc.procid(), maxcache);
102 typename cache_type::iterator i = cache.begin();
103 while (i != cache.end()) {
112 void set(
const KeyType& key,
const ValueType &newval) {
119 std::pair<bool, ValueType> get_owned(
const KeyType &key)
const {
120 std::pair<bool, ValueType> ret;
122 typename map_type::const_iterator iter = data.find(key);
123 if (iter == data.end()) {
128 ret.second = iter->second;
134 void remote_get_owned(
const KeyType &key,
procid_t source,
size_t ptr)
const {
135 std::pair<bool, ValueType> ret;
137 typename map_type::const_iterator iter = data.find(key);
138 if (iter == data.end()) {
143 ret.second = iter->second;
146 rmi.remote_call(source, &lazy_dht<KeyType,ValueType>::get_reply, ptr, ret.second, ret.first);
149 void get_reply(
size_t ptr, ValueType& val,
bool hasvalue) {
150 wait_struct* w =
reinterpret_cast<wait_struct*
>(ptr);
157 if (w->numreplies == 0) w->cond.signal();
163 std::pair<bool, ValueType>
get(
const KeyType &key)
const {
164 std::pair<bool, ValueType> ret = get_owned(key);
165 if (ret.first)
return ret;
168 w.numreplies = rmi.numprocs() - 1;
169 size_t ptr =
reinterpret_cast<size_t>(&w);
171 for (
size_t i = 0;i < rmi.numprocs(); ++i) {
172 if (i != rmi.procid()) {
173 rmi.remote_call(i, &lazy_dht<KeyType,ValueType>::remote_get_owned, key, rmi.procid(), ptr);
177 while (w.numreplies > 0) w.cond.wait(w.mut);
179 ret.first = w.hasvalue;
181 if (ret.first) update_cache(key, ret.second);
188 std::pair<bool, ValueType> get_cached(
const KeyType &key)
const {
189 std::pair<bool, ValueType> ret = get_owned(key);
190 if (ret.first)
return ret;
195 typename cache_type::iterator i = cache.find(key);
196 if (i == cache.end()) {
205 ret.second = i->second->value;
207 lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
208 lruage.push_front(*(i->second));
215 void invalidate(
const KeyType &key)
const{
218 typename cache_type::iterator i = cache.find(key);
219 if (i != cache.end()) {
228 double cache_miss_rate() {
229 return double(misses) / double(reqs);
232 size_t num_gets()
const {
235 size_t num_misses()
const {
239 size_t cache_size()
const {
245 mutable dc_dist_object<lazy_dht<KeyType, ValueType> > rmi;
252 mutable cache_type cache;
253 mutable lru_list_type lruage;
260 mutable size_t misses;
267 void update_cache(
const KeyType &key,
const ValueType &val)
const{
269 typename cache_type::iterator i = cache.find(key);
271 if (i == cache.end()) {
274 if (cache.size() >= maxcache) remove_lru();
278 std::pair<typename cache_type::iterator, bool> ret = cache.insert(std::make_pair(key,
new lru_entry_type(key, val)));
279 if (ret.second) lruage.push_front(*(ret.first->second));
283 i->second->value = val;
286 lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
287 lruage.push_front(*(i->second));
293 void remove_lru()
const{
295 KeyType keytoerase = lruage.back().key;
297 typename cache_type::iterator i = cache.find(keytoerase);
298 if (i != cache.end()) {