GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
distributed_graph.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 #ifndef GRAPHLAB_DISTRIBUTED_GRAPH_HPP
24 #define GRAPHLAB_DISTRIBUTED_GRAPH_HPP
25 
26 #ifndef __NO_OPENMP__
27 #include <omp.h>
28 #endif
29 
30 #include <cmath>
31 
32 #include <string>
33 #include <list>
34 #include <vector>
35 #include <set>
36 #include <map>
37 #include <graphlab/util/dense_bitset.hpp>
38 
39 
40 #include <queue>
41 #include <algorithm>
42 #include <functional>
43 #include <fstream>
44 #include <sstream>
45 
46 #include <boost/functional.hpp>
47 #include <boost/algorithm/string/predicate.hpp>
48 #include <boost/iostreams/stream.hpp>
49 #include <boost/iostreams/filtering_streambuf.hpp>
50 #include <boost/iostreams/filtering_stream.hpp>
51 #include <boost/iostreams/copy.hpp>
52 #include <boost/iostreams/filter/gzip.hpp>
53 #include <boost/filesystem.hpp>
54 #include <boost/concept/requires.hpp>
55 
56 
58 #include <graphlab/logger/assertions.hpp>
59 
60 #include <graphlab/rpc/dc.hpp>
61 #include <graphlab/rpc/dc_dist_object.hpp>
62 #include <graphlab/rpc/buffered_exchange.hpp>
63 #include <graphlab/util/random.hpp>
64 #include <graphlab/util/branch_hints.hpp>
65 #include <graphlab/util/generics/conditional_addition_wrapper.hpp>
66 
67 #include <graphlab/options/graphlab_options.hpp>
68 #include <graphlab/serialization/serialization_includes.hpp>
69 #include <graphlab/vertex_program/op_plus_eq_concept.hpp>
70 
71 #include <graphlab/graph/local_graph.hpp>
72 #include <graphlab/graph/ingress/idistributed_ingress.hpp>
73 #include <graphlab/graph/ingress/distributed_ingress_base.hpp>
74 #include <graphlab/graph/ingress/distributed_batch_ingress.hpp>
75 #include <graphlab/graph/ingress/distributed_oblivious_ingress.hpp>
76 #include <graphlab/graph/ingress/distributed_random_ingress.hpp>
77 #include <graphlab/graph/ingress/distributed_identity_ingress.hpp>
78 
79 #include <graphlab/graph/ingress/sharding_constraint.hpp>
80 #include <graphlab/graph/ingress/distributed_constrained_random_ingress.hpp>
81 
82 
83 #include <graphlab/util/cuckoo_map_pow2.hpp>
84 
85 #include <graphlab/util/fs_util.hpp>
86 #include <graphlab/util/hdfs.hpp>
87 
88 
89 #include <graphlab/graph/builtin_parsers.hpp>
90 #include <graphlab/graph/json_parser.hpp>
91 #include <graphlab/graph/vertex_set.hpp>
92 
93 #include <graphlab/macros_def.hpp>
94 namespace graphlab {
95 
96  /** \brief A directed graph datastructure which is distributed across
97  * multiple machines.
98  *
99  * This class implements a distributed directed graph datastructure where
100  * vertices and edges may contain arbitrary user-defined datatypes as
101  * templatized by the VertexData and EdgeData template parameters.
102  *
103  * ### Initialization
104  *
105  * To declare a distributed graph you write:
106  * \code typedef
107  * graphlab::distributed_graph<vdata, edata> graph_type;
108  * graph_type graph(dc, clopts);
109  * \endcode
110  * where <code>vdata</code> is the type of data to be
111  * stored on vertices, and <code>edata</code> is the type of data to be
112  * stored on edges. The constructor must be called simultaneously on all
113  * machines. <code>dc</code> is a graphlab::distributed_control object that
114  * must be constructed at the start of the program, and clopts is a
115  * graphlab::graphlab_options object that is used to pass graph
116  * construction runtime options to the graph. See the code examples for
117  * further details.
118  *
119  * Each vertex is uniquely identified by an unsigned numeric ID of the type
120  * graphlab::vertex_id_type. Vertex IDs need not be sequential. However, the
121  * ID corresponding to <code>(vertex_id_type)(-1)</code> is reserved. (This
122  * is the largest possible ID, corresponding to 0xFFFFFFFF when using 32-bit
123  * IDs).
124  *
 * Edges are not numbered, but are uniquely identified by their source->target
 * pair. In other words, there can be at most two edges between any pair of
 * vertices: the edge going in the forward direction, and the edge going in
 * the backward direction.
129  *
130  * ### Construction
131  *
132  * The distributed graph can be constructed in two different ways. The
133  * first, and the preferred method, is to construct the graph from files
134  * located on a shared filesystem (NFS mounts for instance) , or from files
135  * on HDFS (HDFS support must be compiled).
136  *
137  * To construct from files, the load_format() function provides built-in
138  * parsers to construct the graph structure from various graph file formats
139  * on disk or HDFS. Alternatively, the
140  * \ref load(const std::string& path, line_parser_type line_parser) "load()"
141  * function provides generalized parsing capabilities
142  * allowing you to construct from your own defined file format.
143  * Alternatively, load_binary() may be used to perform an extremely rapid
144  * load of a graph previously saved with save_binary(). The caveat being that
145  * the number of machines used to save the graph must match the number of
146  * machines used to load the graph.
147  *
148  * The second construction strategy is to call the add_vertex() and
149  * add_edge() functions directly. These functions are parallel reentrant, and
150  * are also distributed. Each vertex and each edge should be added no more
151  * than once across all machines.
152  *
153  * add_vertex() calls are not strictly required since add_edge(i, j) will
154  * implicitly construct vertices i and j. The data on these vertices
155  * will be default constructed.
156  *
157  * ### Finalization
 * After all vertices and edges are inserted into the graph
 * via either load from file functions or direct calls to add_vertex() and
 * add_edge(), the graph must be finalized before it can be used.
161  *
162  * This is performed by calling \code graph.finalize(); \endcode on all
163  * machines simultaneously. None of the load* functions perform finalization
164  * so multiple load operations could be performed (reading from different
165  * file groups) before finalization.
166  *
167  * The finalize() operation partitions the graph and synchronizes all
168  * internal graph datastructures. After this point, all graph computation
169  * operations such as engine, map_reduce and transform operations will
170  * function.
171  *
172  * ### Partitioning Strategies
173  *
 * The graph is partitioned across the machines using a "vertex separator"
 * strategy where edges are assigned to machines, while vertices may span
 * multiple machines. The following partitioning strategies are implemented.
 * They can be selected by setting --graph_opts="ingress=[partition_method]"
 * on the command line.
 * \li \c "random"    The most naive and the fastest partitioner. Randomly
 *                    places edges on machines.
 * \li \c "oblivious" Runs at roughly half the speed of random. Each machine
 *                    independently partitions the segment of the graph it
 *                    reads. Improves partitioning quality and will reduce
 *                    runtime memory consumption.
 * \li \c "batch"     Runs at roughly half the speed of oblivious. Machines
 *                    cooperate in partitioning the graph. This obtains the
 *                    highest quality partition, reducing runtime memory
 *                    consumption significantly, at a load-time penalty.
 *
 * \li \c "grid"      Runs at roughly the same speed as random. Randomly
 *                    places edges on machines subject to a grid constraint.
 *                    This obtains a good quality partition, close to
 *                    oblivious, but currently only works with a perfect
 *                    square number of machines.
 *
 * \li \c "pds"       Runs at roughly the same speed as random. Randomly
 *                    places edges on machines subject to a sparser
 *                    constraint generated by a perfect difference set. This
 *                    obtains a partition of quality close to batch, reducing
 *                    runtime memory consumption significantly, without a
 *                    load-time penalty. Currently only works with p^2+p+1
 *                    machines (p prime).
201  *
 * ### Referencing Vertices / Edges
 * Many GraphLab operations will pass around
 * vertex_type and edge_type objects. These objects are lightweight, copyable,
 * opaque references to vertices and edges in the distributed graph. The
 * vertex_type object provides capabilities such as:
206  * \li \c vertex_type::id() Returns the ID of the vertex
207  * \li \c vertex_type::num_in_edges() Returns the number of in edges
208  * \li \c vertex_type::num_out_edges() Returns the number of out edges
209  * \li \c vertex_type::data() Returns a <b>reference</b> to the data on
210  * the vertex
211  *
 * No traversal operations are currently provided, and there is no
 * single method to return a list of adjacent edges to the vertex.
214  *
215  * The edge_type object has similar capabilities:
216  * \li \c edge_type::data() Returns a <b>reference</b> to the data on the edge
217  * \li \c edge_type::source() Returns a \ref vertex_type of the source vertex
218  * \li \c edge_type::target() Returns a \ref vertex_type of the target vertex
219  *
220  * This permits the use of <code>edge.source().data()</code> for instance, to
221  * obtain the vertex data on the source vertex.
222  *
223  * See the documentation for \ref vertex_type and \ref edge_type for further
224  * details.
225  *
 * Due to the distributed nature of the graph, there is at the moment no way
 * to obtain a reference to arbitrary vertices or edges. The only way to
 * obtain a reference to vertices or edges is if one is passed to you via a
 * callback (for instance in map_reduce_vertices() / map_reduce_edges() or in
 * an update function). Manipulating the graph at a more fine-grained level
 * requires a more intimate understanding of the underlying distributed
 * graph representation.
233  *
234  * ### Saving the graph
235  * After computation is complete, the graph structure can be saved
236  * via save_format() which provides built-in writers to write various
237  * graph formats to disk or HDFS. Alternatively,
238  * \ref save(const std::string& prefix, writer writer, bool gzip, bool save_vertex, bool save_edge, size_t files_per_machine) "save()"
239  * provides generalized writing capabilities allowing you to write
240  * your own graph output to disk or HDFS.
241  *
242  * ### Distributed Representation
243  * The graph is partitioned over machines using vertex separators.
244  * In other words, each edge is assigned to a unique machine while
245  * vertices are allowed to span multiple machines.
246  *
247  * The image below demonstrates the procedure. The example graph
248  * on the left is to be separated among 4 machines where the cuts
249  * are denoted by the dotted red lines. After partitioning,
250  * (the image on the right), each vertex along the cut
251  * is now separated among multiple machines. For instance, the
252  * central vertex spans 4 different machines.
253  *
254  * \image html partition_fig.gif
255  *
 * Each vertex which spans multiple machines has a <b>master</b>
 * machine (a black vertex), and all other instances of the vertex
 * are called <b>mirrors</b>. For instance, we observe that the central
 * vertex spans 4 machines, where machine 3 holds the <b>master</b>
 * copy, while all remaining machines hold <b>mirrored</b> copies.
 *
 * This concept of vertex separators allows us to easily manage large
263  * power-law graphs where vertices may have extremely high degrees,
264  * since the adjacency information for even the high degree vertices
265  * can be separated across multiple machines.
266  *
267  * ### Internal Representation
268  * \warning This is only useful if you plan to make use of the graph
269  * in ways which exceed the provided abstractions.
270  *
271  * Each machine maintains its local section of the graph in a
272  * graphlab::local_graph object. The local_graph object assigns
273  * each vertex a sequential vertex ID called the local vertex ID.
274  * A hash table is used to provide a mapping between the local vertex IDs
275  * and their corresponding global vertex IDs. Additionally, each local
276  * vertex is associated with a \ref vertex_record which provides information
277  * about global ID of the vertex, the machine which holds the master instance
278  * of the vertex, as well as a list of all machines holding a mirror
279  * of the vertex.
280  *
 * To support traversal of the local graph, two additional types, the
 * \ref local_vertex_type and the \ref local_edge_type, are provided, which
 * give references to vertices and edges on the local graph. These behave
 * similarly to the \ref vertex_type and \ref edge_type types and have
 * similar functionality. However, since these reference the local graph,
 * there is substantially more flexibility. In particular, the function
 * l_vertex() may be used to obtain a reference to a local vertex from a
 * local vertex ID. Also, unlike the \ref vertex_type, the \ref
 * local_vertex_type supports traversal operations such as returning a list
 * of all in_edges (local_vertex_type::in_edges()). However, the list only
 * contains the edges which are local to the current machine. See
 * \ref local_vertex_type and \ref local_edge_type for more details.
293  *
294  *
 * \tparam VertexData Type of data stored on vertices. Must be
 *                    Copyable, Default Constructible, Copy
 *                    Constructible and \ref sec_serializable.
 * \tparam EdgeData   Type of data stored on edges. Must be
 *                    Copyable, Default Constructible, Copy
 *                    Constructible and \ref sec_serializable.
301  */
302  template<typename VertexData, typename EdgeData>
304  public:
305 
306  /// The type of the vertex data stored in the graph
307  typedef VertexData vertex_data_type;
308 
309  /**
310  * \cond GRAPHLAB_INTERNAL
311  * \brief GraphLab Requires that vertex data be default
312  * constructible. That is you must have a public member:
313  *
314  * \code
315  * class vertex_data {
316  * public:
317  * vertex_data() { }
318  * }; //
319  * \endcode
320  */
321  BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<VertexData>));
322  /// \endcond
323 
324  /**
325  * \cond GRAPHLAB_INTERNAL
326  * \brief GraphLab Requires that vertex data be default
327  * Serializable.
328  */
329  BOOST_CONCEPT_ASSERT((graphlab::Serializable<VertexData>));
330  /// \endcond
331 
332 
333  /// The type of the edge data stored in the graph
334  typedef EdgeData edge_data_type;
335 
336  /**
337  * \cond GRAPHLAB_INTERNAL
338  * \brief GraphLab Requires that edge data be default
339  * constructible. That is you must have a public member:
340  *
341  * \code
342  * class edge_data {
343  * public:
344  * edge_data() { }
345  * }; //
346  * \endcode
347  */
348  BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<EdgeData>));
349  /// \endcond
350 
351  /**
352  * \cond GRAPHLAB_INTERNAL
353  * \brief GraphLab Requires that edge data be default
354  * Serializable.
355  */
356  BOOST_CONCEPT_ASSERT((graphlab::Serializable<EdgeData>));
357  /// \endcond
358 
359 
360  /**
361  The line parse is any function (or functor) that has the form:
362 
363  <code>
364  bool line_parser(distributed_graph& graph, const std::string& filename,
365  const std::string& textline);
366  </code>
367 
368  the line parser returns true if the line is parsed successfully and
369  calls graph.add_vertex(...) or graph.add_edge(...)
370 
371  See \ref graphlab::distributed_graph::load(std::string path, line_parser_type line_parser) "load()"
372  for details.
373  */
374  typedef boost::function<bool(distributed_graph&, const std::string&,
375  const std::string&)> line_parser_type;
376 
377 
379 
380  /// The type of the local graph used to store the graph data
381  typedef graphlab::local_graph<VertexData, EdgeData> local_graph_type;
383 
384  friend class distributed_ingress_base<VertexData, EdgeData>;
385  friend class distributed_random_ingress<VertexData, EdgeData>;
386  friend class distributed_identity_ingress<VertexData, EdgeData>;
387  friend class distributed_batch_ingress<VertexData, EdgeData>;
388  friend class distributed_oblivious_ingress<VertexData, EdgeData>;
389  friend class distributed_constrained_random_ingress<VertexData, EdgeData>;
390  friend class json_parser<VertexData, EdgeData>;
391 
392  typedef graphlab::vertex_id_type vertex_id_type;
393  typedef graphlab::lvid_type lvid_type;
394  typedef graphlab::edge_id_type edge_id_type;
395 
396  struct vertex_type;
397  typedef bool edge_list_type;
398  class edge_type;
399 
400  struct local_vertex_type;
401  struct local_edge_list_type;
402  class local_edge_type;
403 
404  /**
405  * \brief Vertex object which provides access to the vertex data
406  * and information about the vertex.
407  *
408  * The vertex_type object may be copied and has very little internal
409  * state. It behaves as a reference to location of the vertex
410  * in the internal graph representation. While vertex_type may be copied
411  * it must not outlive the underlying graph.
412  */
413  struct vertex_type {
414  distributed_graph& graph_ref;
415  lvid_type lvid;
416 
417  /// \cond GRAPHLAB_INTERNAL
418  /** \brief Constructs a vertex_type object with local vid
419  * lvid. This function should not be used directly. Use
420  * distributed_graph::vertex() or distributed_graph::l_vertex()
421  *
422  * \param graph_ref A reference to the parent graph object
423  * \param lvid The local VID of the vertex to be accessed
424  */
425  vertex_type(distributed_graph& graph_ref, lvid_type lvid):
426  graph_ref(graph_ref), lvid(lvid) { }
427  /// \endcond
428 
429  /// \brief Compares two vertex_type's for equality. Returns true
430  // if they reference the same vertex and false otherwise.
431  bool operator==(vertex_type& v) const {
432  return lvid == v.lvid;
433  }
434 
435  /// \brief Returns a constant reference to the data on the vertex
436  const vertex_data_type& data() const {
437  return graph_ref.get_local_graph().vertex_data(lvid);
438  }
439 
440  /// \brief Returns a mutable reference to the data on the vertex
442  return graph_ref.get_local_graph().vertex_data(lvid);
443  }
444 
445  /// \brief Returns the number of in edges of the vertex
446  size_t num_in_edges() const {
447  return graph_ref.l_get_vertex_record(lvid).num_in_edges;
448  }
449 
450  /// \brief Returns the number of out edges of the vertex
451  size_t num_out_edges() const {
452  return graph_ref.l_get_vertex_record(lvid).num_out_edges;
453  }
454 
455  /// \brief Returns the vertex ID of the vertex
456  vertex_id_type id() const {
457  return graph_ref.global_vid(lvid);
458  }
459 
460  /// \cond GRAPHLAB_INTERNAL
461 
462  /// \brief Returns a list of in edges (not implemented)
463  edge_list_type in_edges() __attribute__ ((noreturn)) {
464  ASSERT_TRUE(false);
465  }
466 
467  /// \brief Returns a list of out edges (not implemented)
468  edge_list_type out_edges() __attribute__ ((noreturn)) {
469  ASSERT_TRUE(false);
470  }
471  /// \endcond
472 
473  /**
474  * \brief Returns the local ID of the vertex
475  */
476  lvid_type local_id() const {
477  return lvid;
478  }
479 
480  };
481 
482 
483  /**
484  * \brief The edge represents an edge in the graph and provide
485  * access to the data associated with that edge as well as the
486  * source and target distributed::vertex_type objects.
487  *
488  * An edge object may be copied and has very little internal state
489  * and essentially only a reference to the location of the edge
490  * information in the underlying graph. Therefore edge objects
491  * can be copied but must not outlive the underlying graph.
492  */
493  class edge_type {
494  private:
495  /** \brief An internal reference to the underlying graph */
496  distributed_graph& graph_ref;
497  /** \brief The edge in the local graph */
498  typename local_graph_type::edge_type edge;
499 
500  /**
501  * \internal
502  * \brief Constructs a edge_type object from a edge_type
503  * object of the graphlab::local_graph.
504  * lvid. This function should not be used directly.
505  *
506  * \param graph_ref A reference to the parent graph object
507  * \param edge The local graph's edge_type to access
508  */
509  edge_type(distributed_graph& graph_ref,
510  typename local_graph_type::edge_type edge):
511  graph_ref(graph_ref), edge(edge) { }
512  friend class distributed_graph;
513  public:
514 
515  /**
516  * \brief Returns the source vertex of the edge.
517  * This function returns a vertex_object by value and as a
518  * consequence it is possible to use the resulting vertex object
519  * to access and *modify* the associated vertex data.
520  *
521  * Modification of vertex data obtained through an edge object
522  * is *usually not safe* and can lead to data corruption.
523  *
524  * \return The vertex object representing the source vertex.
525  */
526  vertex_type source() const {
527  return vertex_type(graph_ref, edge.source().id());
528  }
529 
530  /**
531  * \brief Returns the target vertex of the edge.
532  *
533  * This function returns a vertex_object by value and as a
534  * consequence it is possible to use the resulting vertex object
535  * to access and *modify* the associated vertex data.
536  *
537  * Modification of vertex data obtained through an edge object
538  * is *usually not safe* and can lead to data corruption.
539  *
540  * \return The vertex object representing the target vertex.
541  */
542  vertex_type target() const {
543  return vertex_type(graph_ref, edge.target().id());
544  }
545 
546  /**
547  * \brief Returns a constant reference to the data on the edge
548  */
549  const edge_data_type& data() const { return edge.data(); }
550 
551  /**
552  * \brief Returns a mutable reference to the data on the edge
553  */
554  edge_data_type& data() { return edge.data(); }
555 
556  }; // end of edge_type
557 
558 
559 
560 
561 
562  // CONSTRUCTORS ==========================================================>
563 
564  /**
565  * Constructs a distributed graph. All machines must call this constructor
566  * simultaneously.
567  *
568  * Value graph options are:
569  * \li \c ingress The graph partitioning method to use. May be "random"
570  * "oblivious" or "batch". The methods are in increasing
571  * complexity. "random" is the simplest and produces the
572  * worst partitions, while "batch" takes longer, but produces
573  * a significantly better result. Improved partitioning
574  * has direct impacts on GraphLab runtime performance.
575  * \li \c userecent An optimization that can decrease memory utilization
576  * of oblivious and batch quite significantly (especially
577  * when there are a large number of machines) at a small
578  * partitioning penalty. Defaults to 0. Set to 1 to
579  * enable.
580  * \li \c bufsize The batch size used by the batch ingress method.
581  * Defaults to 50,000. Increasing this number will
582  * decrease partitioning time with a penalty to partitioning
583  * quality.
584  *
585  * \param [in] dc Distributed controller to associate with
586  * \param [in] opts A graphlab::graphlab_options object specifying engine
587  * parameters. This is typically constructed using
588  * \ref graphlab::command_line_options.
589  */
591  const graphlab_options& opts = graphlab_options()) :
592  rpc(dc, this), finalized(false), vid2lvid(-1),
593  nverts(0), nedges(0), local_own_nverts(0), nreplicas(0),
594  ingress_ptr(NULL), vertex_exchange(dc), vset_exchange(dc), parallel_ingress(true) {
595  rpc.barrier();
596  set_options(opts);
597  }
598 
599  private:
600  void set_options(const graphlab_options& opts) {
601  size_t bufsize = 50000;
602  bool usehash = false;
603  bool userecent = false;
604  std::string ingress_method = "random";
605  std::vector<std::string> keys = opts.get_graph_args().get_option_keys();
606  foreach(std::string opt, keys) {
607  if (opt == "ingress") {
608  opts.get_graph_args().get_option("ingress", ingress_method);
609  if (rpc.procid() == 0)
610  logstream(LOG_EMPH) << "Graph Option: ingress = "
611  << ingress_method << std::endl;
612  } else if (opt == "bufsize") {
613  opts.get_graph_args().get_option("bufsize", bufsize);
614  if (rpc.procid() == 0)
615  logstream(LOG_EMPH) << "Graph Option: bufsize = "
616  << bufsize << std::endl;
617  } else if (opt == "usehash") {
618  opts.get_graph_args().get_option("usehash", usehash);
619  if (rpc.procid() == 0)
620  logstream(LOG_EMPH) << "Graph Option: usehash = "
621  << usehash << std::endl;
622  } else if (opt == "userecent") {
623  opts.get_graph_args().get_option("userecent", userecent);
624  if (rpc.procid() == 0)
625  logstream(LOG_EMPH) << "Graph Option: userecent = "
626  << userecent << std::endl;
627  } else if (opt == "parallel_ingress") {
628  opts.get_graph_args().get_option("parallel_ingress", parallel_ingress);
629  if (!parallel_ingress && rpc.procid() == 0)
630  logstream(LOG_EMPH) << "Disable parallel ingress. Graph will be streamed through one node."
631  << std::endl;
632  } else {
633  logstream(LOG_ERROR) << "Unexpected Graph Option: " << opt << std::endl;
634  }
635  }
636  set_ingress_method(ingress_method, bufsize, usehash, userecent);
637  }
638 
639  public:
640 
641 
642 
643  // METHODS ===============================================================>
644  /**
645  * \brief Commits the graph structure. Once a graph is finalized it may
646  * no longer be modified. Must be called on all machines simultaneously.
647  *
648  * Finalize is used to complete graph ingress by resolving vertex
649  * ownship and completing local data structures. Once a graph is finalized
650  * its structure may not be modified. Repeated calls to finalize() do
651  * nothing.
652  */
653  void finalize() {
654  if (finalized) return;
655  ASSERT_NE(ingress_ptr, NULL);
656  logstream(LOG_INFO) << "Distributed graph: enter finalize" << std::endl;
657  ingress_ptr->finalize();
658  rpc.barrier(); delete ingress_ptr; ingress_ptr = NULL;
659  finalized = true;
660  }
661 
662  /// \brief Returns true if the graph is finalized.
663  bool is_finalized() {
664  return finalized;
665  }
666 
667  /** \brief Get the number of vertices */
668  size_t num_vertices() const { return nverts; }
669 
670  /** \brief Get the number of edges */
671  size_t num_edges() const { return nedges; }
672 
673  /** \brief converts a vertex ID to a vertex object. This function should
674  * not be used without a deep understanding of the distributed graph
675  * representation.
676  *
677  * This functions converts a global vertex ID to a vertex_type object.
678  * The global vertex ID must exist on this machine or assertion failures
679  * will be produced.
680  */
681  vertex_type vertex(vertex_id_type vid) {
682  return vertex_type(*this, local_vid(vid));
683  }
684 
685  /// \cond GRAPHLAB_INTERNAL
686  /** \brief Get a list of all in edges of a given vertex ID. Not Implemented */
687  edge_list_type in_edges(const vertex_id_type vid) const
688  __attribute__((noreturn)) {
689  // Not implemented.
690  logstream(LOG_WARNING) << "in_edges not implemented. " << std::endl;
691  ASSERT_TRUE(false);
692  }
693 
694  /** Get a list of all out edges of a given vertex ID. Not Implemented */
695  edge_list_type out_edges(const vertex_id_type vid) const
696  __attribute__((noreturn)) {
697  // Not implemented.
698  logstream(LOG_WARNING) << "in_edges not implemented. " << std::endl;
699  ASSERT_TRUE(false);
700  }
701  /// \endcond
702 
703 
704 
705  /**
706  * \brief Returns the number of in edges of a given global vertex ID. This
707  * function should not be used without a deep understanding of the
708  * distributed graph representation.
709  *
710  * Returns the number of in edges of a given vertex ID. Equivalent to
711  * vertex(vid).num_in_edges(). The global vertex ID must exist on this
712  * machine or assertion failures will be produced.
713  */
714  size_t num_in_edges(const vertex_id_type vid) const {
715  return get_vertex_record(vid).num_in_edges;
716  }
717 
718 
719  /**
720  * \brief Returns the number of out edges of a given global vertex ID. This
721  * function should not be used without a deep understanding of the
722  * distributed graph representation.
723  *
724  * Returns the number of out edges of a given vertex ID. Equivalent to
725  * vertex(vid).num_out_edges(). The global vertex ID must exist on this
726  * machine or assertion failures will be produced.
727  */
728  size_t num_out_edges(const vertex_id_type vid) const {
729  return get_vertex_record(vid).num_out_edges;
730  }
731 
732  /**
733  * \brief Creates a vertex containing the vertex data.
734  *
735  * Creates a vertex with a particular vertex ID and containing a
736  * particular vertex data. Vertex IDs need not be sequential, and
737  * may arbitrarily span the unsigned integer range of vertex_id_type
738  * with the exception of (vertex_id_type)(-1), or corresponding to
739  * 0xFFFFFFFF on 32-bit vertex IDs.
740  *
741  * This function is parallel and distributed. i.e. It does not matter which
742  * machine, or which thread on which machines calls add_vertex() for a
743  * particular ID.
744  *
745  * However, each vertex may only be added exactly once.
746  */
747  void add_vertex(const vertex_id_type& vid,
748  const VertexData& vdata = VertexData() ) {
749  if(finalized) {
750  logstream(LOG_FATAL)
751  << "\n\tAttempting to add a vertex to a finalized graph."
752  << "\n\tVertices cannot be added to a graph after finalization."
753  << std::endl;
754  }
755  if(vid == vertex_id_type(-1)) {
756  logstream(LOG_FATAL)
757  << "\n\tAdding a vertex with id -1 is not allowed."
758  << "\n\tThe -1 vertex id is reserved for internal use."
759  << std::endl;
760  }
761  ASSERT_NE(ingress_ptr, NULL);
762  ingress_ptr->add_vertex(vid, vdata);
763  }
764 
765 
766  /**
767  * \brief Creates an edge connecting vertex source, and vertex target().
768  *
769  * Creates a edge connecting two vertex IDs.
770  *
771  * This function is parallel and distributed. i.e. It does not matter which
772  * machine, or which thread on which machines calls add_edge() for a
773  * particular ID.
774  *
775  * However, each edge direction may only be added exactly once. i.e.
776  * if edge 5->6 is added already, no other calls to add edge 5->6 should be
777  * made.
778  */
779  void add_edge(vertex_id_type source, vertex_id_type target,
780  const EdgeData& edata = EdgeData()) {
781  if(finalized) {
782  logstream(LOG_FATAL)
783  << "\n\tAttempting to add an edge to a finalized graph."
784  << "\n\tEdges cannot be added to a graph after finalization."
785  << std::endl;
786  }
787  if(source == vertex_id_type(-1)) {
788  logstream(LOG_FATAL)
789  << "\n\tThe source vertex with id vertex_id_type(-1)\n"
790  << "\tor unsigned value " << vertex_id_type(-1) << " in edge \n"
791  << "\t(" << source << "->" << target << ") is not allowed.\n"
792  << "\tThe -1 vertex id is reserved for internal use."
793  << std::endl;
794  }
795  if(target == vertex_id_type(-1)) {
796  logstream(LOG_FATAL)
797  << "\n\tThe target vertex with id vertex_id_type(-1)\n"
798  << "\tor unsigned value " << vertex_id_type(-1) << " in edge \n"
799  << "\t(" << source << "->" << target << ") is not allowed.\n"
800  << "\tThe -1 vertex id is reserved for internal use."
801  << std::endl;
802  }
803  if(source == target) {
804  logstream(LOG_FATAL)
805  << "\n\tTrying to add self edge (" << source << "->" << target << ")."
806  << "\n\tSelf edges are not allowed."
807  << std::endl;
808  }
809  ASSERT_NE(ingress_ptr, NULL);
810 
811  ingress_ptr->add_edge(source, target, edata);
812  }
813 
814 
815  /**
816  * \brief Performs a map-reduce operation on each vertex in the
817  * graph returning the result.
818  *
819  * Given a map function, map_reduce_vertices() call the map function on all
820  * vertices in the graph. The return values are then summed together and the
821  * final result returned. The map function should only read the vertex data
822  * and should not make any modifications. map_reduce_vertices() must be
823  * called on all machines simultaneously.
824  *
825  * ### Basic Usage
826  * For instance, if the graph has float vertex data, and float edge data:
827  * \code
828  * typedef graphlab::distributed_graph<float, float> graph_type;
829  * \endcode
830  *
831  * To compute an absolute sum over all the vertex data, we would write
832  * a function which reads in each a vertex, and returns the absolute
833  * value of the data on the vertex.
834  * \code
835  * float absolute_vertex_data(const graph_type::vertex_type& vertex) {
836  * return std::fabs(vertex.data());
837  * }
838  * \endcode
839  * After which calling:
840  * \code
841  * float sum = graph.map_reduce_vertices<float>(absolute_vertex_data);
842  * \endcode
843  * will call the <code>absolute_vertex_data()</code> function
844  * on each vertex in the graph. <code>absolute_vertex_data()</code>
845  * reads the value of the vertex and returns the absolute result.
846  * This return values are then summed together and returned.
847  * All machines see the same result.
848  *
849  * The template argument <code><float></code> is needed to inform
850  * the compiler regarding the return type of the mapfunction.
851  *
852  * The optional argument vset can be used to restrict he set of vertices
853  * map-reduced over.
854  *
855  * ### Relations
856  * This function is similar to
857  * graphlab::iengine::map_reduce_vertices()
858  * with the difference that this does not take a context
859  * and thus cannot influence engine signalling.
860  * transform_vertices() can be used to perform a similar
861  * but may also make modifications to graph data.
862  *
863  * \tparam ReductionType The output of the map function. Must have
864  * operator+= defined, and must be \ref sec_serializable.
865  * \tparam VertexMapperType The type of the map function.
866  * Not generally needed.
867  * Can be inferred by the compiler.
868  * \param mapfunction The map function to use. Must take
869  * a \ref vertex_type, or a reference to a
870  * \ref vertex_type as its only argument.
871  * Returns a ReductionType which must be summable
872  * and \ref sec_serializable .
873  * \param vset The set of vertices to map reduce over. Optional. Defaults to
874  * complete_set()
875  */
876  template <typename ReductionType, typename MapFunctionType>
877  ReductionType map_reduce_vertices(MapFunctionType mapfunction,
878  const vertex_set& vset = complete_set()) {
879  BOOST_CONCEPT_ASSERT((graphlab::Serializable<ReductionType>));
880  BOOST_CONCEPT_ASSERT((graphlab::OpPlusEq<ReductionType>));
881  if(!finalized) {
882  logstream(LOG_FATAL)
883  << "\n\tAttempting to run graph.map_reduce_vertices(...) "
884  << "\n\tbefore calling graph.finalize()."
885  << std::endl;
886  }
887 
888  rpc.barrier();
889  bool global_result_set = false;
890  ReductionType global_result = ReductionType();
891 #ifdef _OPENMP
892 #pragma omp parallel
893 #endif
894  {
895  bool result_set = false;
896  ReductionType result = ReductionType();
897 #ifdef _OPENMP
898  #pragma omp for
899 #endif
900  for (int i = 0; i < (int)local_graph.num_vertices(); ++i) {
901  if (lvid2record[i].owner == rpc.procid() &&
902  vset.l_contains((lvid_type)i)) {
903  if (!result_set) {
904  const vertex_type vtx(l_vertex(i));
905  result = mapfunction(vtx);
906  result_set = true;
907  }
908  else if (result_set){
909  const vertex_type vtx(l_vertex(i));
910  const ReductionType tmp = mapfunction(vtx);
911  result += tmp;
912  }
913  }
914  }
915 #ifdef _OPENMP
916  #pragma omp critical
917 #endif
918  {
919  if (result_set) {
920  if (!global_result_set) {
921  global_result = result;
922  global_result_set = true;
923  }
924  else {
925  global_result += result;
926  }
927  }
928  }
929  }
930  conditional_addition_wrapper<ReductionType>
931  wrapper(global_result, global_result_set);
932  rpc.all_reduce(wrapper);
933  return wrapper.value;
934  } // end of map_reduce_vertices
935 
936  /**
937  * \brief Performs a map-reduce operation on each edge in the
938  * graph returning the result.
939  *
940  * Given a map function, map_reduce_edges() call the map function on all
941  * edges in the graph. The return values are then summed together and the
942  * final result returned. The map function should only read data
943  * and should not make any modifications. map_reduce_edges() must be
944  * called on all machines simultaneously.
945  *
946  * ### Basic Usage
947  * For instance, if the graph has float vertex data, and float edge data:
948  * \code
949  * typedef graphlab::distributed_graph<float, float> graph_type;
950  * \endcode
951  *
952  * To compute an absolute sum over all the edge data, we would write
953  * a function which reads in each a edge, and returns the absolute
954  * value of the data on the edge.
955  * \code
956  * float absolute_edge_datac(const graph_type::edge_type& edge) {
957  * return std::fabs(edge.data());
958  * }
959  * \endcode
960  * After which calling:
961  * \code
962  * float sum = graph.map_reduce_edges<float>(absolute_edge_data);
963  * \endcode
964  * will call the <code>absolute_edge_data()</code> function
965  * on each edge in the graph. <code>absolute_edge_data()</code>
966  * reads the value of the edge and returns the absolute result.
967  * This return values are then summed together and returned.
968  * All machines see the same result.
969  *
970  * The template argument <code><float></code> is needed to inform
971  * the compiler regarding the return type of the mapfunction.
972  *
973  * The two optional arguments vset and edir can be used to restrict the
974  * set of edges which are map-reduced over.
975  *
976  * ### Relations
977  * This function similar to
978  * graphlab::distributed_graph::map_reduce_edges()
979  * with the difference that this does not take a context
980  * and thus cannot influence engine signalling.
981  * Finally transform_edges() can be used to perform a similar
982  * but may also make modifications to graph data.
983  *
984  * \tparam ReductionType The output of the map function. Must have
985  * operator+= defined, and must be \ref sec_serializable.
986  * \tparam EdgeMapperType The type of the map function.
987  * Not generally needed.
988  * Can be inferred by the compiler.
989  * \param mapfunction The map function to use. Must take
990  * a \ref edge_type, or a reference to a
991  * \ref edge_type as its only argument.
992  * Returns a ReductionType which must be summable
993  * and \ref sec_serializable .
994  * \param vset A set of vertices. Combines with
995  * edir to identify the set of edges. For instance, if
996  * edir == IN_EDGES, map_reduce_edges will map over all in edges
997  * of the vertices in vset. Optional. Defaults to complete_set().
998  * \param edir An edge direction. Combines with vset to identify the set
999  * of edges to map over. For instance, if
1000  * edir == IN_EDGES, map_reduce_edges will map over all in edges
1001  * of the vertices in vset. Optional. Defaults to IN_EDGES.
1002  */
1003  template <typename ReductionType, typename MapFunctionType>
1004  ReductionType map_reduce_edges(MapFunctionType mapfunction,
1005  const vertex_set& vset = complete_set(),
1006  edge_dir_type edir = IN_EDGES) {
1007  BOOST_CONCEPT_ASSERT((graphlab::Serializable<ReductionType>));
1008  BOOST_CONCEPT_ASSERT((graphlab::OpPlusEq<ReductionType>));
1009  if(!finalized) {
1010  logstream(LOG_FATAL)
1011  << "\n\tAttempting to run graph.map_reduce_vertices(...)"
1012  << "\n\tbefore calling graph.finalize()."
1013  << std::endl;
1014  }
1015 
1016  rpc.barrier();
1017  bool global_result_set = false;
1018  ReductionType global_result = ReductionType();
1019 #ifdef _OPENMP
1020 #pragma omp parallel
1021 #endif
1022  {
1023  bool result_set = false;
1024  ReductionType result = ReductionType();
1025 #ifdef _OPENMP
1026  #pragma omp for
1027 #endif
1028  for (int i = 0; i < (int)local_graph.num_vertices(); ++i) {
1029  if (vset.l_contains((lvid_type)i)) {
1030  if (edir == IN_EDGES || edir == ALL_EDGES) {
1031  foreach(const local_edge_type& e, l_vertex(i).in_edges()) {
1032  if (!result_set) {
1033  edge_type edge(e);
1034  result = mapfunction(edge);
1035  result_set = true;
1036  }
1037  else if (result_set){
1038  edge_type edge(e);
1039  const ReductionType tmp = mapfunction(edge);
1040  result += tmp;
1041  }
1042  }
1043  }
1044  if (edir == OUT_EDGES || edir == ALL_EDGES) {
1045  foreach(const local_edge_type& e, l_vertex(i).out_edges()) {
1046  if (!result_set) {
1047  edge_type edge(e);
1048  result = mapfunction(edge);
1049  result_set = true;
1050  }
1051  else if (result_set){
1052  edge_type edge(e);
1053  const ReductionType tmp = mapfunction(edge);
1054  result += tmp;
1055  }
1056  }
1057  }
1058  }
1059  }
1060 #ifdef _OPENMP
1061  #pragma omp critical
1062 #endif
1063  {
1064  if (result_set) {
1065  if (!global_result_set) {
1066  global_result = result;
1067  global_result_set = true;
1068  }
1069  else {
1070  global_result += result;
1071  }
1072  }
1073  }
1074  }
1075 
1076  conditional_addition_wrapper<ReductionType>
1077  wrapper(global_result, global_result_set);
1078  rpc.all_reduce(wrapper);
1079  return wrapper.value;
1080  } // end of map_reduce_edges
1081 
1082  /**
1083  * \brief Performs a transformation operation on each vertex in the graph.
1084  *
1085  * Given a mapfunction, transform_vertices() calls mapfunction on
1086  * every vertex in graph. The map function may make modifications
1087  * to the data on the vertex. transform_vertices() must be called by all
1088  * machines simultaneously.
1089  *
1090  * The optional vset argument may be used to restrict the set of vertices
1091  * operated upon.
1092  *
1093  * ### Basic Usage
1094  * For instance, if the graph has integer vertex data, and integer edge
1095  * data:
1096  * \code
1097  * typedef graphlab::distributed_graph<size_t, size_t> graph_type;
1098  * \endcode
1099  *
1100  * To set each vertex value to be the number of out-going edges,
1101  * we may write the following function:
1102  * \code
1103  * void set_vertex_value(graph_type::vertex_type& vertex)i {
1104  * vertex.data() = vertex.num_out_edges();
1105  * }
1106  * \endcode
1107  *
1108  * Calling transform_vertices():
1109  * \code
1110  * graph.transform_vertices(set_vertex_value);
1111  * \endcode
1112  * will run the <code>set_vertex_value()</code> function
1113  * on each vertex in the graph, setting its new value.
1114  *
1115  * ### Relations
1116  * map_reduce_vertices() provide similar signalling functionality,
1117  * but should not make modifications to graph data.
1118  * graphlab::iengine::transform_vertices() provide
1119  * the same graph modification capabilities, but with a context
1120  * and thus can perform signalling.
1121  *
1122  * \tparam VertexMapperType The type of the map function.
1123  * Not generally needed.
1124  * Can be inferred by the compiler.
1125  * \param mapfunction The map function to use. Must take an
1126  * \ref icontext_type& as its first argument, and
1127  * a \ref vertex_type, or a reference to a
1128  * \ref vertex_type as its second argument.
1129  * Returns void.
1130  * \param vset The set of vertices to transform. Optional. Defaults to
1131  * complete_set()
1132  */
1133  template <typename TransformType>
1134  void transform_vertices(TransformType transform_functor,
1135  const vertex_set vset = complete_set()) {
1136  if(!finalized) {
1137  logstream(LOG_FATAL)
1138  << "\n\tAttempting to call graph.transform_vertices(...)"
1139  << "\n\tbefore finalizing the graph."
1140  << std::endl;
1141  }
1142 
1143  rpc.barrier();
1144 #ifdef _OPENMP
1145  #pragma omp parallel for
1146 #endif
1147  for (int i = 0; i < (int)local_graph.num_vertices(); ++i) {
1148  if (lvid2record[i].owner == rpc.procid() &&
1149  vset.l_contains((lvid_type)i)) {
1150  vertex_type vtx(l_vertex(i));
1151  transform_functor(vtx);
1152  }
1153  }
1154  rpc.barrier();
1155  synchronize();
1156  }
1157 
1158  /**
1159  * \brief Performs a transformation operation on each edge in the graph.
1160  *
1161  * Given a mapfunction, transform_edges() calls mapfunction on
1162  * every edge in graph. The map function may make modifications
1163  * to the data on the edge. transform_edges() must be called on
1164  * all machines simultaneously.
1165  *
1166  * ### Basic Usage
1167  * For instance, if the graph has integer vertex data, and integer edge
1168  * data:
1169  * \code
1170  * typedef graphlab::distributed_graph<size_t, size_t> graph_type;
1171  * \endcode
1172  *
1173  * To set each edge value to be the number of out-going edges
1174  * of the target vertex, we may write the following:
1175  * \code
1176  * void set_edge_value(graph_type::edge_type& edge) {
1177  * edge.data() = edge.target().num_out_edges();
1178  * }
1179  * \endcode
1180  *
1181  * Calling transform_edges():
1182  * \code
1183  * graph.transform_edges(set_edge_value);
1184  * \endcode
1185  * will run the <code>set_edge_value()</code> function
1186  * on each edge in the graph, setting its new value.
1187  *
1188  * The two optional arguments vset and edir may be used to restrict
1189  * the set of edges operated upon.
1190  *
1191  * ### Relations
1192  * map_reduce_edges() provide similar signalling functionality,
1193  * but should not make modifications to graph data.
1194  * graphlab::iengine::transform_edges() provide
1195  * the same graph modification capabilities, but with a context
1196  * and thus can perform signalling.
1197  *
1198  * \tparam EdgeMapperType The type of the map function.
1199  * Not generally needed.
1200  * Can be inferred by the compiler.
1201  * \param mapfunction The map function to use. Must take an
1202  * \ref icontext_type& as its first argument, and
1203  * a \ref edge_type, or a reference to a
1204  * \ref edge_type as its second argument.
1205  * Returns void.
1206  * \param vset A set of vertices. Combines with
1207  * edir to identify the set of edges. For instance, if
1208  * edir == IN_EDGES, map_reduce_edges will map over all in edges
1209  * of the vertices in vset. Optional. Defaults to complete_set().
1210  * \param edir An edge direction. Combines with vset to identify the set
1211  * of edges to map over. For instance, if
1212  * edir == IN_EDGES, map_reduce_edges will map over all in edges
1213  * of the vertices in vset. Optional. Defaults to IN_EDGES.
1214  */
1215  template <typename TransformType>
1216  void transform_edges(TransformType transform_functor,
1217  const vertex_set& vset = complete_set(),
1218  edge_dir_type edir = IN_EDGES) {
1219  if(!finalized) {
1220  logstream(LOG_FATAL)
1221  << "\n\tAttempting to call graph.transform_edges(...)"
1222  << "\n\tbefore finalizing the graph."
1223  << std::endl;
1224  }
1225  rpc.barrier();
1226 #ifdef _OPENMP
1227  #pragma omp parallel for
1228 #endif
1229  for (int i = 0; i < (int)local_graph.num_vertices(); ++i) {
1230  if (vset.l_contains((lvid_type)i)) {
1231  if (edir == IN_EDGES || edir == ALL_EDGES) {
1232  foreach(const local_edge_type& e, l_vertex(i).in_edges()) {
1233  edge_type edge(e);
1234  transform_functor(edge);
1235  }
1236  }
1237  if (edir == OUT_EDGES || edir == ALL_EDGES) {
1238  foreach(const local_edge_type& e, l_vertex(i).out_edges()) {
1239  edge_type edge(e);
1240  transform_functor(edge);
1241  }
1242  }
1243  }
1244  }
1245  rpc.barrier();
1246  }
1247 
1248  // disable documentation for parallel_for stuff. These are difficult
1249  // to use properly by the user
1250  /// \cond GRAPHLAB_INTERNAL
1251  /**
1252  * \internal
1253  * parallel_for_vertices will partition the set of vertices among the
1254  * vector of accfunctions. Each accfunction is then executed sequentially
1255  * on the set of vertices it was assigned.
1256  *
1257  * \param accfunction must be a void function which takes a single
1258  * vertex_type argument. It may be a functor and contain state.
1259  * The function need not be reentrant as it is only called sequentially
1260  */
1261  template <typename VertexFunctorType>
1262  void parallel_for_vertices(std::vector<VertexFunctorType>& accfunction) {
1263  ASSERT_TRUE(finalized);
1264  rpc.barrier();
1265  int numaccfunctions = (int)accfunction.size();
1266  ASSERT_GE(numaccfunctions, 1);
1267 #ifdef _OPENMP
1268  #pragma omp parallel for
1269 #endif
1270  for (int i = 0; i < (int)accfunction.size(); ++i) {
1271  for (int j = i;j < (int)local_graph.num_vertices(); j+=numaccfunctions) {
1272  if (lvid2record[j].owner == rpc.procid()) {
1273  accfunction[i](vertex_type(l_vertex(j)));
1274  }
1275  }
1276  }
1277  rpc.barrier();
1278  }
1279 
1280 
1281  /**
1282  * \internal
1283  * parallel_for_edges will partition the set of edges among the
1284  * vector of accfunctions. Each accfunction is then executed sequentially
1285  * on the set of edges it was assigned.
1286  *
1287  * \param accfunction must be a void function which takes a single
1288  * edge_type argument. It may be a functor and contain state.
1289  * The function need not be reentrant as it is only called sequentially
1290  */
1291  template <typename EdgeFunctorType>
1292  void parallel_for_edges(std::vector<EdgeFunctorType>& accfunction) {
1293  ASSERT_TRUE(finalized);
1294  rpc.barrier();
1295  int numaccfunctions = (int)accfunction.size();
1296  ASSERT_GE(numaccfunctions, 1);
1297 #ifdef _OPENMP
1298  #pragma omp parallel for
1299 #endif
1300  for (int i = 0; i < (int)accfunction.size(); ++i) {
1301  for (int j = i;j < (int)local_graph.num_vertices(); j+=numaccfunctions) {
1302  foreach(const local_edge_type& e, l_vertex(j).in_edges()) {
1303  accfunction[i](edge_type(e));
1304  }
1305  }
1306  }
1307  rpc.barrier();
1308  }
1309 
1310 
1311 
1312  /** \brief Load the graph from an archive */
1313  void load(iarchive& arc) {
1314  // read the vertices
1315  arc >> nverts
1316  >> nedges
1317  >> local_own_nverts
1318  >> nreplicas
1319  >> begin_eid
1320  >> vid2lvid
1321  >> lvid2record
1322  >> local_graph;
1323  finalized = true;
1324  // check the graph condition
1325  } // end of load
1326 
1327 
1328 
1329  /** \brief Save the graph to an archive */
1330  void save(oarchive& arc) const {
1331  if(!finalized) {
1332  logstream(LOG_FATAL)
1333  << "\n\tAttempting to save a graph before calling graph.finalize()."
1334  << std::endl;
1335  }
1336  // Write the number of edges and vertices
1337  arc << nverts
1338  << nedges
1339  << local_own_nverts
1340  << nreplicas
1341  << begin_eid
1342  << vid2lvid
1343  << lvid2record
1344  << local_graph;
1345  } // end of save
1346 
1347  /// \endcond
1348 
1349  /// \brief Clears and resets the graph, releasing all memory used.
1350  void clear () {
1351  foreach (vertex_record& vrec, lvid2record)
1352  vrec.clear();
1353  lvid2record.clear();
1354  vid2lvid.clear();
1355  finalized=false;
1356  }
1357 
1358 
1359 
1360  /** \brief Load a distributed graph from a native binary format
1361  * previously saved with save_binary(). This function must be called
1362  * simultaneously on all machines.
1363  *
1364  * This function loads a sequence of files numbered
1365  * \li [prefix].0.gz
1366  * \li [prefix].1.gz
1367  * \li [prefix].2.gz
1368  * \li etc.
1369  *
1370  * These files must be previously saved using save_binary(), and
1371  * must be saved <b>using the same number of machines</b>.
1372  * This function uses the graphlab serialization system, so
1373  * the user must ensure that the vertex data and edge data
1374  * serialization formats have not changed since the graph was saved.
1375  *
1376  * A graph loaded using load_binary() is already finalized and
1377  * structure modifications are not permitted after loading.
1378  */
1379  void load_binary(const std::string& prefix) {
1380  rpc.full_barrier();
1381  std::string fname = prefix + tostr(rpc.procid()) + ".bin";
1382 
1383  logstream(LOG_INFO) << "Load graph from " << fname << std::endl;
1384  if(boost::starts_with(fname, "hdfs://")) {
1385  graphlab::hdfs hdfs;
1386  graphlab::hdfs::fstream in_file(hdfs, fname);
1387  boost::iostreams::filtering_stream<boost::iostreams::input> fin;
1388  fin.push(boost::iostreams::gzip_decompressor());
1389  fin.push(in_file);
1390 
1391  if(!fin.good()) {
1392  logstream(LOG_FATAL) << "\n\tError opening file: " << fname << std::endl;
1393  exit(-1);
1394  }
1395  iarchive iarc(fin);
1396  iarc >> *this;
1397  fin.pop();
1398  fin.pop();
1399  in_file.close();
1400  } else {
1401  std::ifstream in_file(fname.c_str(),
1402  std::ios_base::in | std::ios_base::binary);
1403  boost::iostreams::filtering_stream<boost::iostreams::input> fin;
1404  fin.push(boost::iostreams::gzip_decompressor());
1405  fin.push(in_file);
1406  iarchive iarc(fin);
1407  iarc >> *this;
1408  fin.pop();
1409  fin.pop();
1410  in_file.close();
1411  }
1412  logstream(LOG_INFO) << "Finish loading graph from " << fname << std::endl;
1413  rpc.full_barrier();
1414  } // end of load
1415 
1416 
1417  /** \brief Saves a distributed graph to a native binary format
1418  * which can be loaded with load_binary(). This function must be called
1419  * simultaneously on all machines.
1420  *
1421  * This function saves a sequence of files numbered
1422  * \li [prefix].0.gz
1423  * \li [prefix].1.gz
1424  * \li [prefix].2.gz
1425  * \li etc.
1426  *
1427  * This files can be loaded with load_binary() using the <b> same number
1428  * of machines</b>.
1429  * This function uses the graphlab serialization system, so
1430  * the vertex data and edge data serialization formats must not
1431  * change between the use of save_binary() and load_binary().
1432  *
1433  * If the graph is not alreasy finalized before save_binary() is called,
1434  * this function will finalize the graph.
1435  */
1436  void save_binary(const std::string& prefix) {
1437  rpc.full_barrier();
1438  finalize();
1439  timer savetime; savetime.start();
1440  std::string fname = prefix + tostr(rpc.procid()) + ".bin";
1441  logstream(LOG_INFO) << "Save graph to " << fname << std::endl;
1442  if(boost::starts_with(fname, "hdfs://")) {
1443  graphlab::hdfs hdfs;
1444  graphlab::hdfs::fstream out_file(hdfs, fname, true);
1445  boost::iostreams::filtering_stream<boost::iostreams::output> fout;
1446  fout.push(boost::iostreams::gzip_compressor());
1447  fout.push(out_file);
1448  if (!fout.good()) {
1449  logstream(LOG_FATAL) << "\n\tError opening file: " << fname << std::endl;
1450  exit(-1);
1451  }
1452  oarchive oarc(fout);
1453  oarc << *this;
1454  fout.pop();
1455  fout.pop();
1456  out_file.close();
1457  } else {
1458  std::ofstream out_file(fname.c_str(),
1459  std::ios_base::out | std::ios_base::binary);
1460  if (!out_file.good()) {
1461  logstream(LOG_FATAL) << "\n\tError opening file: " << fname << std::endl;
1462  exit(-1);
1463  }
1464  boost::iostreams::filtering_stream<boost::iostreams::output> fout;
1465  fout.push(boost::iostreams::gzip_compressor());
1466  fout.push(out_file);
1467  oarchive oarc(fout);
1468  oarc << *this;
1469  fout.pop();
1470  fout.pop();
1471  out_file.close();
1472  }
1473  logstream(LOG_INFO) << "Finish saving graph to " << fname << std::endl
1474  << "Finished saving binary graph: "
1475  << savetime.current_time() << std::endl;
1476  rpc.full_barrier();
1477  } // end of save
1478 
1479 
1480  /**
1481  * \brief Saves the graph to the filesystem using a provided Writer object.
1482  * Like \ref save(const std::string& prefix, writer writer, bool gzip, bool save_vertex, bool save_edge, size_t files_per_machine) "save()"
1483  * but only saves to local filesystem.
1484  */
1485  template<typename Writer>
1486  void save_to_posixfs(const std::string& prefix, Writer writer,
1487  bool gzip = true,
1488  bool save_vertex = true,
1489  bool save_edge = true,
1490  size_t files_per_machine = 4) {
1491  typedef boost::function<void(vertex_type)> vertex_function_type;
1492  typedef boost::function<void(edge_type)> edge_function_type;
1493  typedef std::ofstream base_fstream_type;
1494  typedef boost::iostreams::filtering_stream<boost::iostreams::output>
1495  boost_fstream_type;
1496  rpc.full_barrier();
1497  finalize();
1498  // figure out the filenames
1499  std::vector<std::string> graph_files;
1500  std::vector<base_fstream_type*> outstreams;
1501  std::vector<boost_fstream_type*> booststreams;
1502  graph_files.resize(files_per_machine);
1503  for(size_t i = 0; i < files_per_machine; ++i) {
1504  graph_files[i] = prefix + "_" + tostr(1 + i + rpc.procid() * files_per_machine)
1505  + "_of_" + tostr(rpc.numprocs() * files_per_machine);
1506  if (gzip) graph_files[i] += ".gz";
1507  }
1508 
1509  // create the vector of callbacks
1510  std::vector<vertex_function_type> vertex_callbacks(graph_files.size());
1511  std::vector<edge_function_type> edge_callbacks(graph_files.size());
1512 
1513  for(size_t i = 0; i < graph_files.size(); ++i) {
1514  logstream(LOG_INFO) << "Saving to file: " << graph_files[i] << std::endl;
1515  // open the stream
1516  base_fstream_type* out_file =
1517  new base_fstream_type(graph_files[i].c_str(),
1518  std::ios_base::out | std::ios_base::binary);
1519  // attach gzip if the file is gzip
1520  boost_fstream_type* fout = new boost_fstream_type;
1521  // Using gzip filter
1522  if (gzip) fout->push(boost::iostreams::gzip_compressor());
1523  fout->push(*out_file);
1524 
1525  outstreams.push_back(out_file);
1526  booststreams.push_back(fout);
1527  // construct the callback for the parallel for
1529  vertex_callbacks[i] =
1530  boost::bind(&graph_type::template save_vertex_to_stream<boost_fstream_type, Writer>,
1531  this, _1, boost::ref(*fout), boost::ref(writer));
1532  edge_callbacks[i] =
1533  boost::bind(&graph_type::template save_edge_to_stream<boost_fstream_type, Writer>,
1534  this, _1, boost::ref(*fout), boost::ref(writer));
1535  }
1536 
1537  if (save_vertex) parallel_for_vertices(vertex_callbacks);
1538  if (save_edge) parallel_for_edges(edge_callbacks);
1539 
1540  // cleanup
1541  for(size_t i = 0; i < graph_files.size(); ++i) {
1542  booststreams[i]->pop();
1543  if (gzip) booststreams[i]->pop();
1544  delete booststreams[i];
1545  delete outstreams[i];
1546  }
1547  vertex_callbacks.clear();
1548  edge_callbacks.clear();
1549  outstreams.clear();
1550  booststreams.clear();
1551  rpc.full_barrier();
1552  } // end of save to posixfs
1553 
1554 
1555 
1556  /**
1557  * \brief Saves the graph to HDFS using a provided Writer object.
1558  * Like \ref save(const std::string& prefix, writer writer, bool gzip, bool save_vertex, bool save_edge, size_t files_per_machine) "save()"
1559  * but only saves to HDFS.
1560  */
1561  template<typename Writer>
1562  void save_to_hdfs(const std::string& prefix, Writer writer,
1563  bool gzip = true,
1564  bool save_vertex = true,
1565  bool save_edge = true,
1566  size_t files_per_machine = 4) {
1567  typedef boost::function<void(vertex_type)> vertex_function_type;
1568  typedef boost::function<void(edge_type)> edge_function_type;
1569  typedef graphlab::hdfs::fstream base_fstream_type;
1570  typedef boost::iostreams::filtering_stream<boost::iostreams::output>
1571  boost_fstream_type;
1572  rpc.full_barrier();
1573  finalize();
1574  // figure out the filenames
1575  std::vector<std::string> graph_files;
1576  std::vector<base_fstream_type*> outstreams;
1577  std::vector<boost_fstream_type*> booststreams;
1578  graph_files.resize(files_per_machine);
1579  for(size_t i = 0; i < files_per_machine; ++i) {
1580  graph_files[i] = prefix + "_" + tostr(1 + i + rpc.procid() * files_per_machine)
1581  + "_of_" + tostr(rpc.numprocs() * files_per_machine);
1582  if (gzip) graph_files[i] += ".gz";
1583  }
1584 
1585  if(!hdfs::has_hadoop()) {
1586  logstream(LOG_FATAL)
1587  << "\n\tAttempting to save a graph to HDFS but GraphLab"
1588  << "\n\twas built without HDFS."
1589  << std::endl;
1590  }
1591  hdfs& hdfs = hdfs::get_hdfs();
1592 
1593  // create the vector of callbacks
1594 
1595  std::vector<vertex_function_type> vertex_callbacks(graph_files.size());
1596  std::vector<edge_function_type> edge_callbacks(graph_files.size());
1597 
1598  for(size_t i = 0; i < graph_files.size(); ++i) {
1599  logstream(LOG_INFO) << "Saving to file: " << graph_files[i] << std::endl;
1600  // open the stream
1601  base_fstream_type* out_file = new base_fstream_type(hdfs,
1602  graph_files[i],
1603  true);
1604  // attach gzip if the file is gzip
1605  boost_fstream_type* fout = new boost_fstream_type;
1606  // Using gzip filter
1607  if (gzip) fout->push(boost::iostreams::gzip_compressor());
1608  fout->push(*out_file);
1609 
1610  outstreams.push_back(out_file);
1611  booststreams.push_back(fout);
1612  // construct the callback for the parallel for
1614  vertex_callbacks[i] =
1615  boost::bind(&graph_type::template save_vertex_to_stream<boost_fstream_type, Writer>,
1616  this, _1, boost::ref(*fout), writer);
1617  edge_callbacks[i] =
1618  boost::bind(&graph_type::template save_edge_to_stream<boost_fstream_type, Writer>,
1619  this, _1, boost::ref(*fout), writer);
1620  }
1621 
1622  if (save_vertex) parallel_for_vertices(vertex_callbacks);
1623  if (save_edge) parallel_for_edges(edge_callbacks);
1624 
1625  // cleanup
1626  for(size_t i = 0; i < graph_files.size(); ++i) {
1627  booststreams[i]->pop();
1628  if (gzip) booststreams[i]->pop();
1629  delete booststreams[i];
1630  delete outstreams[i];
1631  }
1632  vertex_callbacks.clear();
1633  edge_callbacks.clear();
1634  outstreams.clear();
1635  booststreams.clear();
1636  rpc.full_barrier();
1637  } // end of save to hdfs
1638 
1639 
1640 
1641  /**
1642  * \brief Saves the graph to the filesystem or to HDFS using
1643  * a user provided Writer object. This function should be called on
1644  * all machines simultaneously.
1645  *
1646  * This function saves the current graph to disk using a user provided
1647  * Writer object. The writer object must implement two functions:
1648  * \code
1649  * std::string Writer::save_vertex(graph_type::vertex_type v);
1650  * std::string Writer::save_edge(graph_type::edge_type e);
1651  * \endcode
1652  *
1653  * The <code>save_vertex()</code> function will be called on each vertex
1654  * on the graph, and the output of the function is written to file.
1655  * Similarly, the <code>save_edge()</code> function is called on each edge
1656  * in the graph and the output written to file.
1657  *
1658  * For instance, a simple Writer object which saves a file containing
1659  * a list of edges will be:
1660  * \code
1661  * struct edge_list_writer {
1662  * std::string save_vertex(vertex_type) { return ""; }
1663  * std::string save_edge(edge_type e) {
1664  * char c[128];
1665  * sprintf(c, "%u\t%u\n", e.source().id(), e.target().id());
1666  * return c;
1667  * }
1668  * };
1669  * \endcode
1670  * The save_edge() function is called on each edge in the graph. It then
1671  * constructs a string containing "[source] \\t [target] \\n" and returns
1672  * the string.
1673  *
1674  * This can also be used to data in human readable format. For instance,
1675  * if the vertex data type is a floating point number (say a PageRank
1676  * value), to save a list of vertices and their corresponding PageRanks,
1677  * the following writer could be implemented:
1678  * \code
1679  * struct pagerank_writer {
1680  * std::string save_vertex(vertex_type v) {
1681  * char c[128];
1682  * sprintf(c, "%u\t%f\n", v.id(), v.data());
1683  * return c;
1684  * }
1685  * std::string save_edge(edge_type) {}
1686  * };
1687  * \endcode
1688  * \note Note that these is not an example a reliable parser since sprintf
1689  * may break if the size of vertex_id_type changes
1690  *
1691  * The output files will be written in
1692  * \li [prefix]_1_of_16.gz
1693  * \li [prefix]_2_of_16.gz
1694  * \li [prefix].3_of_16.gz
1695  * \li etc.
1696  *
1697  * To accelerate the saving process, multiple files are be written
1698  * per machine in parallel. If the gzip option is not set, the ".gz" suffix
1699  * is not added.
1700  *
1701  * For instance, if there are 4 machines, running:
1702  * \code
1703  * save("test_graph", pagerank_writer);
1704  * \endcode
1705  * Will create the files
1706  * \li test_graph_1_of_16.gz
1707  * \li test_graph_2_of_16.gz
1708  * \li ...
1709  * \li test_graph_16_of_16.gz
1710  *
1711  * If HDFS support is compiled in, this function can save to HDFS by
1712  * adding "hdfs://" to the prefix.
1713  *
1714  * For instance, if there are 4 machines, running:
1715  * \code
1716  * save("hdfs:///hdfs_server/data/test_graph", pagerank_writer);
1717  * \endcode
1718  * Will create on the HDFS server, the files
1719  * \li /data/test_graph_1_of_16.gz
1720  * \li /data/test_graph_2_of_16.gz
1721  * \li ...
1722  * \li /data/test_graph_16_of_16.gz
1723  *
1724  * \tparam Writer The writer object type. This is generally inferred by the
1725  * compiler and need not be specified.
1726  *
1727  * \param prefix The file prefix to save the output graph files. The output
1728  * files will be numbered [prefix].0 , [prefix].1 , etc.
1729  * If prefix begins with "hdfs://", the output is written to
1730  * HDFS
1731  * \param writer The writer object to use.
1732  * \param gzip If gzip compression should be used. If set, all files will be
1733  * appended with the .gz suffix. Defaults to true.
1734  * \param save_vertex If vertices should be saved. Defaults to true.
1735  * \param save_edges If edges should be saved. Defaults to true.
1736  * \param files_per_machine Number of files to write simultaneously in
1737  * parallel per machine. Defaults to 4.
1738  */
1739  template<typename Writer>
1740  void save(const std::string& prefix, Writer writer,
1741  bool gzip = true, bool save_vertex = true, bool save_edge = true,
1742  size_t files_per_machine = 4) {
1743  if(boost::starts_with(prefix, "hdfs://")) {
1744  save_to_hdfs(prefix, writer, gzip, save_vertex, save_edge, files_per_machine);
1745  } else {
1746  save_to_posixfs(prefix, writer, gzip, save_vertex, save_edge, files_per_machine);
1747  }
1748  } // end of save
1749 
1750 
1751 
1752  /**
1753  * \brief Saves the graph in the specified format. This function should be
1754  * called on all machines simultaneously.
1755  *
1756  * The output files will be written in
1757  * \li [prefix].0.gz
1758  * \li [prefix].1.gz
1759  * \li [prefix].2.gz
1760  * \li etc.
1761  *
1762  * To accelerate the saving process, multiple files are be written
1763  * per machine in parallel. If the gzip option is not set, the ".gz" suffix
1764  * is not added.
1765  *
1766  * For instance, if there are 4 machines, running:
1767  * \code
1768  * save_format("test_graph", "tsv");
1769  * \endcode
1770  * Will create the files
1771  * \li test_graph_0.gz
1772  * \li test_graph_1.gz
1773  * \li ...
1774  * \li test_graph_15.gz
1775  *
1776  * The supported formats are described in \ref graph_formats.
1777  *
1778  * \param prefix The file prefix to save the output graph files. The output
1779  * files will be numbered [prefix].0 , [prefix].1 , etc.
1780  * If prefix begins with "hdfs://", the output is written to
1781  * HDFS.
1782  * \param format The file format to save in.
1783  * Either "tsv", "snap", "graphjrl" or "bin".
1784  * \param gzip If gzip compression should be used. If set, all files will be
1785  * appended with the .gz suffix. Defaults to true. Ignored
1786  * if format == "bin".
1787  * \param files_per_machine Number of files to write simultaneously in
1788  * parallel per machine. Defaults to 4. Ignored if
1789  * format == "bin".
1790  */
1791  void save_format(const std::string& prefix, const std::string& format,
1792  bool gzip = true, size_t files_per_machine = 4) {
1793  if (format == "snap" || format == "tsv") {
1794  save(prefix, builtin_parsers::tsv_writer<distributed_graph>(),
1795  gzip, false, true, files_per_machine);
1796  } else if (format == "graphjrl") {
1797  save(prefix, builtin_parsers::graphjrl_writer<distributed_graph>(),
1798  gzip, true, true, files_per_machine);
1799  } else if (format == "bin") {
1800  save_binary(prefix);
1801  } else if (format == "bintsv4") {
1802  save_direct(prefix, gzip, &graph_type::save_bintsv4_to_stream);
1803  } else {
1804  logstream(LOG_FATAL)
1805  << "Unrecognized Format \"" << format << "\"!" << std::endl;
1806  return;
1807  }
1808  } // end of save structure
1809 
1810 
1811 
1812 
1813  /**
1814  * \brief Load a graph from a collection of files in stored on
1815  * the filesystem using the user defined line parser. Like
1816  * \ref load(const std::string& path, line_parser_type line_parser)
1817  * but only loads from the filesystem.
1818  */
1819  void load_from_posixfs(std::string prefix,
1820  line_parser_type line_parser) {
1821  std::string directory_name; std::string original_path(prefix);
1822  boost::filesystem::path path(prefix);
1823  std::string search_prefix;
1824  if (boost::filesystem::is_directory(path)) {
1825  // if this is a directory
1826  // force a "/" at the end of the path
1827  // make sure to check that the path is non-empty. (you do not
1828  // want to make the empty path "" the root path "/" )
1829  directory_name = path.native();
1830  }
1831  else {
1832  directory_name = path.parent_path().native();
1833  search_prefix = path.filename().native();
1834  directory_name = (directory_name.empty() ? "." : directory_name);
1835  }
1836  std::vector<std::string> graph_files;
1837  fs_util::list_files_with_prefix(directory_name, search_prefix, graph_files);
1838  if (graph_files.size() == 0) {
1839  logstream(LOG_WARNING) << "No files found matching " << original_path << std::endl;
1840  }
1841  for(size_t i = 0; i < graph_files.size(); ++i) {
1842  if ((parallel_ingress && (i % rpc.numprocs() == rpc.procid()))
1843  || (!parallel_ingress && (rpc.procid() == 0))) {
1844  logstream(LOG_EMPH) << "Loading graph from file: " << graph_files[i] << std::endl;
1845  // is it a gzip file ?
1846  const bool gzip = boost::ends_with(graph_files[i], ".gz");
1847  // open the stream
1848  std::ifstream in_file(graph_files[i].c_str(),
1849  std::ios_base::in | std::ios_base::binary);
1850  // attach gzip if the file is gzip
1851  boost::iostreams::filtering_stream<boost::iostreams::input> fin;
1852  // Using gzip filter
1853  if (gzip) fin.push(boost::iostreams::gzip_decompressor());
1854  fin.push(in_file);
1855  const bool success = load_from_stream(graph_files[i], fin, line_parser);
1856  if(!success) {
1857  logstream(LOG_FATAL)
1858  << "\n\tError parsing file: " << graph_files[i] << std::endl;
1859  }
1860  fin.pop();
1861  if (gzip) fin.pop();
1862  }
1863  }
1864  rpc.full_barrier();
1865  } // end of load from posixfs
1866 
1867  /**
1868  * \brief Load a graph from a collection of files in stored on
1869  * the HDFS using the user defined line parser. Like
1870  * \ref load(const std::string& path, line_parser_type line_parser)
1871  * but only loads from HDFS.
1872  */
1873  void load_from_hdfs(std::string prefix, line_parser_type line_parser) {
1874  // force a "/" at the end of the path
1875  // make sure to check that the path is non-empty. (you do not
1876  // want to make the empty path "" the root path "/" )
1877  std::string path = prefix;
1878  if (path.length() > 0 && path[path.length() - 1] != '/') path = path + "/";
1879  if(!hdfs::has_hadoop()) {
1880  logstream(LOG_FATAL)
1881  << "\n\tAttempting to load a graph from HDFS but GraphLab"
1882  << "\n\twas built without HDFS."
1883  << std::endl;
1884  }
1885  hdfs& hdfs = hdfs::get_hdfs();
1886  std::vector<std::string> graph_files;
1887  graph_files = hdfs.list_files(path);
1888  if (graph_files.size() == 0) {
1889  logstream(LOG_WARNING) << "No files found matching " << prefix << std::endl;
1890  }
1891  for(size_t i = 0; i < graph_files.size(); ++i) {
1892  if ((parallel_ingress && (i % rpc.numprocs() == rpc.procid())) ||
1893  (!parallel_ingress && (rpc.procid() == 0))) {
1894  logstream(LOG_EMPH) << "Loading graph from file: " << graph_files[i] << std::endl;
1895  // is it a gzip file ?
1896  const bool gzip = boost::ends_with(graph_files[i], ".gz");
1897  // open the stream
1898  graphlab::hdfs::fstream in_file(hdfs, graph_files[i]);
1899  boost::iostreams::filtering_stream<boost::iostreams::input> fin;
1900  if(gzip) fin.push(boost::iostreams::gzip_decompressor());
1901  fin.push(in_file);
1902  const bool success = load_from_stream(graph_files[i], fin, line_parser);
1903  if(!success) {
1904  logstream(LOG_FATAL)
1905  << "\n\tError parsing file: " << graph_files[i] << std::endl;
1906  }
1907  fin.pop();
1908  if (gzip) fin.pop();
1909  }
1910  }
1911  rpc.full_barrier();
1912  } // end of load from hdfs
1913 
1914 
1915  /**
1916  * \brief Load a the graph from a given path using a user defined
1917  * line parser. This function should be called on all machines
1918  * simultaneously.
1919  *
1920  * This functions loads all files in the filesystem or on HDFS matching
1921  * the pattern "[prefix]*".
1922  *
1923  * Examples:
1924  *
1925  * <b> prefix = "webgraph.txt" </b>
1926  *
1927  * will load the file webgraph.txt if such a file exists. It will also
1928  * load all files in the current directory which begins with "webgraph.txt".
1929  * For instance, webgraph.txt.0, webgraph.txt.1, etc.
1930  *
1931  * <b>prefix = "graph/data"</b>
1932  *
1933  * will load all files in the "graph" directory which begin with "data"
1934  *
1935  * <b> prefix = "hdfs:///hdfs_server/graph/data" </b>
1936  *
1937  * will load all files from the HDFS server in the "/graph/" directory
1938  * which begin with "data".
1939  *
1940  * If files have the ".gz" suffix, it is automatically decompressed.
1941  *
1942  * The line_parser is a user defined function matching the following
1943  * prototype:
1944  *
1945  * \code
1946  * bool parser(graph_type& graph,
1947  * const std::string& filename,
1948  * const std::string& line);
1949  * \endcode
1950  *
1951  * The load() function will call the parser one line at a time, and the
1952  * paser function should process the line and call add_vertex / add_edge
1953  * functions in the graph. It should return true on success, and false
1954  * on failure. Since the parsing may be parallelized,
1955  * the parser should treat each line independently
1956  * and not depend on a sequential pass through a file.
1957  *
1958  * For instance, if the graph is in a simple edge list format, a parser
1959  * could be:
1960  * \code
1961  * bool edge_list_parser(graph_type& graph,
1962  * const std::string& filename,
1963  * const std::string& line) {
1964  * if (line.empty()) return true;
1965  * vertex_id_type source, target;
1966  * if (sscanf(line.c_str(), "%u %u", source, target) < 2) {
1967  * // parsed less than 2 objects, failure.
1968  * return false;
1969  * }
1970  * else {
1971  * graph.add_edge(source, target);
1972  * return true;
1973  * }
1974  * }
1975  * \endcode
1976  * \note Note that this is not an example a reliable parser since sscanf
1977  * may break if the size of vertex_id_type changes
1978  *
1979  * \param prefix The file prefix to read from. All files matching
1980  * the pattern "[prefix]*" are loaded. If prefix begins with
1981  * "hdfs://" the files are read from hdfs.
1982  * \param line_parser A user defined parsing function
1983  */
1984  void load(std::string prefix, line_parser_type line_parser) {
1985  rpc.full_barrier();
1986  if (prefix.length() == 0) return;
1987  if(boost::starts_with(prefix, "hdfs://")) {
1988  load_from_hdfs(prefix, line_parser);
1989  } else {
1990  load_from_posixfs(prefix, line_parser);
1991  }
1992  rpc.full_barrier();
1993  } // end of load
1994 
1995  /**
1996  * \brief Constructs a synthetic power law graph. Must be called on
1997  * all machines simultaneously.
1998  *
1999  * This function constructs a synthetic out-degree power law of "nverts"
2000  * vertices with a particular alpha parameter.
2001  * In other words, the probability that a vertex has out-degree \f$d\f$,
2002  * is given by:
2003  *
2004  * \f[ P(d) \propto d^{-\alpha} \f]
2005  *
2006  * By default, the out-degree distribution of each vertex
2007  * will have power-law distribution, but the in-degrees will be nearly
2008  * uniform. This can be reversed by setting the second argument "in_degree"
2009  * to true.
2010  *
2011  * \param nverts Number of vertices to generate
2012  * \param in_degree If set to true, the graph will have power-law in-degree.
2013  * Defaults to false.
2014  * \param alpha The alpha parameter in the power law distribution. Defaults
2015  * to 2.1
2016  * \param truncate Limits the maximum degree of any vertex. (thus generating
2017  * a truncated power-law distribution). Necessary
2018  * for large number of vertices (hundreds of millions)
2019  * since this function allocates a PDF vector of
2020  * "nverts" to sample from.
2021  */
2022  void load_synthetic_powerlaw(size_t nverts, bool in_degree = false,
2023  double alpha = 2.1, size_t truncate = (size_t)(-1)) {
2024  rpc.full_barrier();
2025  std::vector<double> prob(std::min(nverts, truncate), 0);
2026  logstream(LOG_INFO) << "constructing pdf" << std::endl;
2027  for(size_t i = 0; i < prob.size(); ++i)
2028  prob[i] = std::pow(double(i+1), -alpha);
2029  logstream(LOG_INFO) << "constructing cdf" << std::endl;
2030  random::pdf2cdf(prob);
2031  logstream(LOG_INFO) << "Building graph" << std::endl;
2032  size_t target_index = rpc.procid();
2033  size_t addedvtx = 0;
2034 
2035  // A large prime number
2036  const size_t HASH_OFFSET = 2654435761;
2037  for(size_t source = rpc.procid(); source < nverts;
2038  source += rpc.numprocs()) {
2039  const size_t out_degree = random::multinomial_cdf(prob) + 1;
2040  for(size_t i = 0; i < out_degree; ++i) {
2041  target_index = (target_index + HASH_OFFSET) % nverts;
2042  while (source == target_index) {
2043  target_index = (target_index + HASH_OFFSET) % nverts;
2044  }
2045  if(in_degree) add_edge(target_index, source);
2046  else add_edge(source, target_index);
2047  }
2048  ++addedvtx;
2049  if (addedvtx % 10000000 == 0) {
2050  logstream(LOG_EMPH) << addedvtx << " inserted\n";
2051  }
2052  }
2053  rpc.full_barrier();
2054  } // end of load random powerlaw
2055 
2056 
2057  /**
2058  * \brief load a graph with a standard format. Must be called on all
2059  * machines simultaneously.
2060  *
2061  * The supported graph formats are described in \ref graph_formats.
2062  */
2063  void load_format(const std::string& path, const std::string& format) {
2064  line_parser_type line_parser;
2065  if (format == "snap") {
2066  line_parser = builtin_parsers::snap_parser<distributed_graph>;
2067  load(path, line_parser);
2068  } else if (format == "adj") {
2069  line_parser = builtin_parsers::adj_parser<distributed_graph>;
2070  load(path, line_parser);
2071  } else if (format == "tsv") {
2072  line_parser = builtin_parsers::tsv_parser<distributed_graph>;
2073  load(path, line_parser);
2074  } else if (format == "graphjrl") {
2075  line_parser = builtin_parsers::graphjrl_parser<distributed_graph>;
2076  load(path, line_parser);
2077  } else if (format == "bintsv4") {
2078  load_direct(path,&graph_type::load_bintsv4_from_stream);
2079  } else if (format == "bin") {
2080  load_binary(path);
2081  } else {
2082  logstream(LOG_ERROR)
2083  << "Unrecognized Format \"" << format << "\"!" << std::endl;
2084  return;
2085  }
2086  } // end of load
2087 
2088 
2089  /** \brief Load a distributed graph from a native json output format.
2090  * This function must be called simultaneously on all machines.
2091  *
2092  * This function loads a sequence of files numbered
2093  * \li [prefix].0.gz
2094  * \li [prefix].1.gz
2095  * \li [prefix].2.gz
2096  * \li etc.
2097  *
2098  * These files must be previously saved using external graphbuilder library,
2099  * and must be saved <b>using the same number of machines</b>.
2100  *
2101  * A graph loaded using load_json() is already finalized and
2102  * structure modifications are not permitted after loading.
2103  */
2104  typedef json_parser<VertexData, EdgeData> json_parser_type;
2105  typedef typename json_parser_type::edge_parser_type edge_parser_type;
2106  typedef typename json_parser_type::vertex_parser_type vertex_parser_type;
2107  void load_json (const std::string& prefix, bool gzip=false,
2108  edge_parser_type edge_parser = builtin_parsers::empty_edge_parser<EdgeData>,
2109  vertex_parser_type vertex_parser = builtin_parsers::empty_vertex_parser<VertexData>
2110  ) {
2111  rpc.full_barrier();
2112  json_parser<VertexData, EdgeData> jsonparser(*this, prefix, gzip, edge_parser, vertex_parser);
2113  jsonparser.load();
2114  rpc.full_barrier();
2115  } // end of load_json
2116 
2117 
2118 /****************************************************************************
2119  * Vertex Set Functions *
2120  * ---------------------- *
2121  * Manages operations involving sets of vertices *
2122  ****************************************************************************/
2123 
2124  /**
 * \brief Returns an empty set of vertices
2126  */
2128  return vertex_set(false);
2129  }
2130 
2131  /**
 * \brief Returns a full set of vertices
2133  */
2135  return vertex_set(true);
2136  }
2137 
2138  ///
2139  vertex_set neighbors(const vertex_set& cur,
2140  edge_dir_type edir) {
2141  // foreach master bit which is set, set its corresponding mirror
2142  // synchronize master to mirrors
2143  vertex_set ret(empty_set());
2144  ret.make_explicit(*this);
2145 
2146  foreach(size_t lvid, cur.get_lvid_bitset(*this)) {
2147  if (edir == IN_EDGES || edir == ALL_EDGES) {
2148  foreach(local_edge_type e, l_vertex(lvid).in_edges()) {
2149  ret.set_lvid_unsync(e.source().id());
2150  }
2151  }
2152  if (edir == OUT_EDGES || edir == ALL_EDGES) {
2153  foreach(local_edge_type e, l_vertex(lvid).out_edges()) {
2154  ret.set_lvid_unsync(e.target().id());
2155  }
2156  }
2157  }
2158  ret.synchronize_mirrors_to_master_or(*this, vset_exchange);
2159  ret.synchronize_master_to_mirrors(*this, vset_exchange);
2160  return ret;
2161  }
2162 
2163 
2164  /**
2165  * \brief Constructs a vertex set from a predicate operation which
2166  * is executed on each vertex.
2167  *
2168  * This function selects a subset of vertices on which the predicate
2169  * evaluates to true.
2170  For instance if vertices contain an integer, the following
2171  * code will construct a set of vertices containing only vertices with data
2172  * which are a multiple of 2.
2173  *
2174  * \code
2175  * bool is_multiple_of_2(const graph_type::vertex_type& vertex) {
2176  * return vertex.data() % 2 == 0;
2177  * }
2178  * vertex_set even_vertices = graph.select(is_multiple_of_2);
2179  * \endcode
2180  *
2181  * select() also takes a second argument which restricts the set of vertices
2182  * queried. For instance,
2183  * \code
2184  * bool is_multiple_of_3(const graph_type::vertex_type& vertex) {
2185  * return vertex.data() % 3 == 0;
2186  * }
2187  * vertex_set div_6_vertices = graph.select(is_multiple_of_3, even_vertices);
2188  * \endcode
2189  * will select from the set of even vertices, all vertices which are also
2190  * divisible by 3. The resultant set is therefore the set of all vertices
2191  * which are divisible by 6.
2192  *
2193  * \param select_functor A function/functor which takes a
2194  * const vertex_type& argument and returns a boolean
2195  * denoting of the vertex is to be included in the
2196  * returned set
2197  * \param vset Optional. The set of vertices to evaluate the selection on.
2198  * Defaults to complete_set()
2199  */
2200  template <typename FunctionType>
2201  vertex_set select(FunctionType select_functor,
2202  const vertex_set& vset = complete_set()) {
2203  vertex_set ret(empty_set());
2204 
2205  ret.make_explicit(*this);
2206 #ifdef _OPENMP
2207  #pragma omp for
2208 #endif
2209  for (int i = 0; i < (int)local_graph.num_vertices(); ++i) {
2210  if (lvid2record[i].owner == rpc.procid() &&
2211  vset.l_contains((lvid_type)i)) {
2212  const vertex_type vtx(l_vertex(i));
2213  if (select_functor(vtx)) ret.set_lvid(i);
2214  }
2215  }
2216  ret.synchronize_master_to_mirrors(*this, vset_exchange);
2217  return ret;
2218  }
2219 
2220  /**
2221  * \brief Returns the number of vertices in a vertex set.
2222  *
2223  * This function must be called on all machines and returns the number of
2224  * vertices contained in the vertex set.
2225  *
2226  * For instance:
2227  * \code
2228  * graph.vertex_set_size(graph.complete_set());
2229  * \endcode
2230  * will always evaluate to graph.num_vertices();
2231  */
2232  size_t vertex_set_size(const vertex_set& vset) {
2233  size_t count = 0;
2234  for (int i = 0; i < (int)local_graph.num_vertices(); ++i) {
2235  count += (lvid2record[i].owner == rpc.procid() &&
2236  vset.l_contains((lvid_type)i));
2237  }
2238  rpc.all_reduce(count);
2239  return count;
2240  }
2241 
2242 
2243  /**
2244  * \brief Returns true if the vertex set is empty
2245  *
2246  * This function must be called on all machines and returns
2247  * true if the vertex set is empty
2248  */
2249  bool vertex_set_empty(const vertex_set& vset) {
2250  if (vset.lazy) return !vset.is_complete_set;
2251 
2252  size_t count = vset.get_lvid_bitset(*this).empty();
2253  rpc.all_reduce(count);
2254  return count == rpc.numprocs();
2255  }
2256 /****************************************************************************
2257  * Internal Functions *
2258  * ---------------------- *
 * These functions and types provide internal access to the                 *
2260  * underlying graph representation. They should not be used unless you *
2261  * *really* know what you are doing. *
2262  ****************************************************************************/
2263 
2264 
2265  /**
2266  * \internal
2267  * The vertex record stores information associated with each
2268  * vertex on this proc
2269  */
2270  struct vertex_record {
2271  /// The official owning processor for this vertex
2272  procid_t owner;
/// The global vid of this vertex
2274  vertex_id_type gvid;
/// The number of in edges and out edges of this vertex on the global graph
2277  /** The set of proc that mirror this vertex. The owner should
2278  NOT be in this set.*/
2279  mirror_type _mirrors;
2280  vertex_record() :
2281  owner(-1), gvid(-1), num_in_edges(0), num_out_edges(0) { }
2282  vertex_record(const vertex_id_type& vid) :
2283  owner(-1), gvid(vid), num_in_edges(0), num_out_edges(0) { }
2284  procid_t get_owner () const { return owner; }
2285  const mirror_type& mirrors() const { return _mirrors; }
2286  size_t num_mirrors() const { return _mirrors.popcount(); }
2287 
2288  void clear() {
2289  _mirrors.clear();
2290  }
2291 
2292  void load(iarchive& arc) {
2293  clear();
2294  arc >> owner
2295  >> gvid
2296  >> num_in_edges
2297  >> num_out_edges
2298  >> _mirrors;
2299  }
2300 
2301  void save(oarchive& arc) const {
2302  arc << owner
2303  << gvid
2304  << num_in_edges
2305  << num_out_edges
2306  << _mirrors;
2307  } // end of save
2308  }; // end of vertex_record
2309 
2310 
2311 
2312 
2313  /** \internal
2314  * \brief converts a local vertex ID to a local vertex object
2315  */
2316  local_vertex_type l_vertex(lvid_type vid) {
2317  return local_vertex_type(*this, vid);
2318  }
2319 
2320  /** \internal
2321  *\brief Get the Total number of vertex replicas in the graph */
2322  size_t num_replicas() const { return nreplicas; }
2323 
2324  /** \internal
2325  *\brief Get the number of vertices local to this proc */
2326  size_t num_local_vertices() const { return local_graph.num_vertices(); }
2327 
2328  /** \internal
2329  *\brief Get the number of edges local to this proc */
2330  size_t num_local_edges() const { return local_graph.num_edges(); }
2331 
2332  /** \internal
2333  *\brief Get the number of vertices owned by this proc */
2334  size_t num_local_own_vertices() const { return local_own_nverts; }
2335 
2336  /** \internal
2337  *\brief Convert a global vid to a local vid */
2338  lvid_type local_vid (const vertex_id_type vid) const {
2339  // typename boost::unordered_map<vertex_id_type, lvid_type>::
2340  // const_iterator iter = vid2lvid.find(vid);
2341  typename cuckoo_map_type::const_iterator iter = vid2lvid.find(vid);
2342  return iter->second;
2343  } // end of local_vertex_id
2344 
2345  /** \internal
2346  *\brief Convert a local vid to a global vid */
2347  vertex_id_type global_vid(const lvid_type lvid) const {
2348  ASSERT_LT(lvid, lvid2record.size());
2349  return lvid2record[lvid].gvid;
2350  } // end of global_vertex_id
2351 
2352 
2353 
2354  /**
2355  * \internal
2356  * \brief Returns an edge list of all in edges of a local vertex ID
2357  * on the local graph
2358  *
2359  * Equivalent to l_vertex(lvid).in_edges()
2360  */
2361  local_edge_list_type l_in_edges(const lvid_type lvid) {
2362  return local_edge_list_type(*this, local_graph.in_edges(lvid));
2363  }
2364 
2365  /**
2366  * \internal
2367  * \brief Returns the number of in edges of a local vertex ID
2368  * on the local graph
2369  *
2370  * Equivalent to l_vertex(lvid).num in_edges()
2371  */
2372  size_t l_num_in_edges(const lvid_type lvid) const {
2373  return local_graph.num_in_edges(lvid);
2374  }
2375 
2376  /**
2377  * \internal
2378  * \brief Returns an edge list of all out edges of a local vertex ID
2379  * on the local graph
2380  *
2381  * Equivalent to l_vertex(lvid).out_edges()
2382  */
2383  local_edge_list_type l_out_edges(const lvid_type lvid) {
2384  return local_edge_list_type(*this, local_graph.out_edges(lvid));
2385  }
2386 
2387  /**
2388  * \internal
2389  * \brief Returns the number of out edges of a local vertex ID
2390  * on the local graph
2391  *
2392  * Equivalent to l_vertex(lvid).num out_edges()
2393  */
2394  size_t l_num_out_edges(const lvid_type lvid) const {
2395  return local_graph.num_out_edges(lvid);
2396  }
2397 
2398  procid_t procid() const {
2399  return rpc.procid();
2400  }
2401 
2402 
2403  procid_t numprocs() const {
2404  return rpc.numprocs();
2405  }
2406 
2407  distributed_control& dc() {
2408  return rpc.dc();
2409  }
2410 
2411 
2412 
2413  /** \internal
2414  * \brief Returns the internal vertex record of a given global vertex ID
2415  */
2416  const vertex_record& get_vertex_record(vertex_id_type vid) const {
2417  // typename boost::unordered_map<vertex_id_type, lvid_type>::
2418  // const_iterator iter = vid2lvid.find(vid);
2419  typename cuckoo_map_type::const_iterator iter = vid2lvid.find(vid);
2420  ASSERT_TRUE(iter != vid2lvid.end());
2421  return lvid2record[iter->second];
2422  }
2423 
2424  /** \internal
2425  * \brief Returns the internal vertex record of a given local vertex ID
2426  */
2427  vertex_record& l_get_vertex_record(lvid_type lvid) {
2428  ASSERT_LT(lvid, lvid2record.size());
2429  return lvid2record[lvid];
2430  }
2431 
2432  /** \internal
2433  * \brief Returns the internal vertex record of a given local vertex ID
2434  */
2435  const vertex_record& l_get_vertex_record(lvid_type lvid) const {
2436  ASSERT_LT(lvid, lvid2record.size());
2437  return lvid2record[lvid];
2438  }
2439 
2440  /** \internal
2441  * \brief Returns true if the provided global vertex ID is a
2442  * master vertex on this machine and false otherwise.
2443  */
2444  bool is_master(vertex_id_type vid) const {
2445  typename cuckoo_map_type::const_iterator iter = vid2lvid.find(vid);
2446  return (iter != vid2lvid.end()) && l_is_master(iter->second);
2447  }
2448  /** \internal
2449  * \brief Returns true if the provided local vertex ID is a master vertex.
2450  * Returns false otherwise.
2451  */
2452  bool l_is_master(lvid_type lvid) const {
2453  ASSERT_LT(lvid, lvid2record.size());
2454  return lvid2record[lvid].owner == rpc.procid();
2455  }
2456 
2457  /** \internal
2458  * \brief Returns the master procid for vertex lvid.
2459  */
2460  procid_t l_master(lvid_type lvid) const {
2461  ASSERT_LT(lvid, lvid2record.size());
2462  return lvid2record[lvid].owner;
2463  }
2464 
2465 
2466  /** \internal
2467  * \brief Returns a reference to the internal graph representation
2468  */
2469  local_graph_type& get_local_graph() {
2470  return local_graph;
2471  }
2472 
2473  /** \internal
2474  * \brief Returns a const reference to the internal graph representation
2475  */
2476  const local_graph_type& get_local_graph() const {
2477  return local_graph;
2478  }
2479 
2480 
2481 
2482 
2483  /** \internal
2484  * This function synchronizes the master vertex data with all the mirrors.
2485  * This function must be called simultaneously by all machines
2486  */
2487  void synchronize() {
2488  typedef std::pair<vertex_id_type, vertex_data_type> pair_type;
2489  typename buffered_exchange<pair_type>::buffer_type recv_buffer;
2490  procid_t sending_proc;
2491  // Loop over all the local vertex records
2492  for(lvid_type lvid = 0; lvid < lvid2record.size(); ++lvid) {
2493  const vertex_record& record = lvid2record[lvid];
2494  // if this machine is the owner of a record then send the
2495  // vertex data to all mirrors
2496  if(record.owner == rpc.procid()) {
2497  foreach(size_t proc, record.mirrors()) {
2498  const pair_type pair(record.gvid, local_graph.vertex_data(lvid));
2499  vertex_exchange.send(proc, pair);
2500  }
2501  }
2502  // Receive any vertex data and update local mirrors
2503  while(vertex_exchange.recv(sending_proc, recv_buffer)) {
2504  foreach(const pair_type& pair, recv_buffer) {
2505  vertex(pair.first).data() = pair.second;
2506  }
2507  recv_buffer.clear();
2508  }
2509  }
2510  vertex_exchange.flush();
2511  while(vertex_exchange.recv(sending_proc, recv_buffer)) {
2512  foreach(const pair_type& pair, recv_buffer) {
2513  vertex(pair.first).data() = pair.second;
2514  }
2515  recv_buffer.clear();
2516  }
2517  ASSERT_TRUE(vertex_exchange.empty());
2518  } // end of synchronize
2519 
2520 
2521 
2522 
2523 
2524 
2525  /** \internal
2526  * vertex type while provides access to local graph vertices.
2527  */
2528  struct local_vertex_type {
2529  distributed_graph& graph_ref;
2530  lvid_type lvid;
2531 
2532  local_vertex_type(distributed_graph& graph_ref, lvid_type lvid):
2533  graph_ref(graph_ref), lvid(lvid) { }
2534 
2535  /// \brief Can be casted from local_vertex_type using an explicit cast
2536  explicit local_vertex_type(vertex_type v) :graph_ref(v.graph_ref),lvid(v.lvid) { }
2537  /// \brief Can be casted to vertex_type using an explicit cast
2538  operator vertex_type() const {
2539  return vertex_type(graph_ref, lvid);
2540  }
2541 
2542  bool operator==(local_vertex_type& v) const {
2543  return lvid == v.lvid;
2544  }
2545 
2546  /// \brief Returns a reference to the data on the local vertex
2547  const vertex_data_type& data() const {
2548  return graph_ref.get_local_graph().vertex_data(lvid);
2549  }
2550 
2551  /// \brief Returns a reference to the data on the local vertex
2552  vertex_data_type& data() {
2553  return graph_ref.get_local_graph().vertex_data(lvid);
2554  }
2555 
2556  /** \brief Returns the number of in edges on the
2557  * local graph of this local vertex
2558  */
2559  size_t num_in_edges() const {
2560  return graph_ref.get_local_graph().num_in_edges(lvid);
2561  }
2562 
2563  /** \brief Returns the number of in edges on the
2564  * local graph of this local vertex
2565  */
2566  size_t num_out_edges() const {
2567  return graph_ref.get_local_graph().num_out_edges(lvid);
2568  }
2569 
2570  /// \brief Returns the local ID of this local vertex
2571  lvid_type id() const {
2572  return lvid;
2573  }
2574 
2575  /// \brief Returns the global ID of this local vertex
2576  vertex_id_type global_id() const {
2577  return graph_ref.global_vid(lvid);
2578  }
2579 
2580  /** \brief Returns a list of all in edges on the
2581  * local graph of this local vertex
2582  */
2583  local_edge_list_type in_edges() {
2584  return graph_ref.l_in_edges(lvid);
2585  }
2586 
2587  /** \brief Returns a list of all out edges on the
2588  * local graph of this local vertex
2589  */
2590  local_edge_list_type out_edges() {
2591  return graph_ref.l_out_edges(lvid);
2592  }
2593 
2594  /** \brief Returns the owner of this local vertex
2595  */
2596  procid_t owner() const {
2597  return graph_ref.l_get_vertex_record(lvid).owner;
2598  }
2599 
2600  /** \brief Returns the owner of this local vertex
2601  */
2602  bool owned() const {
2603  return graph_ref.l_get_vertex_record(lvid).owner == graph_ref.procid();
2604  }
2605 
2606  /** \brief Returns the number of in_edges of this vertex
2607  * on the global graph
2608  */
2609  size_t global_num_in_edges() const {
2610  return graph_ref.l_get_vertex_record(lvid).num_in_edges;
2611  }
2612 
2613 
2614  /** \brief Returns the number of out_edges of this vertex
2615  * on the global graph
2616  */
2617  size_t global_num_out_edges() const {
2618  return graph_ref.l_get_vertex_record(lvid).num_out_edges;
2619  }
2620 
2621 
2622  /** \brief Returns the set of mirrors of this vertex
2623  */
2624  const mirror_type& mirrors() const {
2625  return graph_ref.l_get_vertex_record(lvid)._mirrors;
2626  }
2627 
2628  size_t num_mirrors() const {
2629  return graph_ref.l_get_vertex_record(lvid).num_mirrors();
2630  }
2631 
2632  /** \brief Returns the vertex record of this
2633  * this local vertex
2634  */
2635  vertex_record& get_vertex_record() {
2636  return graph_ref.l_get_vertex_record(lvid);
2637  }
2638  };
2639 
2640 
2641  /** \internal
2642  * edge type which provides access to local graph edges */
2643  class local_edge_type {
2644  private:
2645  distributed_graph& graph_ref;
2646  typename local_graph_type::edge_type e;
2647  public:
2648  local_edge_type(distributed_graph& graph_ref,
2649  typename local_graph_type::edge_type e):
2650  graph_ref(graph_ref), e(e) { }
2651 
2652  /// \brief Can be converted from edge_type via an explicit cast
2653  explicit local_edge_type(edge_type ge) :graph_ref(ge.graph_ref),e(ge.e) { }
2654 
2655  /// \brief Can be casted to edge_type using an explicit cast
2656  operator edge_type() const {
2657  return edge_type(graph_ref, e);
2658  }
2659 
2660  /// \brief Returns the source local vertex of the edge
2661  local_vertex_type source() { return local_vertex_type(graph_ref, e.source().id()); }
2662 
2663  /// \brief Returns the target local vertex of the edge
2664  local_vertex_type target() { return local_vertex_type(graph_ref, e.target().id()); }
2665 
2666 
2667 
2668  /// \brief Returns a constant reference to the data on the vertex
2669  const edge_data_type& data() const { return e.data(); }
2670 
2671  /// \brief Returns a reference to the data on the vertex
2672  edge_data_type& data() { return e.data(); }
2673 
2674  /// \brief Returns the internal ID of this edge
2675  edge_id_type id() const { return e.id(); }
2676  };
2677 
2678  /** \internal
2679  * \brief A functor which converts local_graph_type::edge_type to
2680  * local_edge_type
2681  */
2682  struct make_local_edge_type_functor {
2683  typedef typename local_graph_type::edge_type argument_type;
2684  typedef local_edge_type result_type;
2685  distributed_graph& graph_ref;
2686  make_local_edge_type_functor(distributed_graph& graph_ref):
2687  graph_ref(graph_ref) { }
2688  result_type operator() (const argument_type et) const {
2689  return local_edge_type(graph_ref, et);
2690  }
2691  };
2692 
2693 
2694  /** \internal
2695  * \brief A list of edges. Used by l_in_edges() and l_out_edges()
2696  */
2697  struct local_edge_list_type {
2698  make_local_edge_type_functor me_functor;
2699  typename local_graph_type::edge_list_type elist;
2700 
2701  typedef boost::transform_iterator<make_local_edge_type_functor,
2702  typename local_graph_type::edge_list_type::iterator> iterator;
2703  typedef iterator const_iterator;
2704 
2705  local_edge_list_type(distributed_graph& graph_ref,
2706  typename local_graph_type::edge_list_type elist) :
2707  me_functor(graph_ref), elist(elist) { }
2708  /// \brief Returns the number of edges in the list
2709  size_t size() const { return elist.size(); }
2710 
2711  /// \brief Random access to the list elements
2712  local_edge_type operator[](size_t i) const { return me_functor(elist[i]); }
2713 
2714  /** \brief Returns an iterator to the beginning of the list.
2715  *
2716  * Returns an iterator to the beginning of the list. \see end()
2717  * The iterator_type is local_edge_list_type::iterator.
2718  *
2719  * \code
2720  * local_edge_list_type::iterator iter = elist.begin();
2721  * while(iter != elist.end()) {
2722  * ... [do stuff] ...
2723  * ++iter;
2724  * }
2725  * \endcode
2726  *
2727  */
2728  iterator begin() const { return
2729  boost::make_transform_iterator(elist.begin(), me_functor); }
2730 
2731  /** \brief Returns an iterator to the end of the list.
2732  *
2733  * Returns an iterator to the end of the list. \see begin()
2734  * The iterator_type is local_edge_list_type::iterator.
2735  *
2736  * \code
2737  * local_edge_list_type::iterator iter = elist.begin();
2738  * while(iter != elist.end()) {
2739  * ... [do stuff] ...
2740  * ++iter;
2741  * }
2742  * \endcode
2743  *
2744  */
2745  iterator end() const { return
2746  boost::make_transform_iterator(elist.end(), me_functor); }
2747 
2748  /// \brief Returns true if the list is empty
2749  bool empty() const { return elist.empty(); }
2750  };
2751 
2752 
2753  private:
2754 
2755  // PRIVATE DATA MEMBERS ===================================================>
2756  /** The rpc interface for this class */
2757  mutable dc_dist_object<distributed_graph> rpc;
2758 
2759  bool finalized;
2760 
2761  /** The local graph data */
2762  local_graph_type local_graph;
2763 
2764  /** The map from global vertex ids to vertex records */
2765  std::vector<vertex_record> lvid2record;
2766 
2767  // boost::unordered_map<vertex_id_type, lvid_type> vid2lvid;
2768  /** The map from global vertex ids back to local vertex ids */
2769  typedef cuckoo_map_pow2<vertex_id_type, lvid_type, 3, uint32_t> cuckoo_map_type;
2770  typedef cuckoo_map_type vid2lvid_map_type;
2771 
2772  cuckoo_map_type vid2lvid;
2773 
2774 
2775  /** The global number of vertices and edges */
2776  size_t nverts, nedges;
2777 
2778  /** The number of vertices owned by this proc */
2779  size_t local_own_nverts;
2780 
2781  /** The global number of vertex replica */
2782  size_t nreplicas;
2783 
2784  /** The beginning edge id for this machine */
2785  size_t begin_eid;
2786 
2787  /** pointer to the distributed ingress object*/
2788  idistributed_ingress<VertexData, EdgeData>* ingress_ptr;
2789 
2790  /** Buffered Exchange used by synchronize() */
2791  buffered_exchange<std::pair<vertex_id_type, vertex_data_type> > vertex_exchange;
2792 
2793  /** Buffered Exchange used by vertex sets */
2794  buffered_exchange<vertex_id_type> vset_exchange;
2795 
2796  /** Command option to disable parallel ingress. Used for simulating single node ingress */
2797  bool parallel_ingress;
2798 
2799  void set_ingress_method(const std::string& method,
2800  size_t bufsize = 50000, bool usehash = false, bool userecent = false) {
2801  if(ingress_ptr != NULL) { delete ingress_ptr; ingress_ptr = NULL; }
2802  if (method == "batch") {
2803  logstream(LOG_EMPH) << "Use batch ingress, bufsize: " << bufsize
2804  << ", usehash: " << usehash << ", userecent" << userecent << std::endl;
2805  ingress_ptr = new distributed_batch_ingress<VertexData, EdgeData>(rpc.dc(), *this,
2806  bufsize, usehash, userecent);
2807  } else if (method == "oblivious") {
2808  logstream(LOG_EMPH) << "Use oblivious ingress, usehash: " << usehash
2809  << ", userecent: " << userecent << std::endl;
2810  ingress_ptr = new distributed_oblivious_ingress<VertexData, EdgeData>(rpc.dc(), *this,
2811  usehash, userecent);
2812  } else if (method == "identity") {
2813  logstream(LOG_EMPH) << "Use identity ingress" << std::endl;
2814  ingress_ptr = new distributed_identity_ingress<VertexData, EdgeData>(rpc.dc(), *this);
2815  } else if (method == "grid") {
2816  logstream(LOG_EMPH) << "Use random grid ingress" << std::endl;
2817  ingress_ptr = new distributed_constrained_random_ingress<VertexData, EdgeData>(rpc.dc(), *this, "grid");
2818  } else if (method == "pds") {
2819  logstream(LOG_EMPH) << "Use random pds ingress" << std::endl;
2820  ingress_ptr = new distributed_constrained_random_ingress<VertexData, EdgeData>(rpc.dc(), *this, "pds");
2821  }else {
2822  logstream(LOG_EMPH) << "Use random ingress" << std::endl;
2823  ingress_ptr = new distributed_random_ingress<VertexData, EdgeData>(rpc.dc(), *this);
2824  }
2825  } // end of set ingress method
2826 
2827 
2828  /**
2829  \internal
2830  This internal function is used to load a single line from an input stream
2831  */
2832  template<typename Fstream>
2833  bool load_from_stream(std::string filename, Fstream& fin,
2834  line_parser_type& line_parser) {
2835  size_t linecount = 0;
2836  timer ti; ti.start();
2837  while(fin.good() && !fin.eof()) {
2838  std::string line;
2839  std::getline(fin, line);
2840  if(line.empty()) continue;
2841  if(fin.fail()) break;
2842  const bool success = line_parser(*this, filename, line);
2843  if (!success) {
2844  logstream(LOG_WARNING)
2845  << "Error parsing line " << linecount << " in "
2846  << filename << ": " << std::endl
2847  << "\t\"" << line << "\"" << std::endl;
2848  return false;
2849  }
2850  ++linecount;
2851  if (ti.current_time() > 5.0) {
2852  logstream(LOG_INFO) << linecount << " Lines read" << std::endl;
2853  ti.start();
2854  }
2855  }
2856  return true;
2857  } // end of load from stream
2858 
2859 
2860  template<typename Fstream, typename Writer>
2861  void save_vertex_to_stream(vertex_type& vertex, Fstream& fout, Writer writer) {
2862  fout << writer.save_vertex(vertex);
2863  } // end of save_vertex_to_stream
2864 
2865 
2866  template<typename Fstream, typename Writer>
2867  void save_edge_to_stream(edge_type& edge, Fstream& fout, Writer writer) {
2868  std::string ret = writer.save_edge(edge);
2869  fout << ret;
2870  } // end of save_edge_to_stream
2871 
2872 
2873  void save_bintsv4_to_stream(std::ostream& out) {
2874  for (int i = 0; i < (int)local_graph.num_vertices(); ++i) {
2875  uint32_t src = l_vertex(i).global_id();
2876  foreach(local_edge_type e, l_vertex(i).out_edges()) {
2877  uint32_t dest = e.target().global_id();
2878  out.write(reinterpret_cast<char*>(&src), 4);
2879  out.write(reinterpret_cast<char*>(&dest), 4);
2880  }
2881  if (l_vertex(i).owner() == rpc.procid()) {
2882  vertex_type gv = vertex_type(l_vertex(i));
2883  // store disconnected vertices if I am the master of the vertex
2884  if (gv.num_in_edges() == 0 && gv.num_out_edges() == 0) {
2885  out.write(reinterpret_cast<char*>(&src), 4);
2886  uint32_t dest = (uint32_t)(-1);
2887  out.write(reinterpret_cast<char*>(&dest), 4);
2888  }
2889  }
2890  }
2891  }
2892 
2893  bool load_bintsv4_from_stream(std::istream& in) {
2894  while(in.good()) {
2895  uint32_t src, dest;
2896  in.read(reinterpret_cast<char*>(&src), 4);
2897  in.read(reinterpret_cast<char*>(&dest), 4);
2898  if (in.fail()) break;
2899  if (dest == (uint32_t)(-1)) {
2900  add_vertex(src);
2901  }
2902  else {
2903  add_edge(src, dest);
2904  }
2905  }
2906  return true;
2907  }
2908 
2909 
2910  /** \brief Saves a distributed graph using a direct ostream saving function
2911  *
2912  * This function saves a sequence of files numbered
2913  * \li [prefix]_0
2914  * \li [prefix]_1
2915  * \li [prefix]_2
2916  * \li etc.
2917  *
2918  * This files can be loaded with direct_stream_load().
2919  */
2920  void save_direct(const std::string& prefix, bool gzip,
2921  boost::function<void (graph_type*, std::ostream&)> saver) {
2922  rpc.full_barrier();
2923  finalize();
2924  timer savetime; savetime.start();
2925  std::string fname = prefix + "_" + tostr(rpc.procid() + 1) + "_of_" +
2926  tostr(rpc.numprocs());
2927  if (gzip) fname = fname + ".gz";
2928  logstream(LOG_INFO) << "Save graph to " << fname << std::endl;
2929  if(boost::starts_with(fname, "hdfs://")) {
2930  graphlab::hdfs hdfs;
2931  graphlab::hdfs::fstream out_file(hdfs, fname, true);
2932  boost::iostreams::filtering_stream<boost::iostreams::output> fout;
2933  if (gzip) fout.push(boost::iostreams::gzip_compressor());
2934  fout.push(out_file);
2935  if (!fout.good()) {
2936  logstream(LOG_FATAL) << "\n\tError opening file: " << fname << std::endl;
2937  exit(-1);
2938  }
2939  saver(this, boost::ref(fout));
2940  fout.pop();
2941  if (gzip) fout.pop();
2942  out_file.close();
2943  } else {
2944  std::ofstream out_file(fname.c_str(),
2945  std::ios_base::out | std::ios_base::binary);
2946  if (!out_file.good()) {
2947  logstream(LOG_FATAL) << "\n\tError opening file: " << fname << std::endl;
2948  exit(-1);
2949  }
2950  boost::iostreams::filtering_stream<boost::iostreams::output> fout;
2951  if (gzip) fout.push(boost::iostreams::gzip_compressor());
2952  fout.push(out_file);
2953  saver(this, boost::ref(fout));
2954  fout.pop();
2955  if (gzip) fout.pop();
2956  out_file.close();
2957  }
2958  logstream(LOG_INFO) << "Finish saving graph to " << fname << std::endl
2959  << "Finished saving bintsv4 graph: "
2960  << savetime.current_time() << std::endl;
2961  rpc.full_barrier();
2962  } // end of save
2963 
2964 
2965 
2966  /**
2967  * \brief Load a graph from a collection of files in stored on
2968  * the filesystem using the user defined line parser. Like
2969  * \ref load(const std::string& path, line_parser_type line_parser)
2970  * but only loads from the filesystem.
2971  */
2972  void load_direct_from_posixfs(std::string prefix,
2973  boost::function<bool (graph_type*, std::istream&)> parser) {
2974  std::string directory_name; std::string original_path(prefix);
2975  boost::filesystem::path path(prefix);
2976  std::string search_prefix;
2977  if (boost::filesystem::is_directory(path)) {
2978  // if this is a directory
2979  // force a "/" at the end of the path
2980  // make sure to check that the path is non-empty. (you do not
2981  // want to make the empty path "" the root path "/" )
2982  directory_name = path.native();
2983  }
2984  else {
2985  directory_name = path.parent_path().native();
2986  search_prefix = path.filename().native();
2987  directory_name = (directory_name.empty() ? "." : directory_name);
2988  }
2989  std::vector<std::string> graph_files;
2990  fs_util::list_files_with_prefix(directory_name, search_prefix, graph_files);
2991  if (graph_files.size() == 0) {
2992  logstream(LOG_WARNING) << "No files found matching " << original_path << std::endl;
2993  }
2994  for(size_t i = 0; i < graph_files.size(); ++i) {
2995  if (i % rpc.numprocs() == rpc.procid()) {
2996  logstream(LOG_EMPH) << "Loading graph from file: " << graph_files[i] << std::endl;
2997  // is it a gzip file ?
2998  const bool gzip = boost::ends_with(graph_files[i], ".gz");
2999  // open the stream
3000  std::ifstream in_file(graph_files[i].c_str(),
3001  std::ios_base::in | std::ios_base::binary);
3002  // attach gzip if the file is gzip
3003  boost::iostreams::filtering_stream<boost::iostreams::input> fin;
3004  // Using gzip filter
3005  if (gzip) fin.push(boost::iostreams::gzip_decompressor());
3006  fin.push(in_file);
3007  const bool success = parser(this, boost::ref(fin));
3008  if(!success) {
3009  logstream(LOG_FATAL)
3010  << "\n\tError parsing file: " << graph_files[i] << std::endl;
3011  }
3012  fin.pop();
3013  if (gzip) fin.pop();
3014  }
3015  }
3016  rpc.full_barrier();
3017  }
3018 
3019  /**
3020  * \brief Load a graph from a collection of files in stored on
3021  * the HDFS using the user defined line parser. Like
3022  * \ref load(const std::string& path, line_parser_type line_parser)
3023  * but only loads from HDFS.
3024  */
3025  void load_direct_from_hdfs(std::string prefix,
3026  boost::function<bool (graph_type*, std::istream&)> parser) {
3027  // force a "/" at the end of the path
3028  // make sure to check that the path is non-empty. (you do not
3029  // want to make the empty path "" the root path "/" )
3030  std::string path = prefix;
3031  if (path.length() > 0 && path[path.length() - 1] != '/') path = path + "/";
3032  if(!hdfs::has_hadoop()) {
3033  logstream(LOG_FATAL)
3034  << "\n\tAttempting to load a graph from HDFS but GraphLab"
3035  << "\n\twas built without HDFS."
3036  << std::endl;
3037  }
3038  hdfs& hdfs = hdfs::get_hdfs();
3039  std::vector<std::string> graph_files;
3040  graph_files = hdfs.list_files(path);
3041  if (graph_files.size() == 0) {
3042  logstream(LOG_WARNING) << "No files found matching " << prefix << std::endl;
3043  }
3044  for(size_t i = 0; i < graph_files.size(); ++i) {
3045  if (i % rpc.numprocs() == rpc.procid()) {
3046  logstream(LOG_EMPH) << "Loading graph from file: " << graph_files[i] << std::endl;
3047  // is it a gzip file ?
3048  const bool gzip = boost::ends_with(graph_files[i], ".gz");
3049  // open the stream
3050  graphlab::hdfs::fstream in_file(hdfs, graph_files[i]);
3051  boost::iostreams::filtering_stream<boost::iostreams::input> fin;
3052  if(gzip) fin.push(boost::iostreams::gzip_decompressor());
3053  fin.push(in_file);
3054  const bool success = parser(this, boost::ref(fin));
3055  if(!success) {
3056  logstream(LOG_FATAL)
3057  << "\n\tError parsing file: " << graph_files[i] << std::endl;
3058  }
3059  fin.pop();
3060  if (gzip) fin.pop();
3061  }
3062  }
3063  rpc.full_barrier();
3064  }
3065 
3066  void load_direct(std::string prefix,
3067  boost::function<bool (graph_type*, std::istream&)> parser) {
3068  rpc.full_barrier();
3069  if(boost::starts_with(prefix, "hdfs://")) {
3070  load_direct_from_hdfs(prefix, parser);
3071  } else {
3072  load_direct_from_posixfs(prefix, parser);
3073  }
3074  rpc.full_barrier();
3075  } // end of load
3076 
3077 
3078 
3079  }; // End of graph
3080 } // end of namespace graphlab
3081 #include <graphlab/macros_undef.hpp>
3082 
3083 #endif
3084