GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
dht.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 
24 #ifndef GRAPHLAB_DHT_HPP
25 #define GRAPHLAB_DHT_HPP
26 
27 #include <boost/functional/hash.hpp>
28 #include <boost/unordered_map.hpp>
29 #include <graphlab/parallel/pthread_tools.hpp>
30 #include <graphlab/rpc/dc_dist_object.hpp>
31 
32 namespace graphlab {
33 
34  /**
35  * \ingroup rpc
36  * Implements a very rudimentary distributed key value store.
37  */
38  template <typename KeyType, typename ValueType>
39  class dht {
40 
41  public:
42  typedef boost::unordered_map<size_t, ValueType> storage_type;
43 
44 
45  private:
46  mutable dc_dist_object< dht > rpc;
47 
48  boost::hash<KeyType> hasher;
49  mutex lock;
50  storage_type storage;
51 
52  public:
53  dht(distributed_control &dc) : rpc(dc, this) { }
54 
55  /**
56  * Get the owner of the key
57  */
58  procid_t owner(const KeyType& key) const {
59  return hasher(key) % rpc.dc().numprocs();
60  }
61 
62  /**
63  * gets the value associated with a key.
64  * Returns (true, Value) if the entry is available.
65  * Returns (false, undefined) otherwise.
66  */
67  std::pair<bool, ValueType> get(const KeyType &key) const {
68  // who owns the data?
69 
70  const size_t hashvalue = hasher(key);
71  const size_t owningmachine = hashvalue % rpc.numprocs();
72  std::pair<bool, ValueType> retval;
73  // if it is me, we can return it
74  if (owningmachine == rpc.dc().procid()) {
75 
76  lock.lock();
77  typename storage_type::const_iterator iter = storage.find(hashvalue);
78  retval.first = iter != storage.end();
79  if (retval.first) retval.second = iter->second;
80  lock.unlock();
81  } else {
82  retval = rpc.remote_request(owningmachine,
84  key);
85  }
86  return retval;
87  }
88 
89  /**
90  * gets the value associated with a key.
91  * Returns (true, Value) if the entry is available.
92  * Returns (false, undefined) otherwise.
93  */
95  // who owns the data?
96 
97  const size_t hashvalue = hasher(key);
98  const size_t owningmachine = hashvalue % rpc.numprocs();
99  std::pair<bool, ValueType> retval;
100  // if it is me, we can return it
101  if (owningmachine == rpc.dc().procid()) {
102 
103  lock.lock();
104  typename storage_type::const_iterator iter = storage.find(hashvalue);
105  retval.first = iter != storage.end();
106  if (retval.first) retval.second = iter->second;
107  lock.unlock();
108  return retval;
109  } else {
110  return rpc.future_remote_request(owningmachine,
112  key);
113  }
114  }
115 
116 
117 
118 
119  /**
120  * Sets the newval to be the value associated with the key
121  */
122  void set(const KeyType &key, const ValueType &newval) {
123  // who owns the data?
124  const size_t hashvalue = hasher(key);
125  const size_t owningmachine = hashvalue % rpc.numprocs();
126 
127  // if it is me, set it
128  if (owningmachine == rpc.dc().procid()) {
129  lock.lock();
130  storage[hashvalue] = newval;
131  lock.unlock();
132  } else {
133  rpc.remote_call(owningmachine,
135  key, newval);
136  }
137  }
138 
139  void print_stats() const {
140  std::cerr << rpc.calls_sent() << " calls sent\n";
141  std::cerr << rpc.calls_received() << " calls received\n";
142  }
143 
144  /**
145  Must be called by all machines simultaneously
146  */
147  void clear() {
148  rpc.barrier();
149  storage.clear();
150  }
151 
152  };
153 
154 };
155 #endif
156