GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
delta_dht.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 /**
25  * Also contains code that is Copyright 2011 Yahoo! Inc. All rights
26  * reserved.
27  *
28  * Contributed under the iCLA for:
29  * Joseph Gonzalez ([email protected])
30  *
31  */
32 
33 
34 #ifndef GRAPHLAB_DELTA_DHT_HPP
35 #define GRAPHLAB_DELTA_DHT_HPP
36 
37 
38 #include <boost/unordered_map.hpp>
39 #include <boost/functional/hash.hpp>
40 
41 
42 #include <graphlab/rpc/dc.hpp>
43 #include <graphlab/parallel/pthread_tools.hpp>
44 #include <graphlab/util/cache.hpp>
45 
46 
47 
48 
49 #include <graphlab/macros_def.hpp>
50 namespace graphlab {
51 
namespace delta_dht_impl {

  /**
   * Polymorphic base type with a virtual destructor, so that derived
   * objects can be destroyed through an icache pointer.
   */
  struct icache {
    virtual ~icache() { }
  }; // end of icache

  /**
   * Returns a reference to an icache pointer slot associated with the
   * address dht_ptr. Only declared here; the definition lives elsewhere.
   * NOTE(review): ownership and lifetime of the pointed-to object are not
   * visible from this header — confirm against the out-of-line definition.
   */
  icache*& get_icache_ptr(const void* dht_ptr);

} // end of namespace delta_dht_impl
56 
57 
namespace delta_predicate {

  /**
   * Use-count predicate: accepts while the observed use count is strictly
   * below a fixed bound.
   *
   * \tparam ValueType type of the current value passed to operator()
   * \tparam DeltaType type of the pending delta passed to operator()
   */
  template<typename ValueType, typename DeltaType>
  struct uses {
    //! Exclusive upper bound on the use count (default 100)
    size_t max_uses;

    uses(size_t max_uses_bound = 100) : max_uses(max_uses_bound) { }

    /**
     * Returns true if use_count < max_uses.
     * The current value and the pending delta are accepted but not
     * inspected by this predicate.
     */
    bool operator()(const ValueType& /* current */,
                    const DeltaType& /* delta */,
                    const size_t& use_count) const {
      return use_count < max_uses;
    }
  }; // end of uses

} // end of namespace delta_predicate
72 
73 
74 
75 
76 
77 
78  template<typename KeyType, typename ValueType,
79  typename DeltaType = ValueType>
80  class delta_dht {
81  public:
82  typedef KeyType key_type;
83  typedef ValueType value_type;
84  typedef DeltaType delta_type;
85 
86  typedef size_t size_type;
87 
88  typedef boost::unordered_map<key_type, value_type> data_map_type;
89 
90  struct cache_entry {
91  value_type value;
92  delta_type delta;
93  size_t uses;
94  cache_entry(const value_type& value = value_type()) :
95  value(value), uses(0) { }
96  };
97 
98  typedef cache::lru<key_type, cache_entry> cache_type;
99 
100  private:
101 
102  //! The remote procedure call manager
103  mutable dc_dist_object<delta_dht> rpc;
104 
105 
106  //! The data stored locally on this machine
107  data_map_type data_map;
108 
109  //! The lock for the data map
110  mutex data_lock;
111 
112  //! The master cache
113  cache_type cache;
114 
115  //! The master cash rw lock
116  mutex cache_lock;
117 
118  //! The maximum cache size
119  size_t max_cache_size;
120 
121  size_t max_uses;
122 
123  //! the hash function
124  boost::hash<key_type> hash_function;
125 
126  //! cache hits and misses
127  mutable atomic<size_t> local;
128  mutable atomic<size_t> hits;
129  mutable atomic<size_t> misses;
130  mutable atomic<size_t> background_updates;
131 
132  public:
133 
134  delta_dht(distributed_control& dc,
135  size_t max_cache_size = 2056) :
136  rpc(dc, this),
137  max_cache_size(max_cache_size), max_uses(10) {
138  rpc.barrier();
139  }
140 
141  ~delta_dht() { rpc.full_barrier(); }
142 
143  void set_max_uses(size_t max) { max_uses = max; }
144 
145  size_t cache_local() const { return local.value; }
146  size_t cache_hits() const { return hits.value; }
147  size_t cache_misses() const { return misses.value; }
148  size_t background_syncs() const { return background_updates.value; }
149 
150  size_t cache_size() const {
151  cache_lock.lock();
152  const size_t ret_val = cache.size();
153  cache_lock.unlock();
154  return ret_val;
155  }
156 
157  bool is_cached(const key_type& key) const {
158  cache_lock.lock();
159  const bool ret_value = cache.contains(key);
160  cache_lock.unlock();
161  return ret_value;
162  }
163 
164 
165  value_type operator[](const key_type& key) {
166  if(is_local(key)) {
167  ++local;
168  data_lock.lock();
169  const value_type value = data_map[key];
170  data_lock.unlock();
171  return value;
172  } else { // on a remote machine check the cache
173  // test for the key in the cache
174  cache_lock.lock();
175  if(cache.contains(key)) {
176  ++hits;
177  const value_type ret_value = cache[key].value;
178  cache_lock.unlock();
179  return ret_value;
180  } else { // need to create a cache entry
181  ++misses;
182  // Free space in the cache if necessary
183  while(cache.size() + 1 > max_cache_size) {
184  ASSERT_GT(cache.size(), 0);
185  const std::pair<key_type, cache_entry> pair = cache.evict();
186  const key_type& key = pair.first;
187  const cache_entry& entry = pair.second;
188  send_delta(key, entry.delta);
189  }
190  // get the new entry from the server
191  const value_type ret_value = (cache[key].value = get_master(key));
192  cache_lock.unlock();
193  return ret_value;
194  }
195  }
196  } // end of operator []
197 
198 
199  void apply_delta(const key_type& key, const delta_type& delta) {
200  if(is_local(key)) {
201  data_lock.lock();
202  data_map[key] += delta;
203  data_lock.unlock();
204  } else {
205  // update the cache entry if availablable
206  cache_lock.lock();
207  if(cache.contains(key)) {
208  cache_entry& entry = cache[key];
209  entry.value += delta;
210  entry.delta += delta;
211  if( entry.uses > max_uses ) {
212  const delta_type accum_delta = entry.delta;
213  entry.delta = delta_type();
214  entry.uses = 0;
215  cache_lock.unlock();
216  send_delta(key, accum_delta);
217  return;
218  }
219  }
220  cache_lock.unlock();
221  }
222  }
223 
224 
225 
226  //! empty the local cache
227  void flush() {
228  cache_lock.lock();
229  while(cache.size() > 0) {
230  const std::pair<key_type, cache_entry> pair = cache.evict();
231  const key_type& key = pair.first;
232  const cache_entry& entry = pair.second;
233  send_delta(key, entry.delta);
234  }
235  cache_lock.unlock();
236  }
237 
238 
239  //! empty the local cache
240  void barrier_flush() {
241  flush();
242  rpc.full_barrier();
243  }
244 
245 
246  void synchronize() {
247  typedef typename cache_type::pair_type pair_type;
248  cache_lock.lock();
249  foreach(pair_type& pair, cache) {
250  key_type& key = pair.first;
251  cache_entry& entry = pair.second;
252  if(entry.uses > 0) {
253  const delta_type accum_delta = entry.delta;
254  entry.delta = delta_type();
255  entry.uses = 0;
256  send_delta(key, accum_delta);
257  }
258  } // end of foreach
259  cache_lock.unlock();
260  }
261 
262 
263  void synchronize(const key_type& key) {
264  if(is_local(key)) return;
265  cache_lock.lock();
266  if(cache.contains(key)) {
267  cache_entry& entry = cache[key];
268  const delta_type accum_delta = entry.delta;
269  entry.delta = delta_type();
270  entry.uses = 0;
271  cache_lock.unlock();
272  send_delta(key, accum_delta);
273  } else cache_lock.unlock();
274  }
275 
276 
277  size_t owning_cpu(const key_type& key) const {
278  const size_t hash_value = hash_function(key);
279  const size_t cpuid = hash_value % rpc.numprocs();
280  return cpuid;
281  }
282 
283 
284  bool is_local(const key_type& key) const {
285  return owning_cpu(key) == rpc.procid();
286  } // end of is local
287 
288 
289  delta_type delta(const key_type& key) const {
290  if(!is_local(key)) {
291  cache_lock.lock();
292  if(cache.contains(key)) {
293  const delta_type delta = cache[key].delta;
294  cache_lock.unlock();
295  return delta;
296  }
297  cache_lock.unlock();
298  }
299  return delta_type();
300  }
301 
302 
303  size_t local_size() const {
304  data_lock.lock();
305  const size_t result = data_map.size();
306  data_lock.unlock();
307  return result;
308  }
309 
310 
311  size_t size() const {
312  size_t sum = 0;
313  for(size_t i = 0; i < rpc.numprocs(); ++i) {
314  if(i == rpc.procid()) sum += local_size();
315  else sum += rpc.remote_request(i, &delta_dht::local_size);
316  }
317  return sum;
318  }
319 
320  size_t numprocs() const { return rpc.num_procs(); }
321  size_t procid() const { return rpc.procid(); }
322 
323 
324  value_type get_master(const key_type& key) {
325  // If the data is stored locally just read and return
326  if(is_local(key)) {
327  data_lock.lock();
328  const value_type ret_value = data_map[key];
329  data_lock.unlock();
330  return ret_value;
331  } else {
332  return rpc.remote_request(owning_cpu(key),
333  &delta_dht::get_master, key);
334  }
335  } // end of direct get
336 
337  private:
338 
339  void send_delta(const key_type& key, const delta_type& delta) {
340  // If the data is stored locally just read and return
341  ASSERT_FALSE(is_local(key));
342  const size_t calling_procid = procid();
343  rpc.remote_call(owning_cpu(key),
344  &delta_dht::send_delta_rpc,
345  calling_procid, key, delta);
346 
347  } // end of send_delta
348 
349  void send_delta_rpc(const size_t& calling_procid,
350  const key_type& key, const delta_type& delta) {
351  // If the data is stored locally just read and return
352  ASSERT_TRUE(is_local(key));
353  data_lock.lock();
354  const value_type ret_value = (data_map[key] += delta);
355  data_lock.unlock();
356  rpc.remote_call(calling_procid,
357  &delta_dht::send_delta_rpc_callback, key, ret_value);
358 
359  } // end of send_delta_rpc
360 
361  void send_delta_rpc_callback(const key_type& key, const value_type& new_value) {
362  // If the data is stored locally just read and return
363  ASSERT_FALSE(is_local(key));
364  cache_lock.lock();
365  if(cache.contains(key)) {
366  cache_entry& entry = cache[key];
367  entry.value = new_value;
368  entry.value += entry.delta;
369  }
370  ++background_updates;
371  cache_lock.unlock();
372  } // end of send_delta_rpc_callback
373 
374 
375  // void synchronize(const key_type& key, cache_entry& entry) {
376  // const value_type delta = entry.current - entry.old;
377  // entry.old = synchronize_rpc(key, delta);
378  // entry.current = entry.old;
379  // } // end of synchronize
380 
381  // value_type synchronize_rpc(const key_type& key, const value_type& delta) {
382  // if(is_local(key)) {
383  // data_lock.lock();
384  // typename data_map_type::iterator iter = data_map.find(key);
385  // ASSERT_TRUE(iter != data_map.end());
386  // const value_type ret_value = (iter->second += delta);
387  // data_lock.unlock();
388  // return ret_value;
389  // } else {
390  // return rpc.remote_request(owning_cpu(key),
391  // &delta_dht::synchronize_rpc,
392  // key, delta);
393  // }
394  // } // end of synchronize_rpc
395 
396  }; // end of delta_dht
397 
398 
399 }; // end of namespace graphlab
400 #include <graphlab/macros_undef.hpp>
401 
402 
403 
404 #endif
405 
406