GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
ingress_edge_decision.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 #ifndef GRAPHLAB_DISTRIBUTED_INGRESS_EDGE_DECISION_HPP
23 #define GRAPHLAB_DISTRIBUTED_INGRESS_EDGE_DECISION_HPP
24 
25 #include <graphlab/graph/graph_basic_types.hpp>
26 #include <graphlab/rpc/distributed_event_log.hpp>
27 #include <graphlab/util/dense_bitset.hpp>
28 #include <graphlab/graph/distributed_graph.hpp>
29 #include <boost/random/uniform_int_distribution.hpp>
30 
31 namespace graphlab {
32  template<typename VertexData, typename EdgeData>
33  class distributed_graph;
34 
35  template<typename VertexData, typename EdgeData>
36  class ingress_edge_decision {
37 
38  public:
40  typedef distributed_graph<VertexData, EdgeData> graph_type;
41  typedef fixed_dense_bitset<RPC_MAX_N_PROCS> bin_counts_type;
42 
43  boost::random::mt19937 gen;
44  boost::random::uniform_int_distribution<> edgernd;
45  static const size_t hashing_seed = 3;
46 
47  public:
48  /** \brief A decision object for computing the edge assingment. */
49  ingress_edge_decision(distributed_control& dc) {
50  }
51 
52 
53  /** Random assign (source, target) to a machine p in {0, ... numprocs-1} */
54  procid_t edge_to_proc_random (const vertex_id_type source,
55  const vertex_id_type target,
56  size_t numprocs) {
57  typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
58  const edge_pair_type edge_pair(std::min(source, target),
59  std::max(source, target));
60  return edge_hashing(edge_pair, hashing_seed) % (numprocs);
61  // return edge_hashing(edge_pair) % (numprocs);
62  // return edgernd(gen) % (numprocs);
63  };
64 
65  /** Random assign (source, target) to a machine p in a list of candidates */
66  procid_t edge_to_proc_random (const vertex_id_type source,
67  const vertex_id_type target,
68  const std::vector<procid_t> & candidates) {
69  typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
70  const edge_pair_type edge_pair(std::min(source, target),
71  std::max(source, target));
72 
73  return candidates[edge_hashing(edge_pair, hashing_seed) % (candidates.size())];
74  // return candidates[edge_hashing(edge_pair) % (candidates.size())];
75  // return candidates[edgernd(gen) % (candidates.size())];
76  };
77 
78 
79  /** Greedy assign (source, target) to a machine using:
80  * bitset<MAX_MACHINE> src_degree : the degree presence of source over machines
81  * bitset<MAX_MACHINE> dst_degree : the degree presence of target over machines
82  * vector<size_t> proc_num_edges : the edge counts over machines
83  * */
84  procid_t edge_to_proc_greedy (const vertex_id_type source,
85  const vertex_id_type target,
86  bin_counts_type& src_degree,
87  bin_counts_type& dst_degree,
88  std::vector<size_t>& proc_num_edges,
89  bool usehash = false,
90  bool userecent = false
91  ) {
92  size_t numprocs = proc_num_edges.size();
93 
94  // Compute the score of each proc.
95  procid_t best_proc = -1;
96  double maxscore = 0.0;
97  double epsilon = 1.0;
98  std::vector<double> proc_score(numprocs);
99  size_t minedges = *std::min_element(proc_num_edges.begin(), proc_num_edges.end());
100  size_t maxedges = *std::max_element(proc_num_edges.begin(), proc_num_edges.end());
101 
102  for (size_t i = 0; i < numprocs; ++i) {
103  size_t sd = src_degree.get(i) + (usehash && (source % numprocs == i));
104  size_t td = dst_degree.get(i) + (usehash && (target % numprocs == i));
105  double bal = (maxedges - proc_num_edges[i])/(epsilon + maxedges - minedges);
106  proc_score[i] = bal + ((sd > 0) + (td > 0));
107  }
108  maxscore = *std::max_element(proc_score.begin(), proc_score.end());
109 
110  std::vector<procid_t> top_procs;
111  for (size_t i = 0; i < numprocs; ++i)
112  if (std::fabs(proc_score[i] - maxscore) < 1e-5)
113  top_procs.push_back(i);
114 
115  // Hash the edge to one of the best procs.
116  typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
117  const edge_pair_type edge_pair(std::min(source, target),
118  std::max(source, target));
119  best_proc = top_procs[edge_hashing(edge_pair) % top_procs.size()];
120 
121  ASSERT_LT(best_proc, numprocs);
122  if (userecent) {
123  src_degree.clear();
124  dst_degree.clear();
125  }
126  src_degree.set_bit(best_proc);
127  dst_degree.set_bit(best_proc);
128  ++proc_num_edges[best_proc];
129  return best_proc;
130  };
131 
132  /** Greedy assign (source, target) to a machine using:
133  * bitset<MAX_MACHINE> src_degree : the degree presence of source over machines
134  * bitset<MAX_MACHINE> dst_degree : the degree presence of target over machines
135  * vector<size_t> proc_num_edges : the edge counts over machines
136  * */
137  procid_t edge_to_proc_greedy (const vertex_id_type source,
138  const vertex_id_type target,
139  bin_counts_type& src_degree,
140  bin_counts_type& dst_degree,
141  std::vector<procid_t>& candidates,
142  std::vector<size_t>& proc_num_edges,
143  bool usehash = false,
144  bool userecent = false
145  ) {
146  size_t numprocs = proc_num_edges.size();
147 
148  // Compute the score of each proc.
149  procid_t best_proc = -1;
150  double maxscore = 0.0;
151  double epsilon = 1.0;
152  std::vector<double> proc_score(candidates.size());
153  size_t minedges = *std::min_element(proc_num_edges.begin(), proc_num_edges.end());
154  size_t maxedges = *std::max_element(proc_num_edges.begin(), proc_num_edges.end());
155 
156  for (size_t j = 0; j < candidates.size(); ++j) {
157  size_t i = candidates[j];
158  size_t sd = src_degree.get(i) + (usehash && (source % numprocs == i));
159  size_t td = dst_degree.get(i) + (usehash && (target % numprocs == i));
160  double bal = (maxedges - proc_num_edges[i])/(epsilon + maxedges - minedges);
161  proc_score[j] = bal + ((sd > 0) + (td > 0));
162  }
163  maxscore = *std::max_element(proc_score.begin(), proc_score.end());
164 
165  std::vector<procid_t> top_procs;
166  for (size_t j = 0; j < candidates.size(); ++j)
167  if (std::fabs(proc_score[j] - maxscore) < 1e-5)
168  top_procs.push_back(candidates[j]);
169 
170  // Hash the edge to one of the best procs.
171  typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
172  const edge_pair_type edge_pair(std::min(source, target),
173  std::max(source, target));
174  best_proc = top_procs[edge_hashing(edge_pair) % top_procs.size()];
175 
176  ASSERT_LT(best_proc, numprocs);
177  if (userecent) {
178  src_degree.clear();
179  dst_degree.clear();
180  }
181  src_degree.set_bit(best_proc);
182  dst_degree.set_bit(best_proc);
183  ++proc_num_edges[best_proc];
184  return best_proc;
185  };
186 
187 
188  ////////////////////// Deprecated ////////////////////////////////////
189 
190  /** Greedy assign (source, target) to a machine using:
191  * vector<size_t> src_degree : the degree counts of source over machines
192  * vector<size_t> dst_degree : the degree counts of target over machines
193  * vector<size_t> proc_num_edges : the edge counts over machines
194  * */
195  procid_t edge_to_proc_greedy (const vertex_id_type source,
196  const vertex_id_type target,
197  std::vector<size_t>& src_degree,
198  std::vector<size_t>& dst_degree,
199  std::vector<size_t>& proc_num_edges,
200  bool usehash = false,
201  bool userecent = false) {
202  size_t numprocs = proc_num_edges.size();
203  if (src_degree.size() == 0)
204  src_degree.resize(numprocs, 0);
205  if (dst_degree.size() == 0)
206  dst_degree.resize(numprocs, 0);
207 
208  // Compute the score of each proc.
209  procid_t best_proc = -1;
210  double maxscore = 0.0;
211  double epsilon = 1.0;
212  std::vector<double> proc_score(numprocs);
213  size_t minedges = *std::min_element(proc_num_edges.begin(), proc_num_edges.end());
214  size_t maxedges = *std::max_element(proc_num_edges.begin(), proc_num_edges.end());
215  for (size_t i = 0; i < numprocs; ++i) {
216  size_t sd = src_degree[i] + (usehash && (source % numprocs == i));
217  size_t td = dst_degree[i] + (usehash && (target % numprocs == i));
218  double bal = (maxedges - proc_num_edges[i])/(epsilon + maxedges - minedges);
219  proc_score[i] = bal + ((sd > 0) + (td > 0));
220  }
221  maxscore = *std::max_element(proc_score.begin(), proc_score.end());
222 
223  std::vector<procid_t> top_procs;
224  for (size_t i = 0; i < numprocs; ++i)
225  if (std::fabs(proc_score[i] - maxscore) < 1e-5)
226  top_procs.push_back(i);
227 
228  if (top_procs.size() > 1) {
229  if (maxscore >= 2) {
230  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, EVENT_EDGE_SEEN_BOTH_TIE, 1)
231  } else if (maxscore >= 1) {
232  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, EVENT_EDGE_SEEN_ONE_TIE, 1)
233  } else {
234  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, EVENT_EDGE_SEEN_NONE_TIE, 1);
235  }
236  } else {
237  if (maxscore >= 2) {
238  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, EVENT_EDGE_SEEN_BOTH_UNIQUE, 1);
239  } else if (maxscore >= 1) {
240  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, EVENT_EDGE_SEEN_ONE_UNIQUE, 1)
241  } else {
242  //PERMANENT_ACCUMULATE_DIST_EVENT(eventlog, EVENT_EDGE_SEEN_NONE_UNIQUE, 1);
243  }
244  }
245 
246  // Hash the edge to one of the best procs.
247  typedef std::pair<vertex_id_type, vertex_id_type> edge_pair_type;
248  const edge_pair_type edge_pair(std::min(source, target),
249  std::max(source, target));
250  best_proc = top_procs[edge_hashing(edge_pair) % top_procs.size()];
251 
252  ASSERT_LT(best_proc, numprocs);
253 
254  // only counts the recent proc.
255  if (userecent) {
256  src_degree.clear();
257  dst_degree.clear();
258  }
259 
260  ++src_degree[best_proc];
261  ++dst_degree[best_proc];
262  ++proc_num_edges[best_proc];
263  return best_proc;
264  };
265 
266  public:
267  /*
268  * Bob Jenkin's 32 bit integer mix function from
269  * http://home.comcast.net/~bretm/hash/3.html
270  */
271  static size_t mix(size_t state) {
272  state += (state << 12);
273  state ^= (state >> 22);
274  state += (state << 4);
275  state ^= (state >> 9);
276  state += (state << 10);
277  state ^= (state >> 2);
278  state += (state << 7);
279  state ^= (state >> 12);
280  return state;
281  }
282 
283  static size_t edge_hashing (const std::pair<vertex_id_type, vertex_id_type>& e, const uint32_t seed = 5) {
284  // a bunch of random numbers
285 #if (__SIZEOF_PTRDIFF_T__ == 8)
286  static const size_t a[8] = {0x6306AA9DFC13C8E7,
287  0xA8CD7FBCA2A9FFD4,
288  0x40D341EB597ECDDC,
289  0x99CFA1168AF8DA7E,
290  0x7C55BCC3AF531D42,
291  0x1BC49DB0842A21DD,
292  0x2181F03B1DEE299F,
293  0xD524D92CBFEC63E9};
294 #else
295  static const size_t a[8] = {0xFC13C8E7,
296  0xA2A9FFD4,
297  0x597ECDDC,
298  0x8AF8DA7E,
299  0xAF531D42,
300  0x842A21DD,
301  0x1DEE299F,
302  0xBFEC63E9};
303 #endif
304  vertex_id_type src = e.first;
305  vertex_id_type dst = e.second;
306  return (mix(src^a[seed%8]))^(mix(dst^a[(seed+1)%8]));
307  }
308  };// end of ingress_edge_decision
309 }
310 
311 #endif