GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
graph_vertex_join.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 #ifndef GRAPHLAB_GRAPH_JOIN_HPP
24 #define GRAPHLAB_GRAPH_JOIN_HPP
25 #include <utility>
26 #include <boost/unordered_map.hpp>
27 #include <graphlab/util/hopscotch_map.hpp>
28 #include <graphlab/graph/distributed_graph.hpp>
29 #include <graphlab/rpc/dc_dist_object.hpp>
30 namespace graphlab {
31 
32 
33 /**
34  * \brief Provides the ability to pass information between vertices of two
35  * different graphs
36  *
37  * \tparam LeftGraph Type of the left graph
38  * \tparam RightGraph Type of the right graph
39 
40  * The graph_vertex_join class allows information to be passed between
41  * vertices of two different graphs.
42  *
43  * Given two graphs <code>g1</code> and <code>g2</code>, possibly of different
44  * types:
45  *
46  * \code
47  * typedef distributed_graph<VData1, EData1> graph_1_type;
48  * typedef distributed_graph<VData2, EData2> graph_2_type;
49  * graph_1_type g1;
50  * graph_2_type g2;
51  * \endcode
52  *
53  * A graph_vertex_join object can be created:
54  * \code
55  * graph_vertex_join<graph_1_type, graph_2_type> vjoin(dc, g1, g2);
56  * \endcode
57  *
58  * The first argument is the distributed control object. The second argument
59  * shall be referred to as the graph on the "left" side of the join, and the
60  * third argument shall be referred to as the graph on the "right" side of the
61  * join.
62  *
63  * The join operates by having each vertex in both graphs emit an integer key.
64  * Vertices with the same key are then combined into the same group. The
65  * semantics of the key depends on the join operation to be performed.
66  * Right now, the only join operations supported are the Left Injective Join
67  * and the Right Injective Join (see below).
68  *
69 *
70  *
71  * ## Left Injective Join
72  * For the left injective join, vertices in the same graph must emit distinct
73  * unique keys. i.e. Each vertex in <code>g1</code> must emit a key which is
74  * different from all other vertices in <code>g1</code>. Vertices on the right
75  * graph are then matched with vertices on the left graph with the same key.
76  * The join operation is then allowed to modify vertices on the left graph
77  * given the data on the vertices of the right graph.
78  *
79  * To emit the keys:
80  * \code
81  * vjoin.prepare_injective_join(left_emit_key, right_emit_key);
82  * \endcode
83  * left_emit_key and right_emit_key are functions (or lambda) with the following
84  * prototype:
85  * \code
86  * size_t left_emit_key(const graph_1_type::vertex_type& vertex);
87  * size_t right_emit_key(const graph_2_type::vertex_type& vertex);
88  * \endcode
89  * They essentially take as a constant argument, the vertex of their respective
90  * graphs, and return an integer key. If the key has value (-1) it does not
91  * participate in the join.
92  * After keys are emitted and prepared with prepare_injective_join(), to perform a left
93  * injective join:
94  * \code
95  * vjoin.left_injective_join(join_op);
96  * \endcode
97  * Where join_op is a function with the following prototype:
98  * \code
99  * void join_op(graph_1_type::vertex_type& left_vertex,
100  * const graph_2_type::vertex_data_type right_vertex_data);
101  * \endcode
102  * Note the asymmetry in the arguments: the left vertex is passed as a
103  * vertex_type, while for the right vertex, only the vertex data is accessible.
104  * The function may make modifications on the left vertex.
105  *
106  * The left_injective_join() function must be called by all machines.
107  * As a result, it may be used from within an engine's
108  * \ref graphlab::iengine::add_vertex_aggregator aggregator's finalize
109  * function.
110  *
111  * ### Left Injective Join Example
112  * I have two graphs with identical structure. The left graph has data
113  * \code
114  * struct left_vertex_data {
115  * size_t user_id;
116  * std::string user_name;
117  * std::string email_address;
118  * // ... serializers omitted ...
119  * }
120  * typedef distributed_graph<left_vertex_data, some_edge_data> left_graph_type;
121  * \endcode
122  * However, when the left graph was constructed, there was no email address
123  * information conveniently available, and thus was left blank.
124  *
125  * And the right graph has vertex data:
126  * \code
127  * struct right_vertex_data {
128  * size_t user_id;
129  * std::string email_address;
130  * // ... serializers omitted ...
131  * }
132  * typedef distributed_graph<right_vertex_data, some_edge_data> right_graph_type;
133  * \endcode
134  * which was loaded from some other source, and contains all the email address
135  * information.
136  *
137  * I create emit functions for both graphs :
138  * \code
139  * size_t emit_user_id_field_left(const left_graph_type::vertex_type& vtype) {
140  * return vtype.data().user_id;
141  * }
142  * size_t emit_user_id_field_right(const right_graph_type::vertex_type& vtype) {
143  * return vtype.data().user_id;
144  * }
145  * \endcode
146  *
147  * Create a join object and prepare the join:
148  * \code
149  * graph_vertex_join<left_graph_type, right_graph_type> vjoin(dc,
150  * left_graph,
151  * right_graph);
152  * vjoin.prepare_injective_join(emit_user_id_field_left,
153  * emit_user_id_field_right);
154  * \endcode
155  *
156  * To copy the email address field from the right graph to the left graph:
157  * \code
158  * void join_email_address(left_graph_type::vertex_type& left_vertex,
159  * const right_vertex_data& rvtx) {
160  * left_vertex.data().email_address = rvtx.email_address;
161  * }
162  *
163  * vjoin.left_injective_join(join_email_address);
164  * \endcode
165  *
166  * ## Right Injective Join
167  * The right injective join is similar to the left injective join, but
168  * with types reversed.
169  */
170 template <typename LeftGraph, typename RightGraph>
172  public:
173  /// Type of the left graph
174  typedef LeftGraph left_graph_type;
175  /// Type of the right graph
176  typedef RightGraph right_graph_type;
177  /// Vertex Type of the left graph
178  typedef typename right_graph_type::vertex_type left_vertex_type;
179  /// Vertex Type of the right graph
180  typedef typename left_graph_type::vertex_type right_vertex_type;
181  /// Local Vertex Type of the left graph
182  typedef typename right_graph_type::local_vertex_type left_local_vertex_type;
183  /// Local Vertex Type of the right graph
184  typedef typename left_graph_type::local_vertex_type right_local_vertex_type;
185  /// Vertex Data Type of the left graph
186  typedef typename right_graph_type::vertex_data_type left_data_type;
187  /// Vertex Data Type of the right graph
188  typedef typename left_graph_type::vertex_data_type right_data_type;
189 
191 
192  private:
193  /// Reference to the left graph
194  left_graph_type& left_graph;
195  /// Reference to the right graph
196  right_graph_type& right_graph;
197 
198  struct injective_join_index {
199  std::vector<size_t> vtx_to_key;
201  // we use -1 here to indicate that the vertex is not participating
202  std::vector<procid_t> opposing_join_proc;
203  };
204 
205  injective_join_index left_inj_index, right_inj_index;
206 
207  public:
209  left_graph_type& left,
210  right_graph_type& right):
211  rmi(dc, this), left_graph(left), right_graph(right) { }
212 
213 
214  /**
215  * \brief Initializes the join by associating each vertex with a key
216  *
217  * \tparam LeftEmitKey Type of the left_emit_key parameter. It should
218  * not be necessary to specify this. C++ type inference should be able
219  * to infer this automatically.
220  * \tparam RightEmitKey Type of the right_emit_key parameter. It should
221  * not be necessary to specify this. C++ type inference should be able
222  * to infer this automatically.
223  *
224  * \param left_emit_key A function which takes a vertex_type from the
225  * left graph and emits an integral key value.
226  * Can be a lambda, of the prototype:
227  * size_t left_emit_key(const LeftGraph::vertex_type& vertex);
228  * \param right_emit_key A function which takes a vertex_type from the
229  * right graph and emits an integral key value.
230  * Can be a lambda, of the prototype:
231  * size_t right_emit_key(const RightGraph::vertex_type& vertex);
232  *
233  * The semantics of the key depend on the actual join operation performed.
234  * This function must be called by all machines.
235  *
236  * left_emit_key and right_emit_key are functions (or lambda) with the
237  * following prototype:
238  * \code
239  * size_t left_emit_key(const graph_1_type::vertex_type& vertex);
240  * size_t right_emit_key(const graph_2_type::vertex_type& vertex);
241  * \endcode
242  * They essentially take as a constant argument, the vertex of their
243  * respective graphs, and return an integer key. If a vertex emits the key
244  * (size_t)(-1) it does not participate in the join.
245  *
246  * prepare_injective_join() only needs to be called once. After which an
247  * arbitrary number of left_injective_join() and right_injective_join()
248  * calls may be made.
249  *
250  * If after a join, a new join is to be performed on the same graph using
251  * new data, or new emit functions, prepare_injective_join() can be called
252  * again to recompute the join.
253  */
254  template <typename LeftEmitKey, typename RightEmitKey>
255  void prepare_injective_join(LeftEmitKey left_emit_key,
256  RightEmitKey right_emit_key) {
257  typedef std::pair<size_t, vertex_id_type> key_vertex_pair;
258  // Basically, what we are trying to do is to figure out, for each vertex
259  // on one side of the graph, which vertices for the other graph
260  // (and on on which machines) emitted the same key.
261  //
262  // The target datastructure is:
263  // vtx_to_key[vtx]: The key for each vertex
264  // opposing_join_proc[vtx]: Machines which hold a vertex on the opposing
265  // graph which emitted the same key
266  // key_to_vtx[key] Mapping of keys to vertices.
267 
268  // resize the left index
269  // resize the right index
270 
271  reset_and_fill_injective_index(left_inj_index,
272  left_graph,
273  left_emit_key, "left graph");
274 
275  reset_and_fill_injective_index(right_inj_index,
276  right_graph,
277  right_emit_key, "right graph");
278  rmi.barrier();
279  // now, we need cross join across all machines to figure out the
280  // opposing join proc
281  // we need to do this twice. Once for left, and once for right.
282  compute_injective_join();
283  }
284 
285  /**
286  * \brief Performs an injective join from the right graph to the left graph.
287  *
288  * \tparam JoinOp The type of the joinop function. It should
289  * not be necessary to specify this. C++ type inference should be able
290  * to infer this automatically.
291  *
292  * \param join_op The joining function. May be a function pointer or a
293  * lambda matching the prototype
294  * void join_op(LeftGraph::vertex_type& left_vertex,
295  * const RightGraph::vertex_data_type right_vertex_data);
296  *
297  * prepare_injective_join() must be called before hand.
298  * All machines must call this function. join_op will be called on each
299  * left vertex with the data on a right vertex which emitted the same key
300  * in prepare_injective_join(). The join_op function is allowed to modify
301  * the vertex data on the left graph.
302  */
303  template <typename JoinOp>
304  void left_injective_join(JoinOp join_op) {
305  injective_join(left_inj_index, left_graph,
306  right_inj_index, right_graph,
307  join_op);
308  }
309 
310 
311  /**
312  * \brief Performs an injective join from the left graph to the right graph.
313  *
314  * \tparam JoinOp The type of the joinop function. It should
315  * not be necessary to specify this. C++ type inference should be able
316  * to infer this automatically.
317  *
318  * \param join_op The joining function. May be a function pointer or a
319  * lambda matching the prototype
320  * void join_op(RightGraph::vertex_type& right_vertex,
321  * const LeftGraph::vertex_data_type left_vertex_data);
322  *
323  * prepare_injective_join() must be called before hand.
324  * All machines must call this function. join_op will be called on each
325  * right vertex with the data on a left vertex which emitted the same key
326  * in prepare_injective_join(). The join_op function is allowed to modify
327  * the vertex data on the right graph.
328  */
329  template <typename JoinOp>
330  void right_injective_join(JoinOp join_op) {
331  injective_join(right_inj_index, right_graph,
332  left_inj_index, left_graph,
333  join_op);
334  }
335 
336  private:
337  template <typename Graph, typename EmitKey>
338  void reset_and_fill_injective_index(injective_join_index& idx,
339  Graph& graph,
340  EmitKey& emit_key,
341  const char* message) {
342  // clear the data
343  idx.vtx_to_key.resize(graph.num_local_vertices());
344  idx.key_to_vtx.clear();
345  idx.opposing_join_proc.resize(graph.num_local_vertices(), (procid_t)(-1));
346  // loop through vertices, get the key and fill vtx_to_key and key_to_vtx
347  for(lvid_type v = 0; v < graph.num_local_vertices(); ++v) {
348  typename Graph::local_vertex_type lv = graph.l_vertex(v);
349  if (lv.owned()) {
350  typename Graph::vertex_type vtx(lv);
351  size_t key = emit_key(vtx);
352  idx.vtx_to_key[v] = key;
353  if (key != (size_t)(-1)) {
354  if (idx.key_to_vtx.count(key) > 0) {
355  logstream(LOG_ERROR) << "Duplicate key in " << message << std::endl;
356  logstream(LOG_ERROR) << "Duplicate keys not permitted" << std::endl;
357  throw "Duplicate Key in Join";
358  }
359  idx.key_to_vtx.insert(std::make_pair(key, v));
360  }
361  }
362  }
363  }
364 
365  void compute_injective_join() {
366  std::vector<std::vector<size_t> > left_keys =
367  get_procs_with_keys(left_inj_index.vtx_to_key, left_graph);
368  std::vector<std::vector<size_t> > right_keys =
369  get_procs_with_keys(right_inj_index.vtx_to_key, right_graph);
370  // now. for each key on the right, I need to figure out which proc it
371  // belongs in. and vice versa. This is actually kind of annoying.
372  // but since it is one-to-one, I only need to make a hash map of one side.
373  hopscotch_map<size_t, procid_t> left_key_to_procs;
374 
375  // construct a hash table of keys to procs
376  // clear frequently to use less memory
377  for (size_t p = 0; p < left_keys.size(); ++p) {
378  for (size_t i = 0; i < left_keys[p].size(); ++i) {
379  ASSERT_MSG(left_key_to_procs.count(left_keys[p][i]) == 0,
380  "Duplicate keys not permitted for left graph keys in injective join");
381  left_key_to_procs.insert(std::make_pair(left_keys[p][i], p));
382  }
383  std::vector<size_t>().swap(left_keys[p]);
384  }
385  left_keys.clear();
386 
387  std::vector<
388  std::vector<
389  std::pair<size_t, procid_t> > > left_match(rmi.numprocs());
390  std::vector<
391  std::vector<
392  std::pair<size_t, procid_t> > > right_match(rmi.numprocs());
393 
394  // now for each key on the right, find the matching key on the left
395  for (size_t p = 0; p < right_keys.size(); ++p) {
396  for (size_t i = 0; i < right_keys[p].size(); ++i) {
397  size_t key = right_keys[p][i];
398  hopscotch_map<size_t, procid_t>::iterator iter =
399  left_key_to_procs.find(key);
400  if (iter != left_key_to_procs.end()) {
401  ASSERT_MSG(iter->second != (procid_t)(-1),
402  "Duplicate keys not permitted for right graph keys in injective join");
403  // we have a match
404  procid_t left_proc = iter->second;
405  procid_t right_proc = p;
406  // now. left has to be told about right and right
407  // has to be told about left
408  left_match[left_proc].push_back(std::make_pair(key, right_proc));
409  right_match[right_proc].push_back(std::make_pair(key, left_proc));
410  // set the map entry to -1
411  // so we know if it is ever reused
412  iter->second = (procid_t)(-1);
413  }
414  }
415  std::vector<size_t>().swap(right_keys[p]);
416  }
417  right_keys.clear();
418 
419  rmi.all_to_all(left_match);
420  rmi.all_to_all(right_match);
421  // fill in the index
422  // go through the left match and set up the opposing index to based
423  // on the match result
424 #ifdef _OPENMP
425 #pragma omp parallel for
426 #endif
427  for (size_t p = 0;p < left_match.size(); ++p) {
428  for (size_t i = 0;i < left_match[p].size(); ++i) {
429  // search for the key in the left index
430  hopscotch_map<size_t, vertex_id_type>::const_iterator iter =
431  left_inj_index.key_to_vtx.find(left_match[p][i].first);
432  ASSERT_TRUE(iter != left_inj_index.key_to_vtx.end());
433  // fill in the match
434  left_inj_index.opposing_join_proc[iter->second] = left_match[p][i].second;
435  }
436  }
437  left_match.clear();
438  // repeat for the right match
439 #ifdef _OPENMP
440 #pragma omp parallel for
441 #endif
442  for (size_t p = 0;p < right_match.size(); ++p) {
443  for (size_t i = 0;i < right_match[p].size(); ++i) {
444  // search for the key in the right index
445  hopscotch_map<size_t, vertex_id_type>::const_iterator iter =
446  right_inj_index.key_to_vtx.find(right_match[p][i].first);
447  ASSERT_TRUE(iter != right_inj_index.key_to_vtx.end());
448  // fill in the match
449  right_inj_index.opposing_join_proc[iter->second] = right_match[p][i].second;
450  }
451  }
452  right_match.clear();
453  // ok done.
454  }
455 
456  // each key is assigned to a controlling machine, who receives
457  // the partial list of keys every other machine owns.
458  template <typename Graph>
459  std::vector<std::vector<size_t> >
460  get_procs_with_keys(const std::vector<size_t>& local_key_list, Graph& g) {
461  // this machine will get all keys from each processor where
462  // key = procid mod numprocs
463  std::vector<std::vector<size_t> > procs_with_keys(rmi.numprocs());
464  for (size_t i = 0; i < local_key_list.size(); ++i) {
465  if (g.l_vertex(i).owned() && local_key_list[i] != (size_t)(-1)) {
466  procid_t target_procid = local_key_list[i] % rmi.numprocs();
467  procs_with_keys[target_procid].push_back(local_key_list[i]);
468  }
469  }
470  rmi.all_to_all(procs_with_keys);
471  return procs_with_keys;
472  }
473 
474  template <typename TargetGraph, typename SourceGraph, typename JoinOp>
475  void injective_join(injective_join_index& target,
476  TargetGraph& target_graph,
477  injective_join_index& source,
478  SourceGraph& source_graph,
479  JoinOp joinop) {
480  // build up the exchange structure.
481  // move right vertex data to left
482  std::vector<
483  std::vector<
484  std::pair<size_t, typename SourceGraph::vertex_data_type> > >
485  source_data(rmi.numprocs());
486 
487  for (size_t i = 0; i < source.opposing_join_proc.size(); ++i) {
488  if (source_graph.l_vertex(i).owned()) {
489  procid_t target_proc = source.opposing_join_proc[i];
490  if (target_proc >= 0 && target_proc < rmi.numprocs()) {
491  source_data[target_proc].push_back(
492  std::make_pair(source.vtx_to_key[i],
493  source_graph.l_vertex(i).data()));
494  }
495  }
496  }
497  // exchange
498  rmi.all_to_all(source_data);
499  // ok. now join against left
500 #ifdef _OPENMP
501 #pragma omp parallel for
502 #endif
503  for (size_t p = 0;p < source_data.size(); ++p) {
504  for (size_t i = 0;i < source_data[p].size(); ++i) {
505  // find the target vertex with the matching key
506  hopscotch_map<size_t, vertex_id_type>::const_iterator iter =
507  target.key_to_vtx.find(source_data[p][i].first);
508  ASSERT_TRUE(iter != target.key_to_vtx.end());
509  // found it!
510  typename TargetGraph::local_vertex_type
511  lvtx = target_graph.l_vertex(iter->second);
512  typename TargetGraph::vertex_type vtx(lvtx);
513  joinop(vtx, source_data[p][i].second);
514  }
515  }
516  target_graph.synchronize();
517  }
518 };
519 
520 } // namespace graphlab
521 
522 #endif