22 #ifndef GRAPHLAB_DISTRIBUTED_INGRESS_EDGE_DECISION_HPP
23 #define GRAPHLAB_DISTRIBUTED_INGRESS_EDGE_DECISION_HPP
25 #include <graphlab/graph/graph_basic_types.hpp>
26 #include <graphlab/rpc/distributed_event_log.hpp>
27 #include <graphlab/util/dense_bitset.hpp>
28 #include <graphlab/graph/distributed_graph.hpp>
29 #include <boost/random/uniform_int_distribution.hpp>
32 template<
typename VertexData,
typename EdgeData>
33 class distributed_graph;
35 template<
typename VertexData,
typename EdgeData>
36 class ingress_edge_decision {
40 typedef distributed_graph<VertexData, EdgeData>
graph_type;
41 typedef fixed_dense_bitset<RPC_MAX_N_PROCS> bin_counts_type;
43 boost::random::mt19937 gen;
44 boost::random::uniform_int_distribution<> edgernd;
45 static const size_t hashing_seed = 3;
49 ingress_edge_decision(distributed_control& dc) {
57 typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
58 const edge_pair_type edge_pair(std::min(source, target),
59 std::max(source, target));
60 return edge_hashing(edge_pair, hashing_seed) % (numprocs);
68 const std::vector<procid_t> & candidates) {
69 typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
70 const edge_pair_type edge_pair(std::min(source, target),
71 std::max(source, target));
73 return candidates[edge_hashing(edge_pair, hashing_seed) % (candidates.size())];
86 bin_counts_type& src_degree,
87 bin_counts_type& dst_degree,
88 std::vector<size_t>& proc_num_edges,
90 bool userecent =
false
92 size_t numprocs = proc_num_edges.size();
96 double maxscore = 0.0;
98 std::vector<double> proc_score(numprocs);
99 size_t minedges = *std::min_element(proc_num_edges.begin(), proc_num_edges.end());
100 size_t maxedges = *std::max_element(proc_num_edges.begin(), proc_num_edges.end());
102 for (
size_t i = 0; i < numprocs; ++i) {
103 size_t sd = src_degree.get(i) + (usehash && (source % numprocs == i));
104 size_t td = dst_degree.get(i) + (usehash && (target % numprocs == i));
105 double bal = (maxedges - proc_num_edges[i])/(epsilon + maxedges - minedges);
106 proc_score[i] = bal + ((sd > 0) + (td > 0));
108 maxscore = *std::max_element(proc_score.begin(), proc_score.end());
110 std::vector<procid_t> top_procs;
111 for (
size_t i = 0; i < numprocs; ++i)
112 if (std::fabs(proc_score[i] - maxscore) < 1e-5)
113 top_procs.push_back(i);
116 typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
117 const edge_pair_type edge_pair(std::min(source, target),
118 std::max(source, target));
119 best_proc = top_procs[edge_hashing(edge_pair) % top_procs.size()];
121 ASSERT_LT(best_proc, numprocs);
126 src_degree.set_bit(best_proc);
127 dst_degree.set_bit(best_proc);
128 ++proc_num_edges[best_proc];
139 bin_counts_type& src_degree,
140 bin_counts_type& dst_degree,
141 std::vector<procid_t>& candidates,
142 std::vector<size_t>& proc_num_edges,
143 bool usehash =
false,
144 bool userecent =
false
146 size_t numprocs = proc_num_edges.size();
150 double maxscore = 0.0;
151 double epsilon = 1.0;
152 std::vector<double> proc_score(candidates.size());
153 size_t minedges = *std::min_element(proc_num_edges.begin(), proc_num_edges.end());
154 size_t maxedges = *std::max_element(proc_num_edges.begin(), proc_num_edges.end());
156 for (
size_t j = 0; j < candidates.size(); ++j) {
157 size_t i = candidates[j];
158 size_t sd = src_degree.get(i) + (usehash && (source % numprocs == i));
159 size_t td = dst_degree.get(i) + (usehash && (target % numprocs == i));
160 double bal = (maxedges - proc_num_edges[i])/(epsilon + maxedges - minedges);
161 proc_score[j] = bal + ((sd > 0) + (td > 0));
163 maxscore = *std::max_element(proc_score.begin(), proc_score.end());
165 std::vector<procid_t> top_procs;
166 for (
size_t j = 0; j < candidates.size(); ++j)
167 if (std::fabs(proc_score[j] - maxscore) < 1e-5)
168 top_procs.push_back(candidates[j]);
171 typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
172 const edge_pair_type edge_pair(std::min(source, target),
173 std::max(source, target));
174 best_proc = top_procs[edge_hashing(edge_pair) % top_procs.size()];
176 ASSERT_LT(best_proc, numprocs);
181 src_degree.set_bit(best_proc);
182 dst_degree.set_bit(best_proc);
183 ++proc_num_edges[best_proc];
197 std::vector<size_t>& src_degree,
198 std::vector<size_t>& dst_degree,
199 std::vector<size_t>& proc_num_edges,
200 bool usehash =
false,
201 bool userecent =
false) {
202 size_t numprocs = proc_num_edges.size();
203 if (src_degree.size() == 0)
204 src_degree.resize(numprocs, 0);
205 if (dst_degree.size() == 0)
206 dst_degree.resize(numprocs, 0);
210 double maxscore = 0.0;
211 double epsilon = 1.0;
212 std::vector<double> proc_score(numprocs);
213 size_t minedges = *std::min_element(proc_num_edges.begin(), proc_num_edges.end());
214 size_t maxedges = *std::max_element(proc_num_edges.begin(), proc_num_edges.end());
215 for (
size_t i = 0; i < numprocs; ++i) {
216 size_t sd = src_degree[i] + (usehash && (source % numprocs == i));
217 size_t td = dst_degree[i] + (usehash && (target % numprocs == i));
218 double bal = (maxedges - proc_num_edges[i])/(epsilon + maxedges - minedges);
219 proc_score[i] = bal + ((sd > 0) + (td > 0));
221 maxscore = *std::max_element(proc_score.begin(), proc_score.end());
223 std::vector<procid_t> top_procs;
224 for (
size_t i = 0; i < numprocs; ++i)
225 if (std::fabs(proc_score[i] - maxscore) < 1e-5)
226 top_procs.push_back(i);
228 if (top_procs.size() > 1) {
231 }
else if (maxscore >= 1) {
239 }
else if (maxscore >= 1) {
247 typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
248 const edge_pair_type edge_pair(std::min(source, target),
249 std::max(source, target));
250 best_proc = top_procs[edge_hashing(edge_pair) % top_procs.size()];
252 ASSERT_LT(best_proc, numprocs);
260 ++src_degree[best_proc];
261 ++dst_degree[best_proc];
262 ++proc_num_edges[best_proc];
271 static size_t mix(
size_t state) {
272 state += (state << 12);
273 state ^= (state >> 22);
274 state += (state << 4);
275 state ^= (state >> 9);
276 state += (state << 10);
277 state ^= (state >> 2);
278 state += (state << 7);
279 state ^= (state >> 12);
283 static size_t edge_hashing (
const std::pair<vertex_id_type, vertex_id_type>& e,
const uint32_t
seed = 5) {
285 #if (__SIZEOF_PTRDIFF_T__ == 8)
286 static const size_t a[8] = {0x6306AA9DFC13C8E7,
295 static const size_t a[8] = {0xFC13C8E7,
306 return (mix(src^a[
seed%8]))^(mix(dst^a[(seed+1)%8]));