23 #ifndef GRAPHLAB_DISTRIBUTED_CONSTRAINED_BATCH_INGRESS_HPP
24 #define GRAPHLAB_DISTRIBUTED_CONSTRAINED_BATCH_INGRESS_HPP
26 #include <boost/unordered_set.hpp>
27 #include <graphlab/graph/graph_basic_types.hpp>
28 #include <graphlab/graph/ingress/idistributed_ingress.hpp>
29 #include <graphlab/graph/ingress/distributed_ingress_base.hpp>
30 #include <graphlab/graph/distributed_graph.hpp>
31 #include <graphlab/rpc/buffered_exchange.hpp>
32 #include <graphlab/rpc/distributed_event_log.hpp>
33 #include <graphlab/util/dense_bitset.hpp>
34 #include <graphlab/graph/ingress/sharding_constraint.hpp>
35 #include <graphlab/macros_def.hpp>
// Forward declaration of the distributed_graph template; graph_type in the
// ingress class below is distributed_graph<VertexData, EdgeData>.
// NOTE(review): <graphlab/graph/distributed_graph.hpp> is already included
// above, so this declaration is presumably redundant -- confirm before removing.
37 template<
typename VertexData,
typename EdgeData>
38 class distributed_graph;
// Batched, constraint-aware edge ingress.
//
// Incoming edges are first buffered locally (edgesend / edatasend) under
// edgesend_lock. Once is_full() reports that the buffer reached bufsize,
// flush() fetches, from each vertex's owner process, a bitset of which
// processes already hold that vertex. It then picks a destination process per
// edge with edge_decision.edge_to_proc_greedy, restricted to the candidates
// returned by sharding_constraint::get_joint_neighbors, and ships the edges
// to that process's add_edges.
40 template<
typename VertexData,
typename EdgeData>
41 class distributed_constrained_batch_ingress :
42 public distributed_ingress_base<VertexData, EdgeData> {
44 typedef distributed_graph<VertexData, EdgeData>
graph_type;
46 typedef VertexData vertex_data_type;
48 typedef EdgeData edge_data_type;
50 typedef typename graph_type::vertex_record vertex_record;
// RPC handle bound to this object. It is used to invoke add_edges,
// block_add_degree_counts and block_get_degree_table on peer instances.
53 dc_dist_object<distributed_constrained_batch_ingress> rpc;
54 typedef distributed_ingress_base<VertexData, EdgeData> base_type;
// Guards resizing graph.local_graph and appending edges to it.
56 mutex local_graph_lock;
// Guards graph.vid2lvid / graph.lvid2record while new local vertices are created.
57 mutex lvid2record_lock;
// One bit per process (up to RPC_MAX_N_PROCS). Despite the name, these are
// presence flags, not counts: bit p is set by block_add_degree_counts(p, ...).
59 typedef fixed_dense_bitset<RPC_MAX_N_PROCS> bin_counts_type;
// Reply type of block_get_degree_table: global vertex id -> presence bitset.
// NOTE(review): only <boost/unordered_set.hpp> is included by this header;
// boost::unordered_map presumably arrives transitively -- consider including
// <boost/unordered_map.hpp> directly.
63 typedef typename boost::unordered_map<vertex_id_type, bin_counts_type>
64 dht_degree_table_type;
// This process's shard of the presence table, indexed via
// vid_to_dht_entry_with_readlock(); protected by dht_degree_table_lock.
67 std::vector<bin_counts_type > dht_degree_table;
// [Body of vid_to_dht_entry_with_readlock(vid). Its signature and return
// statement are not visible in this listing. Its callers use the result as an
// index into dht_degree_table.]
// Maps a global vertex id to its slot in dht_degree_table and grows the table
// if that slot does not exist yet. The caller must already hold
// dht_degree_table_lock as a reader; both visible callers call readlock()
// first. The lock is held as a reader again when this code finishes.
// NOTE(review): the slot formula assumes vid % numprocs == procid, i.e. that
// vertices are owned round-robin by id. If vertex ownership is hash-based,
// distinct vids can land in the same slot. Also, vid < procid would wrap
// around if vertex_id_type is unsigned -- confirm.
72 size_t idx = (vid - rpc.procid()) / rpc.numprocs();
73 if (dht_degree_table.size() <= idx) {
// This is not an atomic upgrade. The read lock is dropped before the write
// lock is taken, so another thread may grow the table in between; hence the
// re-check below.
74 dht_degree_table_lock.unlock();
75 dht_degree_table_lock.writelock();
77 if (dht_degree_table.size() <= idx) {
// Grow geometrically (at least to idx + 1) to amortize repeated resizes.
78 size_t newsize = std::max(dht_degree_table.size() * 2, idx + 1);
79 dht_degree_table.resize(newsize);
// Downgrade back to a read lock so the caller's matching unlock() stays balanced.
81 dht_degree_table_lock.unlock();
82 dht_degree_table_lock.readlock();
// Reader/writer lock for dht_degree_table. Lookups and bit updates take it as
// a reader; only growing the table takes it as a writer.
86 rwlock dht_degree_table_lock;
// Pending edges (source, target) and their data, kept as parallel vectors:
// edatasend[i] belongs to edgesend[i]. Both are appended under edgesend_lock.
93 std::vector<std::pair<vertex_id_type, vertex_id_type> > edgesend;
94 std::vector<EdgeData> edatasend;
// query_set[p] holds the vertex ids of buffered edges whose presence bitsets
// must be fetched from owner process p. It is cleared after each fetch.
97 std::vector<boost::unordered_set<vertex_id_type> > query_set;
// Per-process edge counts passed to edge_to_proc_greedy. Presumably that call
// updates them; this is not visible here.
99 std::vector<size_t> proc_num_edges;
// Owning raw pointer, allocated with new in the constructor.
// NOTE(review): the destructor body is not visible here -- confirm it deletes
// this. A scoped owning smart pointer would make the release automatic.
105 sharding_constraint* constraint;
// Hash used to pick a vertex's master process: hashvid(vid) % numprocs.
106 boost::hash<vertex_id_type> hashvid;
// Constructs the ingress object.
//   dc        - distributed control object; its numprocs() sizes the
//               per-process buffers and the sharding constraint.
//   graph     - graph being loaded (forwarded to distributed_ingress_base).
//   bufsize   - buffered-edge count at which is_full() becomes true.
//   usehash   - stored and forwarded to edge_decision.edge_to_proc_greedy.
//   userecent - stored and forwarded to edge_decision.edge_to_proc_greedy.
109 distributed_constrained_batch_ingress(distributed_control& dc,
graph_type& graph,
110 size_t bufsize = 50000,
bool usehash =
false,
bool userecent =
false) :
111 base_type(dc, graph), rpc(dc, this),
112 num_edges(0), bufsize(bufsize), query_set(dc.numprocs()),
113 proc_num_edges(dc.numprocs()), usehash(usehash), userecent(userecent) {
// Placement candidates come from a "grid" sharding constraint over all
// processes. This object owns the allocation.
// NOTE(review): the constraint kind is hard-coded to "grid". Exposing it as a
// defaulted constructor parameter would make other layouts reachable.
114 constraint =
new sharding_constraint(dc.numprocs(),
"grid");
// Destructor. Its body is not visible in this listing.
// NOTE(review): the constructor allocates `constraint` with new. Confirm the
// body deletes it; otherwise every ingress object leaks it.
117 ~distributed_constrained_batch_ingress() {
// [Body of the single-edge buffering method. Its signature is not visible
// here; it uses `source`, `target` and `edata`.]
// Buffers one edge under edgesend_lock and records both endpoints in the
// query set of their owning process. It flushes once the buffer is full.
123 edgesend_lock.lock();
// The buffer must still have room, because flush() is called as soon as
// is_full() is true. Under concurrent callers this can still fire, since
// the flush decision below is made after the lock is released.
124 ASSERT_LT(edgesend.size(), bufsize);
125 edgesend.push_back(std::make_pair(source, target));
126 edatasend.push_back(edata);
127 query_set[base_type::vertex_to_proc(source)].insert(source);
128 query_set[base_type::vertex_to_proc(target)].insert(target);
// NOTE(review): this uses manual lock()/unlock(). If a push_back or insert
// above throws, edgesend_lock is never released. A scoped lock guard would
// be safer.
130 edgesend_lock.unlock();
// NOTE(review): is_full() is evaluated after the lock is released, so two
// concurrent callers can both observe a full buffer and both call flush().
131 if (is_full()) flush();
// [Fragment of the finalization path; the enclosing function is not visible
// here.] Delegates the remaining finalization to distributed_ingress_base.
139 base_type::finalize();
// Adds a batch of edges that were assigned to this process to the local graph.
//   source_arr, target_arr - global vertex ids of the edge endpoints.
//   edata_arr              - edge data; element i belongs to edge i.
// The three arrays must have equal length (asserted). An empty batch returns
// immediately. Steps:
//   1. Under lvid2record_lock, translate every endpoint to a local vertex id.
//      Ids seen for the first time get a new vertex_record.
//   2. Report each endpoint to its owner process via block_add_degree_counts,
//      using an RPC for other processes and a direct call for this one.
//   3. Under local_graph_lock, grow the local graph if needed and append the
//      edges.
147 void add_edges(
const std::vector<vertex_id_type>& source_arr,
148 const std::vector<vertex_id_type>& target_arr,
149 const std::vector<EdgeData>& edata_arr) {
151 ASSERT_TRUE((source_arr.size() == target_arr.size())
152 && (source_arr.size() == edata_arr.size()));
153 if (source_arr.size() == 0)
return;
155 std::vector<lvid_type> local_source_arr;
156 local_source_arr.reserve(source_arr.size());
157 std::vector<lvid_type> local_target_arr;
158 local_target_arr.reserve(source_arr.size());
// Endpoint ids bucketed by owning process (vertex_to_proc), to be reported
// in step 2.
161 std::vector<std::vector<vertex_id_type> > local_degree_count(rpc.numprocs());
// Step 1.
165 lvid2record_lock.lock();
167 for (
size_t i = 0; i < source_arr.size(); ++i) {
// Each endpoint is looked up in vid2lvid. An unseen id receives the next
// local id (the current map size) and a fresh vertex_record.
174 typedef typename cuckoo_map_pow2<vertex_id_type, lvid_type, 3, uint32_t>::iterator
178 iter = base_type::graph.vid2lvid.find(source);
179 if (iter == base_type::graph.vid2lvid.end()) {
180 lvid_source = base_type::graph.vid2lvid.size();
181 base_type::graph.vid2lvid[source]=lvid_source;
182 base_type::graph.lvid2record.push_back(vertex_record(source));
184 lvid_source = iter->second;
187 iter = base_type::graph.vid2lvid.find(target);
188 if (iter == base_type::graph.vid2lvid.end()) {
189 lvid_target = base_type::graph.vid2lvid.size();
190 base_type::graph.vid2lvid[target]=lvid_target;
191 base_type::graph.lvid2record.push_back(vertex_record(target));
193 lvid_target = iter->second;
196 local_source_arr.push_back(lvid_source);
197 local_target_arr.push_back(lvid_target);
// Track the largest local id so the local graph can be resized once.
198 max_lvid = std::max(std::max(lvid_source, lvid_target),
// NOTE(review): ids are pushed once per edge endpoint without
// de-duplication. Setting a presence bit is idempotent, so repeats only
// cost extra message volume.
201 local_degree_count[base_type::vertex_to_proc(source)].push_back(source);
202 local_degree_count[base_type::vertex_to_proc(target)].push_back(target);
204 lvid2record_lock.unlock();
// Step 2: report held vertices to their owners. This process's own bucket is
// handled with a direct call passing rpc.procid(). The remote call's leading
// arguments are on lines not shown here (presumably the same id).
207 for (
size_t i = 0; i < rpc.numprocs(); ++i) {
208 if (i != rpc.procid()) {
210 &distributed_constrained_batch_ingress::block_add_degree_counts,
212 local_degree_count[i]);
214 block_add_degree_counts(rpc.procid(), local_degree_count[i]);
216 local_degree_count[i].clear();
// Step 3.
220 local_graph_lock.lock();
221 if (max_lvid >= base_type::graph.local_graph.num_vertices()) {
223 base_type::graph.local_graph.resize(max_lvid + 1);
225 base_type::graph.local_graph.add_edges(local_source_arr,
226 local_target_arr, edata_arr);
227 local_graph_lock.unlock();
// RPC target. It records that process `pid` holds each vertex id in `whohas`
// by setting bit `pid` in that vertex's presence bitset. The loop over
// `whohas` that binds `vid` is on a line not shown here.
// NOTE(review): the table lock is taken only as a reader, yet the bitset is
// modified with set_bit_unsync (by its name, a non-atomic update). If handlers
// can run concurrently, two calls touching the same vertex can lose a bit.
// Consider an atomic bit set, if fixed_dense_bitset offers one, or a writer
// lock -- confirm.
// NOTE(review): `whohas` is not modified in the visible lines. It could
// presumably be a const reference; check the RPC dispatch requirements first.
232 void block_add_degree_counts (
procid_t pid, std::vector<vertex_id_type>& whohas) {
233 dht_degree_table_lock.readlock();
// This may briefly drop and re-take the lock to grow the table. It returns
// with the lock held as a reader again.
235 size_t idx = vid_to_dht_entry_with_readlock(vid);
236 dht_degree_table[idx].set_bit_unsync(pid);
238 dht_degree_table_lock.unlock();
// RPC target. It builds a map from each vertex id in vid_query to its presence
// bitset; the return statement is not visible here. Ids with no slot yet get
// whatever the resized table default-constructs (presumably an empty bitset).
// This means a pure query may also grow dht_degree_table.
242 dht_degree_table_type
243 block_get_degree_table(
const boost::unordered_set<vertex_id_type>& vid_query) {
244 dht_degree_table_type answer;
245 dht_degree_table_lock.readlock();
// The loop that binds `qvid` (presumably over vid_query) is on a line not shown.
247 answer[qvid] = dht_degree_table[vid_to_dht_entry_with_readlock(qvid)];
249 dht_degree_table_lock.unlock();
// Chooses a destination process for every buffered edge and appends the edge
// to proc_src / proc_dst / proc_edata[destination]. It holds edgesend_lock for
// the whole assignment.
//   proc_src, proc_dst, proc_edata - output buckets indexed by process id;
//     each chosen process must be a valid index (asserted).
// Steps:
//   1. Fetch from each owner process the presence bitsets for the ids in
//      query_set[owner], then clear that query set.
//   2. For each edge, get candidate processes from
//      constraint->get_joint_neighbors on the endpoints' masters. Then let
//      edge_to_proc_greedy choose one, using both endpoints' bitsets and the
//      per-process edge counts.
255 void assign_edges(std::vector<std::vector<vertex_id_type> >& proc_src,
256 std::vector<std::vector<vertex_id_type> >& proc_dst,
257 std::vector<std::vector<EdgeData> >& proc_edata) {
// NOTE(review): this assertion reads num_edges and edgesend.size() before
// edgesend_lock is acquired, so it races with concurrent appends. Checking it
// after lock() would be safe.
258 ASSERT_EQ(num_edges, edgesend.size());
260 edgesend_lock.lock();
// Nothing is buffered, so the lock is released. The early exit itself is on
// a line not shown here.
262 if (num_edges == 0) {
263 edgesend_lock.unlock();
// Step 1: degree_table[p] holds the reply from owner process p.
// NOTE(review): remote_request (presumably a blocking round trip) is issued
// while edgesend_lock is held. Local threads trying to buffer edges wait for
// the network.
266 std::vector<dht_degree_table_type> degree_table(rpc.numprocs());
269 for (
size_t i = 0; i < rpc.numprocs(); ++i) {
270 if (i == rpc.procid()) {
271 degree_table[i] = block_get_degree_table(query_set[i]);
274 rpc.remote_request(i,
275 &distributed_constrained_batch_ingress::block_get_degree_table,
278 query_set[i].clear();
// Step 2: edge i is edgesend[i], and its data is edatasend[i].
282 for (
size_t i = 0; i < num_edges; ++i) {
283 std::pair<vertex_id_type, vertex_id_type>& e =
286 size_t src_proc = base_type::vertex_to_proc(e.first);
287 size_t dst_proc = base_type::vertex_to_proc(e.second);
// operator[] default-inserts if an id is missing from the reply. The
// references stay valid across later insertions because unordered_map
// never invalidates element references on rehash.
288 bin_counts_type& src_degree = degree_table[src_proc][e.first];
289 bin_counts_type& dst_degree = degree_table[dst_proc][e.second];
// NOTE(review): this vector is allocated per edge. Hoisting it out of the loop
// and clearing it each iteration would avoid repeated allocation, provided
// get_joint_neighbors does not rely on receiving an empty vector.
291 std::vector<procid_t> candidates;
// NOTE(review): candidates derive from get_master() (hashvid % numprocs),
// while the bitsets above come from owners chosen by vertex_to_proc.
// Confirm the two mappings are meant to differ.
292 constraint->get_joint_neighbors(get_master(e.first), get_master(e.second), candidates);
294 procid_t proc = base_type::edge_decision.edge_to_proc_greedy(e.first, e.second,
295 src_degree, dst_degree, candidates, proc_num_edges, usehash, userecent);
297 ASSERT_LT(proc, proc_src.size());
298 proc_src[proc].push_back(e.first);
299 proc_dst[proc].push_back(e.second);
300 proc_edata[proc].push_back(edatasend[i]);
306 edgesend_lock.unlock();
// [Body of flush(); its signature is not visible here.]
// Assigns all buffered edges to processes and then delivers each non-empty
// bucket. This process's bucket goes directly to add_edges; other buckets are
// sent with rpc.remote_call. After each delivery, num_edges is decremented by
// the bucket size.
// NOTE(review): num_edges is decremented without edgesend_lock, because
// assign_edges released it before returning. If num_edges is a plain integer,
// this races with concurrent buffering -- confirm its type.
311 std::vector< std::vector<vertex_id_type> > proc_src(rpc.numprocs());
312 std::vector< std::vector<vertex_id_type> > proc_dst(rpc.numprocs());
313 std::vector< std::vector<EdgeData> > proc_edata(rpc.numprocs());
314 assign_edges(proc_src, proc_dst, proc_edata);
315 for (
size_t i = 0; i < proc_src.size(); ++i) {
// The statement guarded here is on a line not shown (presumably it skips
// empty buckets).
316 if (proc_src[i].size() == 0)
318 if (i == rpc.procid()) {
319 add_edges(proc_src[i], proc_dst[i], proc_edata[i]);
320 num_edges -= proc_src[i].size();
322 rpc.remote_call(i, &distributed_constrained_batch_ingress::add_edges,
323 proc_src[i], proc_dst[i], proc_edata[i]);
324 num_edges -= proc_src[i].size();
// Returns num_edges, the count of edges currently treated as buffered.
330 size_t size() {
return num_edges; }
// True once the buffered-edge count reaches bufsize. The edge-buffering path
// calls flush() when this returns true.
333 bool is_full() {
return size() >= bufsize; }
// [Fragment of get_master(vid); its signature is not visible here.]
// Maps a vertex id to its master process by hashing: hashvid(vid) % numprocs.
// NOTE(review): this uses base_type::rpc rather than this class's own rpc
// member. Presumably both report the same numprocs().
337 return hashvid(vid) % base_type::rpc.numprocs();
345 #include <graphlab/macros_undef.hpp>