GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
distributed_ingress_base.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 #ifndef GRAPHLAB_DISTRIBUTED_INGRESS_BASE_HPP
24 #define GRAPHLAB_DISTRIBUTED_INGRESS_BASE_HPP
25 
26 #include <boost/functional/hash.hpp>
27 
28 #include <graphlab/util/memory_info.hpp>
29 #include <graphlab/rpc/buffered_exchange.hpp>
30 #include <graphlab/graph/graph_basic_types.hpp>
31 #include <graphlab/graph/ingress/idistributed_ingress.hpp>
32 #include <graphlab/graph/ingress/ingress_edge_decision.hpp>
33 #include <graphlab/graph/distributed_graph.hpp>
34 #include <graphlab/util/cuckoo_map_pow2.hpp>
35 
36 #include <graphlab/macros_def.hpp>
37 namespace graphlab {
38 
39  /**
40  * \brief Implementation of the basic ingress functionality.
41  */
42  template<typename VertexData, typename EdgeData>
43  class distributed_graph;
44 
45  template<typename VertexData, typename EdgeData>
46  class distributed_ingress_base :
47  public idistributed_ingress<VertexData, EdgeData> {
48  public:
49  typedef distributed_graph<VertexData, EdgeData> graph_type;
50  /// The type of the vertex data stored in the graph
51  typedef VertexData vertex_data_type;
52  /// The type of the edge data stored in the graph
53  typedef EdgeData edge_data_type;
54 
55  typedef typename graph_type::vertex_record vertex_record;
56  typedef typename graph_type::mirror_type mirror_type;
57 
58 
59  /// The rpc interface for this object
60  dc_dist_object<distributed_ingress_base> rpc;
61  /// The underlying distributed graph object that is being loaded
62  graph_type& graph;
63 
64  /// Temporary buffers used to store vertex data on ingress
66  vertex_id_type vid;
67  vertex_data_type vdata;
69  vertex_data_type vdata = vertex_data_type()) :
70  vid(vid), vdata(vdata) { }
71  void load(iarchive& arc) { arc >> vid >> vdata; }
72  void save(oarchive& arc) const { arc << vid << vdata; }
73  };
74  buffered_exchange<vertex_buffer_record> vertex_exchange;
75 
76  /// Temporar buffers used to store edge data on ingress
78  vertex_id_type source, target;
79  edge_data_type edata;
81  const vertex_id_type& target = vertex_id_type(-1),
82  const edge_data_type& edata = edge_data_type()) :
83  source(source), target(target), edata(edata) { }
84  void load(iarchive& arc) { arc >> source >> target >> edata; }
85  void save(oarchive& arc) const { arc << source << target << edata; }
86  };
87  buffered_exchange<edge_buffer_record> edge_exchange;
88 
89 
90  /// Struct of minimal vertex information for the first pass coordination.
92  vertex_id_type vid, num_in_edges, num_out_edges;
93  vertex_info(vertex_id_type vid = 0, vertex_id_type num_in_edges = 0,
94  vertex_id_type num_out_edges = 0) :
95  vid(vid), num_in_edges(num_in_edges), num_out_edges(num_out_edges) { }
96  }; // end of vertex_info
97 
98 
99  /// Detail vertex record for the second pass coordination.
101  mirror_type mirrors;
102  vertex_data_type vdata;
103  vertex_id_type num_in_edges, num_out_edges;
104  procid_t owner;
106  vdata(vertex_data_type()), num_in_edges(0), num_out_edges(0), owner(-1) { }
107  void load(iarchive& arc) {
108  arc >> num_in_edges >> num_out_edges >> owner >> mirrors >> vdata;
109  }
110  void save(oarchive& arc) const {
111  arc << num_in_edges << num_out_edges << owner << mirrors << vdata;
112  }
113  };
114 
115  /// Ingress decision object for computing the edge destination.
116  ingress_edge_decision<VertexData, EdgeData> edge_decision;
117 
  public:
    /// Construct the ingress object, binding it to the rpc layer and the
    /// target graph, and barrier so all machines start loading together.
    distributed_ingress_base(distributed_control& dc, graph_type& graph) :
      rpc(dc, this), graph(graph), vertex_exchange(dc), edge_exchange(dc),
      edge_decision(dc) {
      rpc.barrier();
    } // end of constructor

    ~distributed_ingress_base() { }
126 
127  /** \brief Add an edge to the ingress object. */
128  virtual void add_edge(vertex_id_type source, vertex_id_type target,
129  const EdgeData& edata) {
130  const procid_t owning_proc =
131  edge_decision.edge_to_proc_random(source, target, rpc.numprocs());
132  const edge_buffer_record record(source, target, edata);
133  edge_exchange.send(owning_proc, record);
134  } // end of add edge
135 
136 
137  /** \brief Add an vertex to the ingress object. */
138  virtual void add_vertex(vertex_id_type vid, const VertexData& vdata) {
139  const procid_t owning_proc = vertex_to_proc(vid);
140  const vertex_buffer_record record(vid, vdata);
141  vertex_exchange.send(owning_proc, record);
142  } // end of add vertex
143 
144 
    /** \brief Finalize completes the local graph data structure
     * and the vertex record information.
     *
     * \internal
     * The finalization goes through 5 steps:
     *
     * 1. Construct local graph using the received edges, during which
     * the vid2lvid map is built.
     *
     * 2. Construct lvid2record map (of empty entries) using the received
     * vertices.
     *
     * 3. Complete lvid2record map by exchanging the vertex_info.
     *
     * 4. Exchange the negotiation records, including singletons. (Local graph
     * handling singletons).
     *
     * 5. Exchange global graph statistics.
     */
    virtual void finalize() {
      // Every machine must have finished add_vertex/add_edge before the
      // buffered exchanges below can be drained.
      rpc.full_barrier();
      if (rpc.procid() == 0) {
        logstream(LOG_EMPH) << "Finalizing Graph..." << std::endl;
      }
      // typedef typename boost::unordered_map<vertex_id_type, lvid_type>::value_type
      // vid2lvid_pair_type;
      typedef typename cuckoo_map_pow2<vertex_id_type, lvid_type, 3,
                                       uint32_t>::value_type
        vid2lvid_pair_type;
      typedef typename buffered_exchange<edge_buffer_record>::buffer_type
        edge_buffer_type;
      typedef boost::unordered_map<vertex_id_type, vertex_negotiator_record>
        vrec_map_type;
      typedef typename vrec_map_type::value_type vrec_pair_type;
      typedef typename buffered_exchange<vertex_buffer_record>::buffer_type
        vertex_buffer_type;
      typedef typename buffered_exchange<vertex_info>::buffer_type
        vinfo_buffer_type;


      // Flush any additional data
      edge_exchange.flush(); vertex_exchange.flush();
      if(rpc.procid() == 0)
        memory_info::log_usage("Post Flush");

      logstream(LOG_INFO) << "Graph Finalize: constructing local graph" << std::endl;
      { // Step 1: add all the received edges to the local graph, assigning a
        // fresh dense local vid (lvid) to every global vid first seen here.
        const size_t nedges = edge_exchange.size()+1;
        graph.local_graph.reserve_edge_space(nedges + 1);
        edge_buffer_type edge_buffer;
        procid_t proc;
        while(edge_exchange.recv(proc, edge_buffer)) {
          foreach(const edge_buffer_record& rec, edge_buffer) {
            // Get the source_lvid;
            lvid_type source_lvid(-1);
            if(graph.vid2lvid.find(rec.source) == graph.vid2lvid.end()) {
              source_lvid = graph.vid2lvid.size();
              graph.vid2lvid[rec.source] = source_lvid;
              // graph.local_graph.resize(source_lvid + 1);
            } else source_lvid = graph.vid2lvid[rec.source];
            // Get the target_lvid;
            lvid_type target_lvid(-1);
            if(graph.vid2lvid.find(rec.target) == graph.vid2lvid.end()) {
              target_lvid = graph.vid2lvid.size();
              graph.vid2lvid[rec.target] = target_lvid;
              // graph.local_graph.resize(target_lvid + 1);
            } else target_lvid = graph.vid2lvid[rec.target];
            graph.local_graph.add_edge(source_lvid, target_lvid, rec.edata);
          } // end of loop over add edges
        } // end for loop over buffers
        edge_exchange.clear();
      }

      if(rpc.procid() == 0)
        memory_info::log_usage("Finished populating local graph.");

      // Local vids must be dense: one local vertex per mapped vid, no gaps.
      ASSERT_EQ(graph.vid2lvid.size(), graph.local_graph.num_vertices());
      logstream(LOG_INFO) << "Vid2lvid size: " << graph.vid2lvid.size() << "\t" << "Max lvid : " << graph.local_graph.maxlvid() << std::endl;
      ASSERT_EQ(graph.vid2lvid.size(), graph.local_graph.maxlvid() + 1);

      // Finalize local graph
      logstream(LOG_INFO) << "Graph Finalize: finalizing local graph."
                          << std::endl;
      graph.local_graph.finalize();
      logstream(LOG_INFO) << "Local graph info: " << std::endl
                          << "\t nverts: " << graph.local_graph.num_vertices()
                          << std::endl
                          << "\t nedges: " << graph.local_graph.num_edges()
                          << std::endl;

      if(rpc.procid() == 0)
        memory_info::log_usage("Finished finalizing local graph.");

      // Setup the map containing all the vertices being negotiated by
      // this machine
      vrec_map_type vrec_map;
      { // Step 2: receive any vertex data sent by other machines and stash
        // it in the negotiation map, keyed by global vid.
        vertex_buffer_type vertex_buffer; procid_t sending_proc(-1);
        while(vertex_exchange.recv(sending_proc, vertex_buffer)) {
          foreach(const vertex_buffer_record& rec, vertex_buffer) {
            vertex_negotiator_record& negotiator_rec = vrec_map[rec.vid];
            negotiator_rec.vdata = rec.vdata;
          }
        }
        vertex_exchange.clear();
      } // end of loop to populate vrecmap

      if(rpc.procid() == 0)
        memory_info::log_usage("Emptied vertex data exchange");



      { // Step 3: compute the mirroring information for all vertices
        // negotiated by this machine
        buffered_exchange<vertex_info> vinfo_exchange(rpc.dc());
        vinfo_buffer_type recv_buffer; procid_t sending_proc = -1;
        size_t iter = 0; size_t last_iter = graph.vid2lvid.size() - 1;

        // special case: flush when there is no vertex info to send.
        if (graph.vid2lvid.size() == 0) {
          vinfo_exchange.flush();
          // recv any buffers
          while(vinfo_exchange.recv(sending_proc, recv_buffer)) {
            foreach(vertex_info vinfo, recv_buffer) {
              vertex_negotiator_record& rec = vrec_map[vinfo.vid];
              rec.num_in_edges += vinfo.num_in_edges;
              rec.num_out_edges += vinfo.num_out_edges;
              // The sending proc holds a copy of this vertex -> it mirrors it.
              rec.mirrors.set_bit(sending_proc);
            } // end of for loop over all vertex info
          } // end of recv while loop
        } else {
          // usual case : send vertex info and receive when necessary.
          foreach(const vid2lvid_pair_type& pair, graph.vid2lvid) {
            // Send a vertex
            const vertex_id_type vid = pair.first;
            const vertex_id_type lvid = pair.second;
            const procid_t negotiator = vertex_to_proc(vid);
            const vertex_info vinfo(vid, graph.local_graph.num_in_edges(lvid),
                                    graph.local_graph.num_out_edges(lvid));
            vinfo_exchange.send(negotiator, vinfo);
            // Flush on the last send so all buffers are delivered; receives
            // are interleaved with sends to bound memory usage.
            if (iter == last_iter) vinfo_exchange.flush();
            // recv any buffers if necessary
            while(vinfo_exchange.recv(sending_proc, recv_buffer)) {
              foreach(vertex_info vinfo, recv_buffer) {
                vertex_negotiator_record& rec = vrec_map[vinfo.vid];
                rec.num_in_edges += vinfo.num_in_edges;
                rec.num_out_edges += vinfo.num_out_edges;
                rec.mirrors.set_bit(sending_proc);
              } // end of for loop over all vertex info
            } // end of recv while loop
            ++iter;
          } // end of loop over edge info
        } // end of compute mirror information
      }

      if(rpc.procid() == 0)
        memory_info::log_usage("Exchanged basic vertex info");

      { // Step 4: determine masters for all negotiated vertices
        logstream(LOG_INFO)
          << "Graph Finalize: Constructing and sending vertex assignments"
          << std::endl;
        std::vector<size_t> counts(rpc.numprocs());
        size_t num_singletons = 0;
        // Compute the master assignments
        foreach(vrec_pair_type& pair, vrec_map) {
          vertex_negotiator_record& rec = pair.second;
          // Determine the master
          procid_t master(-1);
          if(rec.mirrors.popcount() == 0) {
            // // random assign a singleton vertex to a proc
            // const vertex_id_type vid = pair.first;
            // master = vid % rpc.numprocs();
            // For simplicity simply assign it to this machine
            master = rpc.procid(); ++num_singletons;
          } else {
            // Find the best (least loaded) processor to assign the
            // vertex. Ties broken by lowest proc id via pair comparison.
            size_t first_mirror = 0;
            const bool has_mirror =
              rec.mirrors.first_bit(first_mirror);
            ASSERT_TRUE(has_mirror);
            std::pair<size_t, size_t>
              best_asg(counts[first_mirror], first_mirror);
            foreach(size_t proc, rec.mirrors) {
              best_asg = std::min(best_asg,
                                  std::make_pair(counts[proc], proc));
            }
            master = best_asg.second;
          }
          // update the counts and set the master assignment
          counts[master]++;
          rec.owner = master;
          rec.mirrors.clear_bit(master); // Master is not a mirror
        } // end of loop over all vertex negotiation records

        if(rpc.procid() == 0)
          memory_info::log_usage("Finished computing masters");

        { // Initialize vertex records
          graph.lvid2record.reserve(graph.vid2lvid.size() + num_singletons);
          graph.lvid2record.resize(graph.vid2lvid.size() + num_singletons);
          foreach(const vid2lvid_pair_type& pair, graph.vid2lvid)
            graph.lvid2record[pair.second].gvid = pair.first;
          // Check conditions on graph
          // ASSERT_EQ(graph.local_graph.num_vertices(), graph.lvid2record.size());
          graph.local_graph.reserve(graph.local_graph.num_vertices() + num_singletons);
        }
        if(rpc.procid() == 0)
          memory_info::log_usage("Finished lvid2record");


        // Exchange the negotiation records
        typedef std::pair<vertex_id_type, vertex_negotiator_record>
          exchange_pair_type;
        typedef buffered_exchange<exchange_pair_type>
          negotiator_exchange_type;
        negotiator_exchange_type negotiator_exchange(rpc.dc(), 1, 1000);
        typename negotiator_exchange_type::buffer_type recv_buffer;
        procid_t sending_proc(-1);
        size_t iter = 0; size_t last_iter = vrec_map.size() - 1;

        // special case : flush when there is no negotiate record to send.
        if (vrec_map.size() == 0) {
          negotiator_exchange.flush();
          // receive records.
          while(negotiator_exchange.recv(sending_proc, recv_buffer)) {
            foreach(const exchange_pair_type& pair, recv_buffer) {
              const vertex_id_type& vid = pair.first;
              const vertex_negotiator_record& negotiator_rec = pair.second;
              // Determine the local vid; singletons may be new to this proc.
              lvid_type lvid(-1);
              if(graph.vid2lvid.find(vid) == graph.vid2lvid.end()) {
                lvid = graph.vid2lvid.size();
                graph.vid2lvid[vid] = lvid;
                graph.local_graph.add_vertex(lvid, negotiator_rec.vdata);
                ASSERT_LT(lvid, graph.lvid2record.size());
                graph.lvid2record[lvid].gvid = vid;
              } else {
                lvid = graph.vid2lvid[vid];
                ASSERT_LT(lvid, graph.local_graph.num_vertices());
                graph.local_graph.vertex_data(lvid) = negotiator_rec.vdata;
              }
              ASSERT_LT(lvid, graph.lvid2record.size());
              vertex_record& local_record = graph.lvid2record[lvid];
              local_record.owner = negotiator_rec.owner;
              ASSERT_EQ(local_record.num_in_edges, 0);
              local_record.num_in_edges = negotiator_rec.num_in_edges;
              ASSERT_EQ(local_record.num_out_edges, 0);
              local_record.num_out_edges = negotiator_rec.num_out_edges;
              local_record._mirrors = negotiator_rec.mirrors;
            }
          } // end of while loop over negotiator_exchange.recv
        } else {
          // usual case : send and receive negotiate record.
          foreach(vrec_pair_type& pair, vrec_map) {
            const vertex_id_type& vid = pair.first;
            const vertex_negotiator_record& negotiator_rec = pair.second;
            const exchange_pair_type exchange_pair(vid, negotiator_rec);
            // send to the owner
            negotiator_exchange.send(negotiator_rec.owner, exchange_pair);
            // send to all the mirrors
            foreach(size_t mirror, negotiator_rec.mirrors) {
              negotiator_exchange.send(mirror, exchange_pair);
            }
            if (iter == last_iter) negotiator_exchange.flush();
            // Receive any records
            while(negotiator_exchange.recv(sending_proc, recv_buffer)) {
              foreach(const exchange_pair_type& pair, recv_buffer) {
                const vertex_id_type& vid = pair.first;
                const vertex_negotiator_record& negotiator_rec = pair.second;
                // Determine the local vid
                lvid_type lvid(-1);
                if(graph.vid2lvid.find(vid) == graph.vid2lvid.end()) {
                  lvid = graph.vid2lvid.size();
                  graph.vid2lvid[vid] = lvid;
                  graph.local_graph.add_vertex(lvid, negotiator_rec.vdata);
                  ASSERT_LT(lvid, graph.lvid2record.size());
                  graph.lvid2record[lvid].gvid = vid;
                } else {
                  lvid = graph.vid2lvid[vid];
                  ASSERT_LT(lvid, graph.local_graph.num_vertices());
                  graph.local_graph.vertex_data(lvid) = negotiator_rec.vdata;
                }
                ASSERT_LT(lvid, graph.lvid2record.size());
                vertex_record& local_record = graph.lvid2record[lvid];
                local_record.owner = negotiator_rec.owner;
                ASSERT_EQ(local_record.num_in_edges, 0);
                local_record.num_in_edges = negotiator_rec.num_in_edges;
                ASSERT_EQ(local_record.num_out_edges, 0);
                local_record.num_out_edges = negotiator_rec.num_out_edges;
                local_record._mirrors = negotiator_rec.mirrors;
              }
            } // end of while loop over negotiator_exchange.recv
            ++iter;
          } // end of for loop over local vertex records
        }
      } // end of master exchange

      if(rpc.procid() == 0)
        memory_info::log_usage("Finished sending updating mirrors");


      ASSERT_EQ(graph.vid2lvid.size(), graph.local_graph.num_vertices());
      ASSERT_EQ(graph.lvid2record.size(), graph.local_graph.num_vertices());

      // Step 5: compute the global vertex/edge/replica counts.
      exchange_global_info();

    } // end of finalize
453 
454 
455  /* Exchange graph statistics among all nodes and compute
456  * global statistics for the distributed graph. */
457  void exchange_global_info () {
458  // Count the number of vertices owned locally
459  graph.local_own_nverts = 0;
460  foreach(const vertex_record& record, graph.lvid2record)
461  if(record.owner == rpc.procid()) ++graph.local_own_nverts;
462 
463  // Finalize global graph statistics.
464  logstream(LOG_INFO)
465  << "Graph Finalize: exchange global statistics " << std::endl;
466 
467  // Compute edge counts
468  std::vector<size_t> swap_counts(rpc.numprocs());
469  swap_counts[rpc.procid()] = graph.num_local_edges();
470  rpc.all_gather(swap_counts);
471  graph.nedges = 0;
472  foreach(size_t count, swap_counts) graph.nedges += count;
473 
474  // compute begin edge id
475  graph.begin_eid = 0;
476  for(size_t i = 0; i < rpc.procid(); ++i)
477  graph.begin_eid += swap_counts[i];
478 
479  // compute vertex count
480  swap_counts[rpc.procid()] = graph.num_local_own_vertices();
481  rpc.all_gather(swap_counts);
482  graph.nverts = 0;
483  foreach(size_t count, swap_counts) graph.nverts += count;
484 
485  // compute replicas
486  swap_counts[rpc.procid()] = graph.num_local_vertices();
487  rpc.all_gather(swap_counts);
488  graph.nreplicas = 0;
489  foreach(size_t count, swap_counts) graph.nreplicas += count;
490 
491 
492  if (rpc.procid() == 0) {
493  logstream(LOG_EMPH) << "Graph info: "
494  << "\n\t nverts: " << graph.num_vertices()
495  << "\n\t nedges: " << graph.num_edges()
496  << "\n\t nreplicas: " << graph.nreplicas
497  << "\n\t replication factor: " << (double)graph.nreplicas/graph.num_vertices()
498  << std::endl;
499  }
500  }
501 
502 
503 
504 
505 
506 
507 
508 
  protected:
    // HELPER ROUTINES
    // =======================================================>
    /** \brief Returns the process that negotiates vertex vid
     * (simple modulo assignment of the vertex id over all processes). */
    procid_t vertex_to_proc(const vertex_id_type vid) const {
      return vid % rpc.numprocs();
    }
516  }; // end of distributed_ingress_base
517 
518 }; // end of namespace graphlab
519 #include <graphlab/macros_undef.hpp>
520 
521 
522 #endif