23 #ifndef GRAPHLAB_DISTRIBUTED_INGRESS_BASE_HPP
24 #define GRAPHLAB_DISTRIBUTED_INGRESS_BASE_HPP
26 #include <boost/functional/hash.hpp>
28 #include <graphlab/util/memory_info.hpp>
29 #include <graphlab/rpc/buffered_exchange.hpp>
30 #include <graphlab/graph/graph_basic_types.hpp>
31 #include <graphlab/graph/ingress/idistributed_ingress.hpp>
32 #include <graphlab/graph/ingress/ingress_edge_decision.hpp>
33 #include <graphlab/graph/distributed_graph.hpp>
34 #include <graphlab/util/cuckoo_map_pow2.hpp>
36 #include <graphlab/macros_def.hpp>
/**
 * Forward declaration of the distributed graph class template, so that this
 * header can name distributed_graph<VertexData, EdgeData> (e.g. in the
 * graph_type typedef of distributed_ingress_base) before its definition.
 */
template<typename VertexData, typename EdgeData>
class distributed_graph;
45 template<
typename VertexData,
typename EdgeData>
46 class distributed_ingress_base :
47 public idistributed_ingress<VertexData, EdgeData> {
49 typedef distributed_graph<VertexData, EdgeData>
graph_type;
51 typedef VertexData vertex_data_type;
53 typedef EdgeData edge_data_type;
55 typedef typename graph_type::vertex_record vertex_record;
60 dc_dist_object<distributed_ingress_base> rpc;
67 vertex_data_type vdata;
69 vertex_data_type vdata = vertex_data_type()) :
70 vid(vid), vdata(vdata) { }
71 void load(
iarchive& arc) { arc >> vid >> vdata; }
72 void save(
oarchive& arc)
const { arc << vid << vdata; }
74 buffered_exchange<vertex_buffer_record> vertex_exchange;
82 const edge_data_type& edata = edge_data_type()) :
83 source(source), target(target), edata(edata) { }
84 void load(
iarchive& arc) { arc >> source >> target >> edata; }
85 void save(
oarchive& arc)
const { arc << source << target << edata; }
87 buffered_exchange<edge_buffer_record> edge_exchange;
95 vid(vid), num_in_edges(num_in_edges), num_out_edges(num_out_edges) { }
102 vertex_data_type vdata;
106 vdata(vertex_data_type()), num_in_edges(0), num_out_edges(0), owner(-1) { }
108 arc >> num_in_edges >> num_out_edges >> owner >> mirrors >> vdata;
111 arc << num_in_edges << num_out_edges << owner << mirrors << vdata;
116 ingress_edge_decision<VertexData, EdgeData> edge_decision;
120 rpc(dc, this), graph(graph), vertex_exchange(dc), edge_exchange(dc),
125 ~distributed_ingress_base() { }
129 const EdgeData& edata) {
131 edge_decision.edge_to_proc_random(source, target, rpc.numprocs());
132 const edge_buffer_record record(source, target, edata);
133 edge_exchange.send(owning_proc, record);
138 virtual void add_vertex(
vertex_id_type vid,
const VertexData& vdata) {
139 const procid_t owning_proc = vertex_to_proc(vid);
140 const vertex_buffer_record record(vid, vdata);
141 vertex_exchange.send(owning_proc, record);
163 virtual void finalize() {
165 if (rpc.procid() == 0) {
166 logstream(
LOG_EMPH) <<
"Finalizing Graph..." << std::endl;
171 uint32_t>::value_type
173 typedef typename buffered_exchange<edge_buffer_record>::buffer_type
175 typedef boost::unordered_map<vertex_id_type, vertex_negotiator_record>
177 typedef typename vrec_map_type::value_type vrec_pair_type;
178 typedef typename buffered_exchange<vertex_buffer_record>::buffer_type
180 typedef typename buffered_exchange<vertex_info>::buffer_type
185 edge_exchange.flush(); vertex_exchange.flush();
186 if(rpc.procid() == 0)
187 memory_info::log_usage(
"Post Flush");
189 logstream(
LOG_INFO) <<
"Graph Finalize: constructing local graph" << std::endl;
191 const size_t nedges = edge_exchange.size()+1;
192 graph.local_graph.reserve_edge_space(nedges + 1);
193 edge_buffer_type edge_buffer;
195 while(edge_exchange.recv(proc, edge_buffer)) {
196 foreach(
const edge_buffer_record& rec, edge_buffer) {
198 lvid_type source_lvid(-1);
199 if(graph.vid2lvid.find(rec.source) == graph.vid2lvid.end()) {
200 source_lvid = graph.vid2lvid.size();
201 graph.vid2lvid[rec.source] = source_lvid;
203 }
else source_lvid = graph.vid2lvid[rec.source];
205 lvid_type target_lvid(-1);
206 if(graph.vid2lvid.find(rec.target) == graph.vid2lvid.end()) {
207 target_lvid = graph.vid2lvid.size();
208 graph.vid2lvid[rec.target] = target_lvid;
210 }
else target_lvid = graph.vid2lvid[rec.target];
211 graph.local_graph.add_edge(source_lvid, target_lvid, rec.edata);
214 edge_exchange.clear();
217 if(rpc.procid() == 0)
218 memory_info::log_usage(
"Finished populating local graph.");
220 ASSERT_EQ(graph.vid2lvid.size(), graph.local_graph.num_vertices());
221 logstream(
LOG_INFO) <<
"Vid2lvid size: " << graph.vid2lvid.size() <<
"\t" <<
"Max lvid : " << graph.local_graph.maxlvid() << std::endl;
222 ASSERT_EQ(graph.vid2lvid.size(), graph.local_graph.maxlvid() + 1);
225 logstream(
LOG_INFO) <<
"Graph Finalize: finalizing local graph."
227 graph.local_graph.finalize();
228 logstream(
LOG_INFO) <<
"Local graph info: " << std::endl
229 <<
"\t nverts: " << graph.local_graph.num_vertices()
231 <<
"\t nedges: " << graph.local_graph.num_edges()
234 if(rpc.procid() == 0)
235 memory_info::log_usage(
"Finished finalizing local graph.");
239 vrec_map_type vrec_map;
241 vertex_buffer_type vertex_buffer;
procid_t sending_proc(-1);
242 while(vertex_exchange.recv(sending_proc, vertex_buffer)) {
243 foreach(
const vertex_buffer_record& rec, vertex_buffer) {
244 vertex_negotiator_record& negotiator_rec = vrec_map[rec.vid];
245 negotiator_rec.vdata = rec.vdata;
248 vertex_exchange.clear();
251 if(rpc.procid() == 0)
252 memory_info::log_usage(
"Emptied vertex data exchange");
258 buffered_exchange<vertex_info> vinfo_exchange(rpc.dc());
259 vinfo_buffer_type recv_buffer;
procid_t sending_proc = -1;
260 size_t iter = 0;
size_t last_iter = graph.vid2lvid.size() - 1;
263 if (graph.vid2lvid.size() == 0) {
264 vinfo_exchange.flush();
266 while(vinfo_exchange.recv(sending_proc, recv_buffer)) {
267 foreach(vertex_info vinfo, recv_buffer) {
268 vertex_negotiator_record& rec = vrec_map[vinfo.vid];
269 rec.num_in_edges += vinfo.num_in_edges;
270 rec.num_out_edges += vinfo.num_out_edges;
271 rec.mirrors.set_bit(sending_proc);
276 foreach(
const vid2lvid_pair_type& pair, graph.vid2lvid) {
278 const vertex_id_type vid = pair.first;
279 const vertex_id_type lvid = pair.second;
280 const procid_t negotiator = vertex_to_proc(vid);
281 const vertex_info vinfo(vid, graph.local_graph.num_in_edges(lvid),
282 graph.local_graph.num_out_edges(lvid));
283 vinfo_exchange.send(negotiator, vinfo);
284 if (iter == last_iter) vinfo_exchange.flush();
286 while(vinfo_exchange.recv(sending_proc, recv_buffer)) {
287 foreach(vertex_info vinfo, recv_buffer) {
288 vertex_negotiator_record& rec = vrec_map[vinfo.vid];
289 rec.num_in_edges += vinfo.num_in_edges;
290 rec.num_out_edges += vinfo.num_out_edges;
291 rec.mirrors.set_bit(sending_proc);
299 if(rpc.procid() == 0)
300 memory_info::log_usage(
"Exchanged basic vertex info");
304 <<
"Graph Finalize: Constructing and sending vertex assignments"
306 std::vector<size_t> counts(rpc.numprocs());
307 size_t num_singletons = 0;
309 foreach(vrec_pair_type& pair, vrec_map) {
310 vertex_negotiator_record& rec = pair.second;
313 if(rec.mirrors.popcount() == 0) {
318 master = rpc.procid(); ++num_singletons;
322 size_t first_mirror = 0;
323 const bool has_mirror =
324 rec.mirrors.first_bit(first_mirror);
325 ASSERT_TRUE(has_mirror);
326 std::pair<size_t, size_t>
327 best_asg(counts[first_mirror], first_mirror);
328 foreach(
size_t proc, rec.mirrors) {
329 best_asg = std::min(best_asg,
330 std::make_pair(counts[proc], proc));
332 master = best_asg.second;
337 rec.mirrors.clear_bit(master);
340 if(rpc.procid() == 0)
341 memory_info::log_usage(
"Finished computing masters");
344 graph.lvid2record.reserve(graph.vid2lvid.size() + num_singletons);
345 graph.lvid2record.resize(graph.vid2lvid.size() + num_singletons);
346 foreach(
const vid2lvid_pair_type& pair, graph.vid2lvid)
347 graph.lvid2record[pair.second].gvid = pair.first;
350 graph.local_graph.reserve(graph.local_graph.num_vertices() + num_singletons);
352 if(rpc.procid() == 0)
353 memory_info::log_usage(
"Finished lvid2record");
357 typedef std::pair<vertex_id_type, vertex_negotiator_record>
359 typedef buffered_exchange<exchange_pair_type>
360 negotiator_exchange_type;
361 negotiator_exchange_type negotiator_exchange(rpc.dc(), 1, 1000);
362 typename negotiator_exchange_type::buffer_type recv_buffer;
364 size_t iter = 0;
size_t last_iter = vrec_map.size() - 1;
367 if (vrec_map.size() == 0) {
368 negotiator_exchange.flush();
370 while(negotiator_exchange.recv(sending_proc, recv_buffer)) {
371 foreach(
const exchange_pair_type& pair, recv_buffer) {
372 const vertex_id_type& vid = pair.first;
373 const vertex_negotiator_record& negotiator_rec = pair.second;
376 if(graph.vid2lvid.find(vid) == graph.vid2lvid.end()) {
377 lvid = graph.vid2lvid.size();
378 graph.vid2lvid[vid] = lvid;
379 graph.local_graph.add_vertex(lvid, negotiator_rec.vdata);
380 ASSERT_LT(lvid, graph.lvid2record.size());
381 graph.lvid2record[lvid].gvid = vid;
383 lvid = graph.vid2lvid[vid];
384 ASSERT_LT(lvid, graph.local_graph.num_vertices());
385 graph.local_graph.vertex_data(lvid) = negotiator_rec.vdata;
387 ASSERT_LT(lvid, graph.lvid2record.size());
388 vertex_record& local_record = graph.lvid2record[lvid];
389 local_record.owner = negotiator_rec.owner;
390 ASSERT_EQ(local_record.num_in_edges, 0);
391 local_record.num_in_edges = negotiator_rec.num_in_edges;
392 ASSERT_EQ(local_record.num_out_edges, 0);
393 local_record.num_out_edges = negotiator_rec.num_out_edges;
394 local_record._mirrors = negotiator_rec.mirrors;
399 foreach(vrec_pair_type& pair, vrec_map) {
400 const vertex_id_type& vid = pair.first;
401 const vertex_negotiator_record& negotiator_rec = pair.second;
402 const exchange_pair_type exchange_pair(vid, negotiator_rec);
404 negotiator_exchange.send(negotiator_rec.owner, exchange_pair);
406 foreach(
size_t mirror, negotiator_rec.mirrors) {
407 negotiator_exchange.send(mirror, exchange_pair);
409 if (iter == last_iter) negotiator_exchange.flush();
411 while(negotiator_exchange.recv(sending_proc, recv_buffer)) {
412 foreach(
const exchange_pair_type& pair, recv_buffer) {
413 const vertex_id_type& vid = pair.first;
414 const vertex_negotiator_record& negotiator_rec = pair.second;
417 if(graph.vid2lvid.find(vid) == graph.vid2lvid.end()) {
418 lvid = graph.vid2lvid.size();
419 graph.vid2lvid[vid] = lvid;
420 graph.local_graph.add_vertex(lvid, negotiator_rec.vdata);
421 ASSERT_LT(lvid, graph.lvid2record.size());
422 graph.lvid2record[lvid].gvid = vid;
424 lvid = graph.vid2lvid[vid];
425 ASSERT_LT(lvid, graph.local_graph.num_vertices());
426 graph.local_graph.vertex_data(lvid) = negotiator_rec.vdata;
428 ASSERT_LT(lvid, graph.lvid2record.size());
429 vertex_record& local_record = graph.lvid2record[lvid];
430 local_record.owner = negotiator_rec.owner;
431 ASSERT_EQ(local_record.num_in_edges, 0);
432 local_record.num_in_edges = negotiator_rec.num_in_edges;
433 ASSERT_EQ(local_record.num_out_edges, 0);
434 local_record.num_out_edges = negotiator_rec.num_out_edges;
435 local_record._mirrors = negotiator_rec.mirrors;
443 if(rpc.procid() == 0)
444 memory_info::log_usage(
"Finished sending updating mirrors");
447 ASSERT_EQ(graph.vid2lvid.size(), graph.local_graph.num_vertices());
448 ASSERT_EQ(graph.lvid2record.size(), graph.local_graph.num_vertices());
450 exchange_global_info();
457 void exchange_global_info () {
459 graph.local_own_nverts = 0;
460 foreach(
const vertex_record& record, graph.lvid2record)
461 if(record.owner == rpc.procid()) ++graph.local_own_nverts;
465 <<
"Graph Finalize: exchange global statistics " << std::endl;
468 std::vector<size_t> swap_counts(rpc.numprocs());
469 swap_counts[rpc.procid()] = graph.num_local_edges();
470 rpc.all_gather(swap_counts);
472 foreach(
size_t count, swap_counts) graph.nedges += count;
476 for(
size_t i = 0; i < rpc.procid(); ++i)
477 graph.begin_eid += swap_counts[i];
480 swap_counts[rpc.procid()] = graph.num_local_own_vertices();
481 rpc.all_gather(swap_counts);
483 foreach(
size_t count, swap_counts) graph.nverts += count;
486 swap_counts[rpc.procid()] = graph.num_local_vertices();
487 rpc.all_gather(swap_counts);
489 foreach(
size_t count, swap_counts) graph.nreplicas += count;
492 if (rpc.procid() == 0) {
493 logstream(
LOG_EMPH) <<
"Graph info: "
496 <<
"\n\t nreplicas: " << graph.nreplicas
497 <<
"\n\t replication factor: " << (double)graph.nreplicas/graph.
num_vertices()
513 procid_t vertex_to_proc(
const vertex_id_type vid)
const {
514 return vid % rpc.numprocs();
519 #include <graphlab/macros_undef.hpp>