distributed_batch_ingress.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied.  See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 *     http://www.graphlab.ml.cmu.edu
 *
 */

#ifndef GRAPHLAB_DISTRIBUTED_BATCH_INGRESS_HPP
#define GRAPHLAB_DISTRIBUTED_BATCH_INGRESS_HPP

#include <boost/unordered_set.hpp>
#include <graphlab/graph/graph_basic_types.hpp>
#include <graphlab/graph/ingress/idistributed_ingress.hpp>
#include <graphlab/graph/ingress/distributed_ingress_base.hpp>
#include <graphlab/graph/distributed_graph.hpp>
#include <graphlab/rpc/buffered_exchange.hpp>
#include <graphlab/rpc/distributed_event_log.hpp>
#include <graphlab/util/dense_bitset.hpp>
#include <graphlab/macros_def.hpp>

namespace graphlab {

  template<typename VertexData, typename EdgeData>
  class distributed_graph;

  template<typename VertexData, typename EdgeData>
  class distributed_batch_ingress :
      public distributed_ingress_base<VertexData, EdgeData> {
  public:
    typedef distributed_graph<VertexData, EdgeData> graph_type;
    /// The type of the vertex data stored in the graph
    typedef VertexData vertex_data_type;
    /// The type of the edge data stored in the graph
    typedef EdgeData edge_data_type;

    typedef typename graph_type::vertex_record vertex_record;
    typedef typename graph_type::mirror_type mirror_type;

    dc_dist_object<distributed_batch_ingress> rpc;
    typedef distributed_ingress_base<VertexData, EdgeData> base_type;

    mutex local_graph_lock;
    mutex lvid2record_lock;

    typedef fixed_dense_bitset<RPC_MAX_N_PROCS> bin_counts_type;

    /** Type of the degree hash table:
     *  a map from vertex id to a bitset of length num_procs. */
    typedef typename boost::unordered_map<vertex_id_type, bin_counts_type>
      dht_degree_table_type;

    /** The local shard of the distributed degree hash table. */
    std::vector<bin_counts_type> dht_degree_table;

    /** Returns the index of the DHT entry for vertex vid, growing the
     *  local table if necessary.
     *  Must be called with a readlock held on dht_degree_table_lock. */
    size_t vid_to_dht_entry_with_readlock(vertex_id_type vid) {
      size_t idx = (vid - rpc.procid()) / rpc.numprocs();
      if (dht_degree_table.size() <= idx) {
        // Upgrade to a writelock to grow the table, then re-check the size,
        // since another thread may have resized it while the lock was released.
        dht_degree_table_lock.unlock();
        dht_degree_table_lock.writelock();
        if (dht_degree_table.size() <= idx) {
          size_t newsize = std::max(dht_degree_table.size() * 2, idx + 1);
          dht_degree_table.resize(newsize);
        }
        // Downgrade back to a readlock before returning to the caller.
        dht_degree_table_lock.unlock();
        dht_degree_table_lock.readlock();
      }
      return idx;
    }
    rwlock dht_degree_table_lock;
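
    // A worked example of the index arithmetic above (assuming, as the
    // arithmetic implies, that the base ingress assigns vertex vid to proc
    // vid % numprocs): with 4 procs, proc 1 owns vids 1, 5, 9, ..., so
    // vid 9 maps to local entry (9 - 1) / 4 = 2 of dht_degree_table.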

    // Local minibatch buffer
    /** Number of edges in the current buffer. */
    size_t num_edges;
    /** Capacity of the local buffer. */
    size_t bufsize;
    std::vector<std::pair<vertex_id_type, vertex_id_type> > edgesend;
    std::vector<EdgeData> edatasend;
    mutex edgesend_lock;
    /** For each proc, the set of vertices in the current batch whose degree
     *  entries must be queried from that proc's DHT shard. */
    std::vector<boost::unordered_set<vertex_id_type> > query_set;
    /** The map from proc_id to the number of edges assigned to that proc. */
    std::vector<size_t> proc_num_edges;

    DECLARE_TRACER(batch_ingress_add_edge);
    DECLARE_TRACER(batch_ingress_add_edges);
    DECLARE_TRACER(batch_ingress_compute_assignments);
    DECLARE_TRACER(batch_ingress_request_degree_table);
    DECLARE_TRACER(batch_ingress_get_degree_table);
    DECLARE_TRACER(batch_ingress_update_degree_table);

    /** Ingress traits. */
    bool usehash;
    bool userecent;

  public:
    distributed_batch_ingress(distributed_control& dc, graph_type& graph,
                              size_t bufsize = 50000,
                              bool usehash = false, bool userecent = false) :
      base_type(dc, graph), rpc(dc, this),
      num_edges(0), bufsize(bufsize), query_set(dc.numprocs()),
      proc_num_edges(dc.numprocs()), usehash(usehash), userecent(userecent) {
      rpc.barrier();

      INITIALIZE_TRACER(batch_ingress_add_edge, "Time spent in add edge");
      INITIALIZE_TRACER(batch_ingress_add_edges, "Time spent in add block edges");
      INITIALIZE_TRACER(batch_ingress_compute_assignments, "Time spent in compute assignment");
      INITIALIZE_TRACER(batch_ingress_request_degree_table, "Time spent in requesting assignment");
      INITIALIZE_TRACER(batch_ingress_get_degree_table, "Time spent in retrieve degree table");
      INITIALIZE_TRACER(batch_ingress_update_degree_table, "Time spent in update degree table");
    }

    /** Adds an edge to the batch ingress buffer, and updates the query set. */
    void add_edge(vertex_id_type source, vertex_id_type target, const EdgeData& edata) {
      BEGIN_TRACEPOINT(batch_ingress_add_edge);
      edgesend_lock.lock();
      ASSERT_LT(edgesend.size(), bufsize);
      edgesend.push_back(std::make_pair(source, target));
      edatasend.push_back(edata);
      query_set[base_type::vertex_to_proc(source)].insert(source);
      query_set[base_type::vertex_to_proc(target)].insert(target);
      ++num_edges;
      edgesend_lock.unlock();
      END_TRACEPOINT(batch_ingress_add_edge);
      if (is_full()) flush();
    } // end of add_edge

    /** Flush the buffer and call base finalize. */
    void finalize() {
      rpc.full_barrier();
      flush();
      rpc.full_barrier();
      base_type::finalize();
    } // end of finalize

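    // A minimal usage sketch (illustrative only; 'dc', 'graph', and the
    // vertex/edge data types are assumed to be set up by the loading code
    // that drives ingress):
    //
    //   distributed_batch_ingress<vertex_data, edge_data>
    //       ingress(dc, graph, /* bufsize */ 50000);
    //   ingress.add_edge(src_vid, dst_vid, edge_data());  // buffered; may flush
    //   ...
    //   ingress.finalize();  // flush remaining edges and finalize the graph
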
  private:

    // HELPER ROUTINES =======================================================>
    /** Add a block of edges to the local graph. */
    void add_edges(const std::vector<vertex_id_type>& source_arr,
                   const std::vector<vertex_id_type>& target_arr,
                   const std::vector<EdgeData>& edata_arr) {

      BEGIN_TRACEPOINT(batch_ingress_add_edges);
      ASSERT_TRUE((source_arr.size() == target_arr.size())
                  && (source_arr.size() == edata_arr.size()));
      if (source_arr.size() == 0) return;

      std::vector<lvid_type> local_source_arr;
      local_source_arr.reserve(source_arr.size());
      std::vector<lvid_type> local_target_arr;
      local_target_arr.reserve(source_arr.size());

      // For each proc, the list of vertex ids whose DHT degree entries
      // must be updated to record that this proc now holds incident edges.
      std::vector<std::vector<vertex_id_type> > local_degree_count(rpc.numprocs());

      lvid_type max_lvid = 0;

      // Lock and update the lvid2record.
      lvid2record_lock.lock();
      for (size_t i = 0; i < source_arr.size(); ++i) {
        vertex_id_type source = source_arr[i];
        vertex_id_type target = target_arr[i];
        lvid_type lvid_source(-1);
        lvid_type lvid_target(-1);
        // typedef typename boost::unordered_map<vertex_id_type, lvid_type>::iterator
        //   vid2lvid_iter;
        typedef typename cuckoo_map_pow2<vertex_id_type, lvid_type, 3, uint32_t>::iterator
          vid2lvid_iter;
        vid2lvid_iter iter;

        // Assign a local vertex id to the source, creating a record if needed.
        iter = base_type::graph.vid2lvid.find(source);
        if (iter == base_type::graph.vid2lvid.end()) {
          lvid_source = base_type::graph.vid2lvid.size();
          base_type::graph.vid2lvid[source] = lvid_source;
          base_type::graph.lvid2record.push_back(vertex_record(source));
        } else {
          lvid_source = iter->second;
        }

        // Likewise for the target.
        iter = base_type::graph.vid2lvid.find(target);
        if (iter == base_type::graph.vid2lvid.end()) {
          lvid_target = base_type::graph.vid2lvid.size();
          base_type::graph.vid2lvid[target] = lvid_target;
          base_type::graph.lvid2record.push_back(vertex_record(target));
        } else {
          lvid_target = iter->second;
        }

        local_source_arr.push_back(lvid_source);
        local_target_arr.push_back(lvid_target);
        max_lvid = std::max(std::max(lvid_source, lvid_target), max_lvid);

        local_degree_count[base_type::vertex_to_proc(source)].push_back(source);
        local_degree_count[base_type::vertex_to_proc(target)].push_back(target);
      }
      lvid2record_lock.unlock();

      // Send out the local degree counts.
      for (size_t i = 0; i < rpc.numprocs(); ++i) {
        if (i != rpc.procid()) {
          rpc.remote_call(i,
                          &distributed_batch_ingress::block_add_degree_counts,
                          rpc.procid(),
                          local_degree_count[i]);
        } else {
          block_add_degree_counts(rpc.procid(), local_degree_count[i]);
        }
        local_degree_count[i].clear();
      }

      // Lock and add edges to the local graph.
      local_graph_lock.lock();
      if (max_lvid >= base_type::graph.local_graph.num_vertices()) {
        //std::cout << rpc.procid() << ": " << max_lvid << std::endl;
        base_type::graph.local_graph.resize(max_lvid + 1);
      }
      base_type::graph.local_graph.add_edges(local_source_arr,
                                             local_target_arr, edata_arr);
      local_graph_lock.unlock();

      END_TRACEPOINT(batch_ingress_add_edges);
    } // end of add edges

    /** Updates the local part of the distributed table. */
    void block_add_degree_counts(procid_t pid, std::vector<vertex_id_type>& whohas) {
      BEGIN_TRACEPOINT(batch_ingress_update_degree_table);
      dht_degree_table_lock.readlock();
      foreach (vertex_id_type& vid, whohas) {
        size_t idx = vid_to_dht_entry_with_readlock(vid);
        dht_degree_table[idx].set_bit_unsync(pid);
      }
      dht_degree_table_lock.unlock();
      END_TRACEPOINT(batch_ingress_update_degree_table);
    }
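
    // Semantics of the table, as implied by the update above: bit p of the
    // dht_degree_table entry for vid is set once proc p has placed at least
    // one edge incident to vid, so the bitset tracks which procs already
    // hold replicas of vid rather than an exact degree count.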

    /** Returns the degree counts by querying the distributed table. */
    dht_degree_table_type
    block_get_degree_table(const boost::unordered_set<vertex_id_type>& vid_query) {
      BEGIN_TRACEPOINT(batch_ingress_get_degree_table);
      dht_degree_table_type answer;
      dht_degree_table_lock.readlock();
      foreach (vertex_id_type qvid, vid_query) {
        answer[qvid] = dht_degree_table[vid_to_dht_entry_with_readlock(qvid)];
      }
      dht_degree_table_lock.unlock();
      END_TRACEPOINT(batch_ingress_get_degree_table);
      return answer;
    } // end of block get degree table


    /** Assign edges in the buffer greedily using the recent query of the DHT. */
    void assign_edges(std::vector<std::vector<vertex_id_type> >& proc_src,
                      std::vector<std::vector<vertex_id_type> >& proc_dst,
                      std::vector<std::vector<EdgeData> >& proc_edata) {
      ASSERT_EQ(num_edges, edgesend.size());

      edgesend_lock.lock();

      if (num_edges == 0) {
        edgesend_lock.unlock();
        return;
      }

      BEGIN_TRACEPOINT(batch_ingress_request_degree_table);
      std::vector<dht_degree_table_type> degree_table(rpc.numprocs());

      // Query the DHT.
      for (size_t i = 0; i < rpc.numprocs(); ++i) {
        if (i == rpc.procid()) {
          degree_table[i] = block_get_degree_table(query_set[i]);
        } else {
          degree_table[i] =
            rpc.remote_request(i,
                               &distributed_batch_ingress::block_get_degree_table,
                               query_set[i]);
        }
        query_set[i].clear();
      }
      END_TRACEPOINT(batch_ingress_request_degree_table);

      // Make the assignments.
      for (size_t i = 0; i < num_edges; ++i) {
        std::pair<vertex_id_type, vertex_id_type>& e = edgesend[i];

        BEGIN_TRACEPOINT(batch_ingress_compute_assignments);
        size_t src_proc = base_type::vertex_to_proc(e.first);
        size_t dst_proc = base_type::vertex_to_proc(e.second);
        bin_counts_type& src_degree = degree_table[src_proc][e.first];
        bin_counts_type& dst_degree = degree_table[dst_proc][e.second];
        procid_t proc = base_type::edge_decision.edge_to_proc_greedy(e.first, e.second,
            src_degree, dst_degree, proc_num_edges, usehash, userecent);
        END_TRACEPOINT(batch_ingress_compute_assignments);

        ASSERT_LT(proc, proc_src.size());
        proc_src[proc].push_back(e.first);
        proc_dst[proc].push_back(e.second);
        proc_edata[proc].push_back(edatasend[i]);
      }

      // Clear the sending buffer.
      edgesend.clear();
      edatasend.clear();
      edgesend_lock.unlock();
    } // end of assign edges
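
    // Rough intuition for the greedy choice above (the actual policy lives in
    // edge_decision.edge_to_proc_greedy(), not in this file): prefer a proc
    // whose bit is already set in both endpoints' degree bitsets, then one set
    // in either bitset, falling back to the least loaded proc according to
    // proc_num_edges; the usehash/userecent flags further adjust the decision.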

    /** Flushes all edges in the buffer. */
    void flush() {
      std::vector< std::vector<vertex_id_type> > proc_src(rpc.numprocs());
      std::vector< std::vector<vertex_id_type> > proc_dst(rpc.numprocs());
      std::vector< std::vector<EdgeData> > proc_edata(rpc.numprocs());
      assign_edges(proc_src, proc_dst, proc_edata);
      for (size_t i = 0; i < proc_src.size(); ++i) {
        if (proc_src[i].size() == 0)
          continue;
        if (i == rpc.procid()) {
          add_edges(proc_src[i], proc_dst[i], proc_edata[i]);
          num_edges -= proc_src[i].size();
        } else {
          rpc.remote_call(i, &distributed_batch_ingress::add_edges,
                          proc_src[i], proc_dst[i], proc_edata[i]);
          num_edges -= proc_src[i].size();
        } // end if
      } // end for
    } // end of flush

    /** Returns the number of edges in the buffer. */
    size_t size() { return num_edges; }

    /** Returns whether the buffer is full. */
    bool is_full() { return size() >= bufsize; }

  }; // end of distributed_batch_ingress

} // end of namespace graphlab

#include <graphlab/macros_undef.hpp>


#endif