34 #ifndef GRAPHLAB_DELTA_DHT_HPP
35 #define GRAPHLAB_DELTA_DHT_HPP
38 #include <boost/unordered_map.hpp>
39 #include <boost/functional/hash.hpp>
42 #include <graphlab/rpc/dc.hpp>
43 #include <graphlab/parallel/pthread_tools.hpp>
44 #include <graphlab/util/cache.hpp>
49 #include <graphlab/macros_def.hpp>
52 namespace delta_dht_impl {
/**
 * Minimal polymorphic base type.
 *
 * It declares nothing except a virtual destructor, so an object of any
 * derived type can be destroyed correctly through an icache pointer.
 */
struct icache {
  virtual ~icache() { }
};
54 icache*& get_icache_ptr(
const void* dht_ptr);
58 namespace delta_predicate {
59 template<
typename ValueType,
typename DeltaType>
62 uses(
size_t max_uses = 100) : max_uses(max_uses) { }
64 bool operator()(
const ValueType& current,
65 const DeltaType& delta,
66 const size_t& uses)
const {
67 return uses < max_uses;
78 template<
typename KeyType,
typename ValueType,
79 typename DeltaType = ValueType>
82 typedef KeyType key_type;
83 typedef ValueType value_type;
84 typedef DeltaType delta_type;
86 typedef size_t size_type;
88 typedef boost::unordered_map<key_type, value_type> data_map_type;
94 cache_entry(
const value_type& value = value_type()) :
95 value(value), uses(0) { }
98 typedef cache::lru<key_type, cache_entry> cache_type;
103 mutable dc_dist_object<delta_dht> rpc;
107 data_map_type data_map;
119 size_t max_cache_size;
124 boost::hash<key_type> hash_function;
127 mutable atomic<size_t> local;
128 mutable atomic<size_t> hits;
129 mutable atomic<size_t> misses;
130 mutable atomic<size_t> background_updates;
134 delta_dht(distributed_control& dc,
135 size_t max_cache_size = 2056) :
137 max_cache_size(max_cache_size), max_uses(10) {
141 ~delta_dht() { rpc.full_barrier(); }
143 void set_max_uses(
size_t max) { max_uses = max; }
145 size_t cache_local()
const {
return local.value; }
146 size_t cache_hits()
const {
return hits.value; }
147 size_t cache_misses()
const {
return misses.value; }
148 size_t background_syncs()
const {
return background_updates.value; }
150 size_t cache_size()
const {
152 const size_t ret_val = cache.size();
157 bool is_cached(
const key_type& key)
const {
159 const bool ret_value = cache.contains(key);
165 value_type operator[](
const key_type& key) {
169 const value_type value = data_map[key];
175 if(cache.contains(key)) {
177 const value_type ret_value = cache[key].value;
183 while(cache.size() + 1 > max_cache_size) {
184 ASSERT_GT(cache.size(), 0);
185 const std::pair<key_type, cache_entry> pair = cache.evict();
186 const key_type& key = pair.first;
187 const cache_entry& entry = pair.second;
188 send_delta(key, entry.delta);
191 const value_type ret_value = (cache[key].value = get_master(key));
199 void apply_delta(
const key_type& key,
const delta_type& delta) {
202 data_map[key] += delta;
207 if(cache.contains(key)) {
208 cache_entry& entry = cache[key];
209 entry.value += delta;
210 entry.delta += delta;
211 if( entry.uses > max_uses ) {
212 const delta_type accum_delta = entry.delta;
213 entry.delta = delta_type();
216 send_delta(key, accum_delta);
229 while(cache.size() > 0) {
230 const std::pair<key_type, cache_entry> pair = cache.evict();
231 const key_type& key = pair.first;
232 const cache_entry& entry = pair.second;
233 send_delta(key, entry.delta);
240 void barrier_flush() {
247 typedef typename cache_type::pair_type pair_type;
249 foreach(pair_type& pair, cache) {
250 key_type& key = pair.first;
251 cache_entry& entry = pair.second;
253 const delta_type accum_delta = entry.delta;
254 entry.delta = delta_type();
256 send_delta(key, accum_delta);
263 void synchronize(
const key_type& key) {
264 if(is_local(key))
return;
266 if(cache.contains(key)) {
267 cache_entry& entry = cache[key];
268 const delta_type accum_delta = entry.delta;
269 entry.delta = delta_type();
272 send_delta(key, accum_delta);
273 }
else cache_lock.unlock();
277 size_t owning_cpu(
const key_type& key)
const {
278 const size_t hash_value = hash_function(key);
279 const size_t cpuid = hash_value % rpc.numprocs();
284 bool is_local(
const key_type& key)
const {
285 return owning_cpu(key) == rpc.procid();
289 delta_type delta(
const key_type& key)
const {
292 if(cache.contains(key)) {
293 const delta_type delta = cache[key].delta;
303 size_t local_size()
const {
305 const size_t result = data_map.size();
311 size_t size()
const {
313 for(
size_t i = 0; i < rpc.numprocs(); ++i) {
314 if(i == rpc.procid()) sum += local_size();
315 else sum += rpc.remote_request(i, &delta_dht::local_size);
320 size_t numprocs()
const {
return rpc.num_procs(); }
321 size_t procid()
const {
return rpc.procid(); }
324 value_type get_master(
const key_type& key) {
328 const value_type ret_value = data_map[key];
332 return rpc.remote_request(owning_cpu(key),
333 &delta_dht::get_master, key);
339 void send_delta(
const key_type& key,
const delta_type& delta) {
341 ASSERT_FALSE(is_local(key));
342 const size_t calling_procid = procid();
343 rpc.remote_call(owning_cpu(key),
344 &delta_dht::send_delta_rpc,
345 calling_procid, key, delta);
349 void send_delta_rpc(
const size_t& calling_procid,
350 const key_type& key,
const delta_type& delta) {
352 ASSERT_TRUE(is_local(key));
354 const value_type ret_value = (data_map[key] += delta);
356 rpc.remote_call(calling_procid,
357 &delta_dht::send_delta_rpc_callback, key, ret_value);
361 void send_delta_rpc_callback(
const key_type& key,
const value_type& new_value) {
363 ASSERT_FALSE(is_local(key));
365 if(cache.contains(key)) {
366 cache_entry& entry = cache[key];
367 entry.value = new_value;
368 entry.value += entry.delta;
370 ++background_updates;
400 #include <graphlab/macros_undef.hpp>