23 #ifndef GRAPHLAB_DISTRIBUTED_BATCH_INGRESS_HPP
24 #define GRAPHLAB_DISTRIBUTED_BATCH_INGRESS_HPP
26 #include <boost/unordered_set.hpp>
27 #include <graphlab/graph/graph_basic_types.hpp>
28 #include <graphlab/graph/ingress/idistributed_ingress.hpp>
29 #include <graphlab/graph/ingress/distributed_ingress_base.hpp>
30 #include <graphlab/graph/distributed_graph.hpp>
31 #include <graphlab/rpc/buffered_exchange.hpp>
32 #include <graphlab/rpc/distributed_event_log.hpp>
33 #include <graphlab/util/dense_bitset.hpp>
34 #include <graphlab/macros_def.hpp>
// Forward declaration of the distributed_graph class template (parameterized
// on vertex data and edge data types). The ingress class in this file names
// an instantiation of it as graph_type.
36 template<
typename VertexData,
typename EdgeData>
37 class distributed_graph;
/**
 * Ingress object derived from distributed_ingress_base.
 *
 * Edges are accumulated in the edgesend / edatasend buffers and later
 * assigned to processes in a batch using
 * base_type::edge_decision.edge_to_proc_greedy (see assign_edges).
 */
39 template<
typename VertexData,
typename EdgeData>
40 class distributed_batch_ingress :
41 public distributed_ingress_base<VertexData, EdgeData> {
// The graph type this ingress object populates.
43 typedef distributed_graph<VertexData, EdgeData>
graph_type;
// Type of the data stored on vertices.
45 typedef VertexData vertex_data_type;
// Type of the data stored on edges.
47 typedef EdgeData edge_data_type;
// Per-vertex record type defined by the graph; appended to
// graph.lvid2record when add_edges meets a vertex id it has not seen.
49 typedef typename graph_type::vertex_record vertex_record;
// RPC object for this class: supplies procid() / numprocs() and is used to
// invoke add_edges, block_add_degree_counts and block_get_degree_table on
// other processes.
52 dc_dist_object<distributed_batch_ingress> rpc;
// Shorthand for the base class.
53 typedef distributed_ingress_base<VertexData, EdgeData> base_type;
// Held in add_edges while graph.local_graph is resized and extended.
55 mutex local_graph_lock;
// Held in add_edges while graph.vid2lvid and graph.lvid2record are read and
// extended.
56 mutex lvid2record_lock;
// Bitset type parameterized by RPC_MAX_N_PROCS; block_add_degree_counts sets
// the bit at index pid for a vertex.
58 typedef fixed_dense_bitset<RPC_MAX_N_PROCS> bin_counts_type;
// Map from vertex id to its bin_counts_type; this is the type returned by
// block_get_degree_table.
62 typedef typename boost::unordered_map<vertex_id_type, bin_counts_type>
63 dht_degree_table_type;
// This process's table of bitsets. It is indexed by
// (vid - rpc.procid()) / rpc.numprocs() and resized on demand under
// dht_degree_table_lock.
66 std::vector<bin_counts_type > dht_degree_table;
// NOTE(review): the signature of the enclosing function, its return statement
// and its closing braces are not present in this view. Other code in this
// file indexes dht_degree_table with vid_to_dht_entry_with_readlock(vid)
// while holding the read lock, so this is presumably that function's body --
// confirm against the full file.
//
// Map the vertex id to its slot in dht_degree_table.
// assumes vid >= rpc.procid() so the subtraction does not wrap -- TODO confirm
71 size_t idx = (vid - rpc.procid()) / rpc.numprocs();
// The table is too small for this slot: release the lock currently held
// (presumably the caller's read lock) and take the write lock to resize.
72 if (dht_degree_table.size() <= idx) {
73 dht_degree_table_lock.unlock();
74 dht_degree_table_lock.writelock();
// Re-check under the write lock: the size may have changed while no lock
// was held.
76 if (dht_degree_table.size() <= idx) {
// Grow to at least idx + 1 entries, doubling when that is larger.
77 size_t newsize = std::max(dht_degree_table.size() * 2, idx + 1);
78 dht_degree_table.resize(newsize);
// Drop the write lock and re-acquire the read lock before continuing.
80 dht_degree_table_lock.unlock();
81 dht_degree_table_lock.readlock();
// Reader/writer lock for dht_degree_table: block_add_degree_counts and
// block_get_degree_table take the read lock; the write lock is taken only
// while the table is resized.
85 rwlock dht_degree_table_lock;
// Buffered edges as (source, target) vertex id pairs; appended to under
// edgesend_lock.
92 std::vector<std::pair<vertex_id_type, vertex_id_type> > edgesend;
// Edge data for the buffered edges; assign_edges reads edatasend[i] for the
// i-th buffered edge.
93 std::vector<EdgeData> edatasend;
// One set of vertex ids per process, indexed by base_type::vertex_to_proc(vid).
// assign_edges passes each set to block_get_degree_table and then clears it.
96 std::vector<boost::unordered_set<vertex_id_type> > query_set;
// One counter per process (sized dc.numprocs() in the constructor); passed to
// edge_to_proc_greedy.
98 std::vector<size_t> proc_num_edges;
// Tracers for the timed sections in this class; each is given a description
// string in the constructor.
100 DECLARE_TRACER(batch_ingress_add_edge);
101 DECLARE_TRACER(batch_ingress_add_edges);
102 DECLARE_TRACER(batch_ingress_compute_assignments);
103 DECLARE_TRACER(batch_ingress_request_degree_table);
104 DECLARE_TRACER(batch_ingress_get_degree_table);
105 DECLARE_TRACER(batch_ingress_update_degree_table);
/**
 * Constructs the ingress object.
 *
 * \param dc        distributed control object; forwarded to the base class
 *                  and to rpc, and used to size the per-process containers
 *                  (query_set, proc_num_edges) with dc.numprocs().
 * \param graph     graph to populate; forwarded to the base class.
 * \param bufsize   buffer capacity in edges: is_full() reports true once
 *                  size() >= bufsize. Defaults to 50000.
 * \param usehash   stored and later forwarded to edge_to_proc_greedy.
 * \param userecent stored and later forwarded to edge_to_proc_greedy.
 *
 * NOTE(review): the meaning of usehash / userecent is defined by
 * edge_to_proc_greedy, which is not visible in this file.
 */
112 distributed_batch_ingress(distributed_control& dc,
graph_type& graph,
113 size_t bufsize = 50000,
bool usehash =
false,
bool userecent =
false) :
114 base_type(dc, graph), rpc(dc, this),
115 num_edges(0), bufsize(bufsize), query_set(dc.numprocs()),
116 proc_num_edges(dc.numprocs()), usehash(usehash), userecent(userecent) {
// Attach a description string to each tracer declared in this class.
119 INITIALIZE_TRACER(batch_ingress_add_edge,
"Time spent in add edge");
120 INITIALIZE_TRACER(batch_ingress_add_edges,
"Time spent in add block edges" );
121 INITIALIZE_TRACER(batch_ingress_compute_assignments,
"Time spent in compute assignment");
122 INITIALIZE_TRACER(batch_ingress_request_degree_table,
"Time spent in requesting assignment");
123 INITIALIZE_TRACER(batch_ingress_get_degree_table,
"Time spent in retrieve degree table");
124 INITIALIZE_TRACER(batch_ingress_update_degree_table,
"Time spent in update degree table");
// NOTE(review): the signature of the enclosing function is not present in
// this view. The body uses source, target and edata and the tracer is named
// batch_ingress_add_edge, so this is presumably add_edge(source, target,
// edata) -- confirm against the full file.
129 BEGIN_TRACEPOINT(batch_ingress_add_edge);
// Buffer the edge and its data under edgesend_lock.
130 edgesend_lock.lock();
// The buffer must not already hold bufsize edges.
131 ASSERT_LT(edgesend.size(), bufsize);
132 edgesend.push_back(std::make_pair(source, target));
133 edatasend.push_back(edata);
// Remember both endpoints under the process vertex_to_proc maps them to, so
// assign_edges can request their degree-table entries from that process.
134 query_set[base_type::vertex_to_proc(source)].insert(source);
135 query_set[base_type::vertex_to_proc(target)].insert(target);
// NOTE(review): no update of num_edges is visible in this view, although
// size() returns num_edges -- confirm the increment exists in the full file.
137 edgesend_lock.unlock();
138 END_TRACEPOINT(batch_ingress_add_edge);
// Once the buffer has reached capacity, dispatch the buffered edges.
139 if (is_full()) flush();
// Delegates to the base class's finalize().
// NOTE(review): the enclosing function's signature and any statements that
// precede this call are not present in this view.
147 base_type::finalize();
/**
 * Adds a batch of edges to this process's local graph.
 *
 * \param source_arr source vertex ids, one per edge
 * \param target_arr target vertex ids, one per edge
 * \param edata_arr  edge data, one per edge
 *
 * The three arrays must have equal sizes (asserted); an empty batch returns
 * immediately. The function is called directly for edges assigned to this
 * process and through rpc.remote_call for edges sent by other processes.
 *
 * NOTE(review): several original lines of this function are missing from
 * this view (the declarations of source, target, lvid_source, lvid_target,
 * iter and max_lvid, the else branches, the head of the remote call, and the
 * closing braces). The comments below describe only the visible statements.
 */
155 void add_edges(
const std::vector<vertex_id_type>& source_arr,
156 const std::vector<vertex_id_type>& target_arr,
157 const std::vector<EdgeData>& edata_arr) {
159 BEGIN_TRACEPOINT(batch_ingress_add_edges);
160 ASSERT_TRUE((source_arr.size() == target_arr.size())
161 && (source_arr.size() == edata_arr.size()));
// NOTE(review): this early return skips END_TRACEPOINT(batch_ingress_add_edges)
// -- confirm the tracer macros tolerate an unmatched BEGIN.
162 if (source_arr.size() == 0)
return;
// Local vertex ids (lvid_type) of each edge's endpoints, filled in the loop
// below and handed to local_graph.add_edges.
164 std::vector<lvid_type> local_source_arr;
165 local_source_arr.reserve(source_arr.size());
166 std::vector<lvid_type> local_target_arr;
167 local_target_arr.reserve(source_arr.size());
// For each process, the vertex ids seen in this batch that vertex_to_proc
// maps to that process.
170 std::vector<std::vector<vertex_id_type> > local_degree_count(rpc.numprocs());
// graph.vid2lvid and graph.lvid2record are read and extended below; hold
// lvid2record_lock for the whole loop.
174 lvid2record_lock.lock();
176 for (
size_t i = 0; i < source_arr.size(); ++i) {
// NOTE(review): the name introduced by this typedef and the declarations of
// source / target (presumably source_arr[i] / target_arr[i]) are not visible.
183 typedef typename cuckoo_map_pow2<vertex_id_type, lvid_type, 3, uint32_t>::iterator
// Look up the source vertex. An unseen vertex gets the next local id (the
// current vid2lvid.size()) and a new vertex_record.
187 iter = base_type::graph.vid2lvid.find(source);
188 if (iter == base_type::graph.vid2lvid.end()) {
189 lvid_source = base_type::graph.vid2lvid.size();
190 base_type::graph.vid2lvid[source]=lvid_source;
191 base_type::graph.lvid2record.push_back(vertex_record(source));
// Presumably the else branch (its header is not visible): reuse the
// existing local id.
193 lvid_source = iter->second;
// Same lookup-or-insert for the target vertex.
196 iter = base_type::graph.vid2lvid.find(target);
197 if (iter == base_type::graph.vid2lvid.end()) {
198 lvid_target = base_type::graph.vid2lvid.size();
199 base_type::graph.vid2lvid[target]=lvid_target;
200 base_type::graph.lvid2record.push_back(vertex_record(target));
// Presumably the else branch (its header is not visible).
202 lvid_target = iter->second;
205 local_source_arr.push_back(lvid_source);
206 local_target_arr.push_back(lvid_target);
// Track the largest local id touched; the remainder of this expression is
// not visible in this view.
207 max_lvid = std::max(std::max(lvid_source, lvid_target),
// Record each endpoint under the process vertex_to_proc assigns it to.
210 local_degree_count[base_type::vertex_to_proc(source)].push_back(source);
211 local_degree_count[base_type::vertex_to_proc(target)].push_back(target);
213 lvid2record_lock.unlock();
// Deliver each process its list of vertex ids. For other processes the
// visible arguments belong to a call whose first line is missing (presumably
// an rpc remote call of block_add_degree_counts); for this process the
// function is called directly with rpc.procid(). Each list is cleared once
// handled.
216 for (
size_t i = 0; i < rpc.numprocs(); ++i) {
217 if (i != rpc.procid()) {
219 &distributed_batch_ingress::block_add_degree_counts,
221 local_degree_count[i]);
223 block_add_degree_counts(rpc.procid(), local_degree_count[i]);
225 local_degree_count[i].clear();
// Under local_graph_lock: grow the local graph when max_lvid is beyond its
// current vertex count, then insert the batch of edges.
229 local_graph_lock.lock();
230 if (max_lvid >= base_type::graph.local_graph.num_vertices()) {
232 base_type::graph.local_graph.resize(max_lvid + 1);
234 base_type::graph.local_graph.add_edges(local_source_arr,
235 local_target_arr, edata_arr);
236 local_graph_lock.unlock();
238 END_TRACEPOINT(batch_ingress_add_edges);
/**
 * Sets bit pid in the dht_degree_table entry of a vertex.
 *
 * \param pid    process id whose bit is set
 * \param whohas list of vertex ids
 *
 * Called directly and registered as an RPC target by add_edges.
 *
 * NOTE(review): the loop header that introduces vid is not visible in this
 * view; presumably the body runs once for each element of whohas.
 */
242 void block_add_degree_counts (
procid_t pid, std::vector<vertex_id_type>& whohas) {
243 BEGIN_TRACEPOINT(batch_ingress_update_degree_table);
// The slot lookup below expects the read lock to be held.
244 dht_degree_table_lock.readlock();
246 size_t idx = vid_to_dht_entry_with_readlock(vid);
// NOTE(review): set_bit_unsync modifies the entry while only the read lock is
// held -- confirm concurrent updates of the same entry cannot occur or are
// acceptable.
247 dht_degree_table[idx].set_bit_unsync(pid);
249 dht_degree_table_lock.unlock();
250 END_TRACEPOINT(batch_ingress_update_degree_table);
/**
 * Builds a map from queried vertex id to this process's dht_degree_table
 * entry for that vertex.
 *
 * \param vid_query set of vertex ids to look up
 * \return a dht_degree_table_type (presumably answer; the return statement
 *         is not visible in this view)
 *
 * Called directly and through rpc.remote_request by assign_edges.
 *
 * NOTE(review): the loop header that introduces qvid is not visible in this
 * view; presumably the body runs once for each element of vid_query.
 */
254 dht_degree_table_type
255 block_get_degree_table(
const boost::unordered_set<vertex_id_type>& vid_query) {
256 BEGIN_TRACEPOINT(batch_ingress_get_degree_table);
257 dht_degree_table_type answer;
// The slot lookup below expects the read lock to be held.
258 dht_degree_table_lock.readlock();
// Copy the table entry for qvid into the answer map.
260 answer[qvid] = dht_degree_table[vid_to_dht_entry_with_readlock(qvid)];
262 dht_degree_table_lock.unlock();
263 END_TRACEPOINT(batch_ingress_get_degree_table);
/**
 * Chooses a destination process for every buffered edge.
 *
 * \param proc_src   output: proc_src[p] receives the source ids of the edges
 *                   assigned to process p
 * \param proc_dst   output: proc_dst[p] receives the matching target ids
 * \param proc_edata output: proc_edata[p] receives the matching edge data
 *
 * Every process id returned by edge_to_proc_greedy must be a valid index
 * into proc_src (asserted).
 *
 * NOTE(review): some original lines of this function are missing from this
 * view (the early return, the else branch and tail of the remote request,
 * the initializer of e, and the closing braces). The comments below describe
 * only the visible statements.
 */
269 void assign_edges(std::vector<std::vector<vertex_id_type> >& proc_src,
270 std::vector<std::vector<vertex_id_type> >& proc_dst,
271 std::vector<std::vector<EdgeData> >& proc_edata) {
// NOTE(review): this check reads num_edges and edgesend before edgesend_lock
// is taken on the next line -- confirm that is intended.
272 ASSERT_EQ(num_edges, edgesend.size());
274 edgesend_lock.lock();
// Nothing buffered: release the lock (the return that presumably follows is
// not visible in this view).
276 if (num_edges == 0) {
277 edgesend_lock.unlock();
280 BEGIN_TRACEPOINT(batch_ingress_request_degree_table);
// degree_table[p] holds the answer from process p for the vertex ids in
// query_set[p].
281 std::vector<dht_degree_table_type> degree_table(rpc.numprocs());
// Query this process directly and other processes through
// rpc.remote_request; each query set is cleared after use.
284 for (
size_t i = 0; i < rpc.numprocs(); ++i) {
285 if (i == rpc.procid()) {
286 degree_table[i] = block_get_degree_table(query_set[i]);
289 rpc.remote_request(i,
290 &distributed_batch_ingress::block_get_degree_table,
293 query_set[i].clear();
295 END_TRACEPOINT(batch_ingress_request_degree_table);
// Walk the num_edges buffered edges.
298 for (
size_t i = 0; i < num_edges; ++i) {
// NOTE(review): the initializer of e is not visible; presumably edgesend[i].
299 std::pair<vertex_id_type, vertex_id_type>& e =
302 BEGIN_TRACEPOINT(batch_ingress_compute_assignments);
// Each endpoint's bitset comes from the table of the process that
// vertex_to_proc maps it to. operator[] on the map creates a
// default-constructed entry if the vertex id is absent.
303 size_t src_proc = base_type::vertex_to_proc(e.first);
304 size_t dst_proc = base_type::vertex_to_proc(e.second);
305 bin_counts_type& src_degree = degree_table[src_proc][e.first];
306 bin_counts_type& dst_degree = degree_table[dst_proc][e.second];
// The placement decision is delegated to the base class's edge_decision.
307 procid_t proc = base_type::edge_decision.edge_to_proc_greedy(e.first, e.second,
308 src_degree, dst_degree, proc_num_edges, usehash, userecent);
309 END_TRACEPOINT(batch_ingress_compute_assignments);
// Append the edge and its data to the chosen process's output lists.
311 ASSERT_LT(proc, proc_src.size());
312 proc_src[proc].push_back(e.first);
313 proc_dst[proc].push_back(e.second);
314 proc_edata[proc].push_back(edatasend[i]);
320 edgesend_lock.unlock();
// NOTE(review): the signature of the enclosing function is not present in
// this view. The add-edge path calls flush() when is_full(), and this body
// dispatches the buffered edges, so it is presumably flush() -- confirm
// against the full file.
//
// Per-process output lists, filled by assign_edges.
325 std::vector< std::vector<vertex_id_type> > proc_src(rpc.numprocs());
326 std::vector< std::vector<vertex_id_type> > proc_dst(rpc.numprocs());
327 std::vector< std::vector<EdgeData> > proc_edata(rpc.numprocs());
328 assign_edges(proc_src, proc_dst, proc_edata);
// Hand each non-empty list to its process: add_edges is called directly for
// this process and through rpc.remote_call for the others. Either way
// num_edges is reduced by the number of edges dispatched.
// NOTE(review): the statement controlled by the size() == 0 test is not
// visible (presumably continue), and num_edges is modified here with no lock
// visible in this view -- confirm against the full file.
329 for (
size_t i = 0; i < proc_src.size(); ++i) {
330 if (proc_src[i].size() == 0)
332 if (i == rpc.procid()) {
333 add_edges(proc_src[i], proc_dst[i], proc_edata[i]);
334 num_edges -= proc_src[i].size();
336 rpc.remote_call(i, &distributed_batch_ingress::add_edges,
337 proc_src[i], proc_dst[i], proc_edata[i]);
338 num_edges -= proc_src[i].size();
// Returns num_edges, the count that is_full() compares against bufsize.
344 size_t size() {
return num_edges; }
// True once size() has reached (or exceeded) bufsize.
347 bool is_full() {
return size() >= bufsize; }
351 #include <graphlab/macros_undef.hpp>