GraphLab: Distributed Graph-Parallel API  2.1
distributed_constrained_batch_ingress.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */

#ifndef GRAPHLAB_DISTRIBUTED_CONSTRAINED_BATCH_INGRESS_HPP
#define GRAPHLAB_DISTRIBUTED_CONSTRAINED_BATCH_INGRESS_HPP

#include <boost/unordered_set.hpp>
#include <graphlab/graph/graph_basic_types.hpp>
#include <graphlab/graph/ingress/idistributed_ingress.hpp>
#include <graphlab/graph/ingress/distributed_ingress_base.hpp>
#include <graphlab/graph/distributed_graph.hpp>
#include <graphlab/rpc/buffered_exchange.hpp>
#include <graphlab/rpc/distributed_event_log.hpp>
#include <graphlab/util/dense_bitset.hpp>
#include <graphlab/graph/ingress/sharding_constraint.hpp>
#include <graphlab/macros_def.hpp>
namespace graphlab {
  template<typename VertexData, typename EdgeData>
  class distributed_graph;

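  /**
   * \brief Ingress object that assigns edges in batches, subject to a
   * sharding constraint.
   *
   * Edges are buffered locally by add_edge(). When the buffer fills, or at
   * finalize(), the set of machines already holding edges adjacent to each
   * endpoint is fetched from a distributed degree table, and every buffered
   * edge is assigned greedily to one of the machines permitted by the grid
   * sharding constraint before being shipped to its assigned machine.
   */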
  template<typename VertexData, typename EdgeData>
  class distributed_constrained_batch_ingress :
      public distributed_ingress_base<VertexData, EdgeData> {
  public:
    typedef distributed_graph<VertexData, EdgeData> graph_type;
    /// The type of the vertex data stored in the graph
    typedef VertexData vertex_data_type;
    /// The type of the edge data stored in the graph
    typedef EdgeData edge_data_type;

    typedef typename graph_type::vertex_record vertex_record;
    typedef typename graph_type::mirror_type mirror_type;

    dc_dist_object<distributed_constrained_batch_ingress> rpc;
    typedef distributed_ingress_base<VertexData, EdgeData> base_type;

    mutex local_graph_lock;
    mutex lvid2record_lock;

    typedef fixed_dense_bitset<RPC_MAX_N_PROCS> bin_counts_type;

    /** Type of the degree hash table:
     * a map from vertex id to a bitset of length num_procs, in which bit p
     * is set if machine p already holds an edge adjacent to that vertex. */
    typedef typename boost::unordered_map<vertex_id_type, bin_counts_type>
        dht_degree_table_type;

    /** The local machine's portion of the distributed degree table. */
    std::vector<bin_counts_type> dht_degree_table;

    /** Maps a vertex id to the index of its entry in the local degree table,
     * growing the table if needed.
     * Must be called with a read lock held on dht_degree_table_lock. */
    size_t vid_to_dht_entry_with_readlock(vertex_id_type vid) {
      size_t idx = (vid - rpc.procid()) / rpc.numprocs();
      if (dht_degree_table.size() <= idx) {
        // Upgrade to a write lock to grow the table, then re-check the size
        // since another thread may have resized it in the meantime.
        dht_degree_table_lock.unlock();
        dht_degree_table_lock.writelock();

        if (dht_degree_table.size() <= idx) {
          size_t newsize = std::max(dht_degree_table.size() * 2, idx + 1);
          dht_degree_table.resize(newsize);
        }
        // Downgrade back to a read lock before returning to the caller.
        dht_degree_table_lock.unlock();
        dht_degree_table_lock.readlock();
      }
      return idx;
    }
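    /** Guards dht_degree_table. Readers may temporarily upgrade to a write
     * lock (see vid_to_dht_entry_with_readlock) in order to grow the table. */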
    rwlock dht_degree_table_lock;

    /** Local minibatch buffer */
    /** Number of edges in the current buffer. */
    size_t num_edges;
    /** Capacity of the local buffer. */
    size_t bufsize;
    std::vector<std::pair<vertex_id_type, vertex_id_type> > edgesend;
    std::vector<EdgeData> edatasend;
    mutex edgesend_lock;
    /** For each proc, the set of vertices in the current batch whose DHT
     * entries on that proc must be queried. */
    std::vector<boost::unordered_set<vertex_id_type> > query_set;
    /** The number of edges assigned to each proc so far. */
    std::vector<size_t> proc_num_edges;

    /** Ingress traits. */
    bool usehash;
    bool userecent;

    sharding_constraint* constraint;
    boost::hash<vertex_id_type> hashvid;

  public:
    distributed_constrained_batch_ingress(distributed_control& dc, graph_type& graph,
        size_t bufsize = 50000, bool usehash = false, bool userecent = false) :
        base_type(dc, graph), rpc(dc, this),
        num_edges(0), bufsize(bufsize), query_set(dc.numprocs()),
        proc_num_edges(dc.numprocs()), usehash(usehash), userecent(userecent) {
      constraint = new sharding_constraint(dc.numprocs(), "grid");
      rpc.barrier();
    }

    ~distributed_constrained_batch_ingress() {
      delete constraint;
    }

    /** Adds an edge to the batch ingress buffer, and updates the query set. */
    void add_edge(vertex_id_type source, vertex_id_type target, const EdgeData& edata) {
      edgesend_lock.lock();
      ASSERT_LT(edgesend.size(), bufsize);
      edgesend.push_back(std::make_pair(source, target));
      edatasend.push_back(edata);
      query_set[base_type::vertex_to_proc(source)].insert(source);
      query_set[base_type::vertex_to_proc(target)].insert(target);
      ++num_edges;
      edgesend_lock.unlock();
      if (is_full()) flush();
    } // end of add_edge

    /** Flushes the buffer and calls the base class finalize. */
    void finalize() {
      rpc.full_barrier();
      flush();
      rpc.full_barrier();
      base_type::finalize();
    } // end of finalize


  private:

    // HELPER ROUTINES =======================================================>
    /** Adds a block of edges to the local graph on this machine. */
    void add_edges(const std::vector<vertex_id_type>& source_arr,
                   const std::vector<vertex_id_type>& target_arr,
                   const std::vector<EdgeData>& edata_arr) {

      ASSERT_TRUE((source_arr.size() == target_arr.size())
                  && (source_arr.size() == edata_arr.size()));
      if (source_arr.size() == 0) return;

      std::vector<lvid_type> local_source_arr;
      local_source_arr.reserve(source_arr.size());
      std::vector<lvid_type> local_target_arr;
      local_target_arr.reserve(source_arr.size());

      // For each proc, the list of vertex ids whose degree table entries on
      // that proc must be updated to record the edges added on this machine.
      std::vector<std::vector<vertex_id_type> > local_degree_count(rpc.numprocs());

      lvid_type max_lvid = 0;

      lvid2record_lock.lock();
      // Lock and update the lvid2record.
      for (size_t i = 0; i < source_arr.size(); ++i) {
        vertex_id_type source = source_arr[i];
        vertex_id_type target = target_arr[i];
        lvid_type lvid_source(-1);
        lvid_type lvid_target(-1);
        // typedef typename boost::unordered_map<vertex_id_type, lvid_type>::iterator
        //   vid2lvid_iter;
        typedef typename cuckoo_map_pow2<vertex_id_type, lvid_type, 3, uint32_t>::iterator
            vid2lvid_iter;
        vid2lvid_iter iter;

        iter = base_type::graph.vid2lvid.find(source);
        if (iter == base_type::graph.vid2lvid.end()) {
          lvid_source = base_type::graph.vid2lvid.size();
          base_type::graph.vid2lvid[source] = lvid_source;
          base_type::graph.lvid2record.push_back(vertex_record(source));
        } else {
          lvid_source = iter->second;
        }

        iter = base_type::graph.vid2lvid.find(target);
        if (iter == base_type::graph.vid2lvid.end()) {
          lvid_target = base_type::graph.vid2lvid.size();
          base_type::graph.vid2lvid[target] = lvid_target;
          base_type::graph.lvid2record.push_back(vertex_record(target));
        } else {
          lvid_target = iter->second;
        }

        local_source_arr.push_back(lvid_source);
        local_target_arr.push_back(lvid_target);
        max_lvid = std::max(std::max(lvid_source, lvid_target), max_lvid);

        local_degree_count[base_type::vertex_to_proc(source)].push_back(source);
        local_degree_count[base_type::vertex_to_proc(target)].push_back(target);
      }
      lvid2record_lock.unlock();

      // Send out the local degree counts.
      for (size_t i = 0; i < rpc.numprocs(); ++i) {
        if (i != rpc.procid()) {
          rpc.remote_call(i,
                          &distributed_constrained_batch_ingress::block_add_degree_counts,
                          rpc.procid(),
                          local_degree_count[i]);
        } else {
          block_add_degree_counts(rpc.procid(), local_degree_count[i]);
        }
        local_degree_count[i].clear();
      }

      // Lock and add edges to the local graph.
      local_graph_lock.lock();
      if (max_lvid >= base_type::graph.local_graph.num_vertices()) {
        //std::cout << rpc.procid() << ": " << max_lvid << std::endl;
        base_type::graph.local_graph.resize(max_lvid + 1);
      }
      base_type::graph.local_graph.add_edges(local_source_arr,
                                             local_target_arr, edata_arr);
      local_graph_lock.unlock();

    } // end of add edges

    /** Updates the local part of the distributed table. */
    void block_add_degree_counts(procid_t pid, std::vector<vertex_id_type>& whohas) {
      dht_degree_table_lock.readlock();
      foreach (vertex_id_type& vid, whohas) {
        size_t idx = vid_to_dht_entry_with_readlock(vid);
        dht_degree_table[idx].set_bit_unsync(pid);
      }
      dht_degree_table_lock.unlock();
    }

    /** Returns the degree counts by querying the distributed table. */
    dht_degree_table_type
    block_get_degree_table(const boost::unordered_set<vertex_id_type>& vid_query) {
      dht_degree_table_type answer;
      dht_degree_table_lock.readlock();
      foreach (vertex_id_type qvid, vid_query) {
        answer[qvid] = dht_degree_table[vid_to_dht_entry_with_readlock(qvid)];
      }
      dht_degree_table_lock.unlock();
      return answer;
    } // end of block get degree table


    /** Greedily assigns the edges in the buffer, using fresh degree-table
     * queries for the vertices involved in the current batch. */
    void assign_edges(std::vector<std::vector<vertex_id_type> >& proc_src,
                      std::vector<std::vector<vertex_id_type> >& proc_dst,
                      std::vector<std::vector<EdgeData> >& proc_edata) {
      ASSERT_EQ(num_edges, edgesend.size());

      edgesend_lock.lock();

      if (num_edges == 0) {
        edgesend_lock.unlock();
        return;
      }
      std::vector<dht_degree_table_type> degree_table(rpc.numprocs());

      // Query the DHT for the degree entries of every vertex in the batch.
      for (size_t i = 0; i < rpc.numprocs(); ++i) {
        if (i == rpc.procid()) {
          degree_table[i] = block_get_degree_table(query_set[i]);
        } else {
          degree_table[i] =
              rpc.remote_request(i,
                                 &distributed_constrained_batch_ingress::block_get_degree_table,
                                 query_set[i]);
        }
        query_set[i].clear();
      }

      // Make the assignment: each edge may only go to a machine that is a
      // joint neighbor of its endpoints' master shards under the constraint.
      for (size_t i = 0; i < num_edges; ++i) {
        std::pair<vertex_id_type, vertex_id_type>& e = edgesend[i];

        size_t src_proc = base_type::vertex_to_proc(e.first);
        size_t dst_proc = base_type::vertex_to_proc(e.second);
        bin_counts_type& src_degree = degree_table[src_proc][e.first];
        bin_counts_type& dst_degree = degree_table[dst_proc][e.second];

        std::vector<procid_t> candidates;
        constraint->get_joint_neighbors(get_master(e.first), get_master(e.second), candidates);

        procid_t proc = base_type::edge_decision.edge_to_proc_greedy(e.first, e.second,
            src_degree, dst_degree, candidates, proc_num_edges, usehash, userecent);

        ASSERT_LT(proc, proc_src.size());
        proc_src[proc].push_back(e.first);
        proc_dst[proc].push_back(e.second);
        proc_edata[proc].push_back(edatasend[i]);
      }

      // Clear the sending buffer.
      edgesend.clear();
      edatasend.clear();
      edgesend_lock.unlock();
    } // end assign edge

    /** Flushes all edges in the buffer. */
    void flush() {
      std::vector< std::vector<vertex_id_type> > proc_src(rpc.numprocs());
      std::vector< std::vector<vertex_id_type> > proc_dst(rpc.numprocs());
      std::vector< std::vector<EdgeData> > proc_edata(rpc.numprocs());
      assign_edges(proc_src, proc_dst, proc_edata);
      for (size_t i = 0; i < proc_src.size(); ++i) {
        if (proc_src[i].size() == 0)
          continue;
        if (i == rpc.procid()) {
          add_edges(proc_src[i], proc_dst[i], proc_edata[i]);
          num_edges -= proc_src[i].size();
        } else {
          rpc.remote_call(i, &distributed_constrained_batch_ingress::add_edges,
                          proc_src[i], proc_dst[i], proc_edata[i]);
          num_edges -= proc_src[i].size();
        } // end if
      } // end for
    } // end flush

    /** Returns the number of edges in the buffer. */
    size_t size() { return num_edges; }

    /** Returns whether the buffer is full. */
    bool is_full() { return size() >= bufsize; }

    /** Return the master shard of the vertex. */
    procid_t get_master(vertex_id_type vid) {
      return hashvid(vid) % base_type::rpc.numprocs();
    }


  }; // end of distributed_constrained_batch_ingress


}; // end of namespace graphlab
#include <graphlab/macros_undef.hpp>


#endif