33 #ifndef GRAPHLAB_CACHING_DHT_HPP
34 #define GRAPHLAB_CACHING_DHT_HPP
35 #include <boost/unordered_map.hpp>
36 #include <boost/intrusive/list.hpp>
37 #include <boost/functional/hash.hpp>
39 #include <graphlab/rpc/dc.hpp>
40 #include <graphlab/parallel/pthread_tools.hpp>
41 #include <graphlab/util/synchronized_unordered_map.hpp>
42 #include <graphlab/util/dense_bitset.hpp>
// Cache entry parameterized on the key and value types it stores.
// Each entry carries an intrusive-list hook so it can be linked into a
// Boost.Intrusive list without a separate node allocation.
53 template<
typename KeyType,
typename ValueType>
// Hook type: a Boost.Intrusive list member hook in auto_unlink mode.
// Per the Boost.Intrusive documentation, auto_unlink hooks remove
// themselves from their list when the hook is destroyed.
59 typedef boost::intrusive::list_member_hook<
60 boost::intrusive::link_mode<boost::intrusive::auto_unlink> >
// The hook instance through which this entry is linked into a list.
63 lru_member_hook_type member_hook_;
// Builds an entry holding copies of k and v. Both arguments default to
// value-initialized objects, so the entry is also default-constructible.
// Marked explicit so a bare key is never implicitly converted to an entry.
65 explicit lru_list(
const KeyType& k = KeyType(),
const ValueType &v = ValueType()) : key(k), value(v) {
// caching_dht: type aliases and data members.
// Keys are assigned to an owning machine by hash (see set and get), and a
// local cache of entries is kept in recency order through an intrusive list.
78 template<
typename KeyType,
typename ValueType>
// Entry type held by the cache: key, value and an intrusive-list hook.
82 typedef dc_impl::lru_list<KeyType, ValueType> lru_entry_type;
// Plain key -> value map type; get() uses its const_iterator on data.
84 typedef boost::unordered_map<KeyType, ValueType> map_type;
// Cache index: key -> pointer to an entry allocated with new in update_cache.
86 typedef boost::unordered_map<KeyType, lru_entry_type* > cache_type;
// Tells Boost.Intrusive which member of lru_entry_type is the list hook.
89 typedef boost::intrusive::member_hook<lru_entry_type,
90 typename lru_entry_type::lru_member_hook_type,
91 &lru_entry_type::member_hook_> MemberOption;
// Intrusive list of cache entries. constant_time_size<false> is what
// Boost.Intrusive requires for containers of auto_unlink hooks.
93 typedef boost::intrusive::list<lru_entry_type,
95 boost::intrusive::constant_time_size<false> > lru_list_type;
// RPC endpoint for this object. Declared mutable because const methods
// such as get() issue remote requests through it.
99 mutable dc_dist_object<caching_dht<KeyType, ValueType> > rpc;
// Cache index and its recency list. Both are mutable because const methods
// (get_cached, update_cache, invalidate, remove_lru) modify them.
// Entries are pushed to the front of lruage on every touch, so the back of
// the list is the least recently used entry.
105 mutable cache_type cache;
106 mutable lru_list_type lruage;
// Miss counter; the numerator of cache_miss_rate().
113 mutable size_t misses;
// Hash functor used by set/get/get_cached to choose the owning machine.
115 boost::hash<KeyType> hasher;
// Constructs the table on top of the given distributed_control.
//   dc             - distributed control object the RPC endpoint is bound to.
//   max_cache_size - stored in maxcache, which update_cache compares against
//                    cache.size() before inserting; defaults to 1024.
// data is constructed with the argument 11.
// NOTE(review): for a boost::unordered_map that argument is a bucket-count
// hint, but the declaration of data is not visible here - confirm.
120 caching_dht(distributed_control &dc,
121 size_t max_cache_size = 1024):rpc(dc, this),data(11) {
// Ask the cache index for at least max_cache_size buckets up front.
122 cache.rehash(max_cache_size);
123 maxcache = max_cache_size;
// NOTE(review): both arguments are printed with %d. If maxcache is a size_t
// (its declaration is not visible here) the specifier does not match the
// argument type - confirm and consider the size_t specifier.
124 logger(
LOG_INFO,
"%d Creating distributed_hash_table. Cache Limit = %d",
125 dc.procid(), maxcache);
// Walks the cache index from begin() to end().
// NOTE(review): the enclosing function and the loop body are not visible in
// this excerpt; presumably this is the teardown path that frees the entries
// allocated in update_cache - confirm.
133 typename cache_type::iterator i = cache.begin();
134 while (i != cache.end()) {
// Stores newval under key.
//   key    - key to write.
//   newval - value to associate with the key.
// The owning machine is hash(key) modulo the number of processes. When the
// owner is a different machine the write is forwarded with rpc.remote_call
// to set() on that machine.
143 void set(
const KeyType& key,
const ValueType &newval) {
144 size_t hashvalue = hasher(key);
145 size_t owningmachine = hashvalue % rpc.dc().numprocs();
// Local-owner branch; its body is not visible in this excerpt.
146 if (owningmachine == rpc.dc().procid()) {
// Forward the write to the owner.
151 rpc.remote_call(owningmachine,
152 &caching_dht<KeyType,ValueType>::set,
// Record the new value in the local cache.
// NOTE(review): which branch this line belongs to is not visible here.
155 update_cache(key, newval);
// Looks up key at its owning machine.
//   key - key to read.
// Returns a (flag, value) pair. The flag is tested after a remote lookup to
// decide between caching the value and invalidating the key, so it acts as
// the found indicator; the value is filled from the stored entry on a hit.
161 std::pair<bool, ValueType>
get(
const KeyType &key)
const {
// Owner selection mirrors set(): hash(key) modulo the number of processes.
163 size_t hashvalue = hasher(key);
164 size_t owningmachine = hashvalue % rpc.dc().numprocs();
166 std::pair<bool, ValueType> ret;
// Local owner: answer straight from data.
168 if (owningmachine == rpc.dc().procid()) {
170 typename map_type::const_iterator iter = data.find(key);
// Key absent locally; the body of this branch is not visible in this excerpt.
171 if (iter == data.end()) {
175 ret.second = iter->second;
// Remote owner: issue a blocking request to get() on the owning machine.
179 ret = rpc.remote_request(owningmachine,
180 &caching_dht<KeyType,ValueType>::get,
// Keep the local cache consistent with the answer from the owner: cache the
// value when the key exists, otherwise drop any stale cached copy.
182 if (ret.first) update_cache(key, ret.second);
183 else invalidate(key);
// Looks up key, consulting the local cache when the key is owned remotely.
//   key - key to read.
// Returns a (flag, value) pair in the same form as get().
191 std::pair<bool, ValueType> get_cached(
const KeyType &key)
const {
193 size_t hashvalue = hasher(key);
194 size_t owningmachine = hashvalue % rpc.dc().numprocs();
// Keys owned by this machine bypass the cache and go straight to get().
195 if (owningmachine == rpc.dc().procid())
return get(key);
201 typename cache_type::iterator i = cache.find(key);
// Cache miss.
// NOTE(review): the body of this branch is not visible in this excerpt;
// presumably it counts the miss and falls back to get() - confirm.
202 if (i == cache.end()) {
// Cache hit: copy the cached value into the result.
210 std::pair<bool, ValueType> ret;
212 ret.second = i->second->value;
// Mark the entry as most recently used by moving it to the list front.
214 lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
215 lruage.push_front(*(i->second));
// Invalidates the cached copy of key, if there is one.
//   key - key whose cache entry should be dropped.
// Called by get() when the owning machine reports that the key is absent.
222 void invalidate(
const KeyType &key)
const{
225 typename cache_type::iterator i = cache.find(key);
// Only acts when the key is currently cached.
// NOTE(review): the body is not visible in this excerpt; presumably it frees
// the entry and erases it from the cache index - confirm.
226 if (i != cache.end()) {
// Returns the ratio misses / reqs as a double.
// NOTE(review): when reqs is zero this evaluates 0.0 / 0.0, which is NaN
// under IEEE arithmetic rather than 0; consider guarding that case.
// NOTE(review): unlike num_gets, num_misses and cache_size this method is
// not declared const although it only reads counters - confirm and align.
235 double cache_miss_rate() {
236 return double(misses) / double(reqs);
// Read-only accessor returning a size_t count.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns reqs, the denominator used by cache_miss_rate - confirm.
239 size_t num_gets()
const {
// Read-only accessor returning a size_t count.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns the misses counter - confirm.
242 size_t num_misses()
const {
// Read-only accessor returning a size_t.
// NOTE(review): the body is not visible in this excerpt; presumably it
// returns the number of entries currently held in cache - confirm.
246 size_t cache_size()
const {
// Inserts or refreshes the cached value for key and marks it most recently
// used.
//   key - key to cache.
//   val - value to store for the key.
255 void update_cache(
const KeyType &key,
const ValueType &val)
const{
257 typename cache_type::iterator i = cache.find(key);
// Key not cached yet: make room if needed, then insert a new entry.
259 if (i == cache.end()) {
// Evict one entry once the cache holds maxcache or more items.
262 if (cache.size() >= maxcache) remove_lru();
// The entry is heap-allocated and the cache index stores the raw pointer.
// NOTE(review): the key was just found absent, so ret.second is expected to
// be true; if insert ever reported false the freshly allocated entry would
// be neither linked nor freed.
266 std::pair<typename cache_type::iterator, bool> ret = cache.insert(std::make_pair(key,
new lru_entry_type(key, val)));
267 if (ret.second) lruage.push_front(*(ret.first->second));
// Key already cached: overwrite the value in place.
270 i->second->value = val;
// Move the entry to the list front to mark it most recently used.
273 lruage.erase(lru_list_type::s_iterator_to(*(i->second)));
274 lruage.push_front(*(i->second));
// Evicts the least recently used cache entry.
// Entries are pushed to the front of lruage whenever they are touched, so
// the entry at the back is the oldest one.
280 void remove_lru()
const{
// The key is copied out before the entry is looked up in the index.
// NOTE(review): back() is called with no emptiness check visible in this
// excerpt; update_cache reaches this with an empty list when maxcache is 0 -
// confirm that case cannot occur or add a guard.
282 KeyType keytoerase = lruage.back().key;
284 typename cache_type::iterator i = cache.find(keytoerase);
// NOTE(review): the body is not visible in this excerpt; presumably it frees
// the entry and erases it from the cache index - confirm.
285 if (i != cache.end()) {