23 #ifndef GRAPHLAB_DISTRIBUTED_GRAPH_HPP
24 #define GRAPHLAB_DISTRIBUTED_GRAPH_HPP
37 #include <graphlab/util/dense_bitset.hpp>
46 #include <boost/functional.hpp>
47 #include <boost/algorithm/string/predicate.hpp>
48 #include <boost/iostreams/stream.hpp>
49 #include <boost/iostreams/filtering_streambuf.hpp>
50 #include <boost/iostreams/filtering_stream.hpp>
51 #include <boost/iostreams/copy.hpp>
52 #include <boost/iostreams/filter/gzip.hpp>
53 #include <boost/filesystem.hpp>
54 #include <boost/concept/requires.hpp>
58 #include <graphlab/logger/assertions.hpp>
60 #include <graphlab/rpc/dc.hpp>
61 #include <graphlab/rpc/dc_dist_object.hpp>
62 #include <graphlab/rpc/buffered_exchange.hpp>
63 #include <graphlab/util/random.hpp>
64 #include <graphlab/util/branch_hints.hpp>
65 #include <graphlab/util/generics/conditional_addition_wrapper.hpp>
67 #include <graphlab/options/graphlab_options.hpp>
68 #include <graphlab/serialization/serialization_includes.hpp>
69 #include <graphlab/vertex_program/op_plus_eq_concept.hpp>
71 #include <graphlab/graph/local_graph.hpp>
72 #include <graphlab/graph/ingress/idistributed_ingress.hpp>
73 #include <graphlab/graph/ingress/distributed_ingress_base.hpp>
74 #include <graphlab/graph/ingress/distributed_batch_ingress.hpp>
75 #include <graphlab/graph/ingress/distributed_oblivious_ingress.hpp>
76 #include <graphlab/graph/ingress/distributed_random_ingress.hpp>
77 #include <graphlab/graph/ingress/distributed_identity_ingress.hpp>
79 #include <graphlab/graph/ingress/sharding_constraint.hpp>
80 #include <graphlab/graph/ingress/distributed_constrained_random_ingress.hpp>
83 #include <graphlab/util/cuckoo_map_pow2.hpp>
85 #include <graphlab/util/fs_util.hpp>
86 #include <graphlab/util/hdfs.hpp>
89 #include <graphlab/graph/builtin_parsers.hpp>
90 #include <graphlab/graph/json_parser.hpp>
91 #include <graphlab/graph/vertex_set.hpp>
93 #include <graphlab/macros_def.hpp>
302 template<
typename VertexData,
typename EdgeData>
321 BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<VertexData>));
348 BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<EdgeData>));
384 friend class distributed_ingress_base<VertexData, EdgeData>;
387 friend class distributed_batch_ingress<VertexData, EdgeData>;
390 friend class json_parser<VertexData, EdgeData>;
397 typedef bool edge_list_type;
400 struct local_vertex_type;
401 struct local_edge_list_type;
402 class local_edge_type;
426 graph_ref(graph_ref), lvid(lvid) { }
432 return lvid == v.lvid;
437 return graph_ref.get_local_graph().vertex_data(lvid);
442 return graph_ref.get_local_graph().vertex_data(lvid);
447 return graph_ref.l_get_vertex_record(lvid).num_in_edges;
452 return graph_ref.l_get_vertex_record(lvid).num_out_edges;
456 vertex_id_type
id()
const {
457 return graph_ref.global_vid(lvid);
463 edge_list_type in_edges() __attribute__ ((noreturn)) {
468 edge_list_type out_edges() __attribute__ ((noreturn)) {
511 graph_ref(graph_ref), edge(edge) { }
592 rpc(dc, this), finalized(false), vid2lvid(-1),
593 nverts(0), nedges(0), local_own_nverts(0), nreplicas(0),
594 ingress_ptr(NULL), vertex_exchange(dc), vset_exchange(dc), parallel_ingress(true) {
601 size_t bufsize = 50000;
602 bool usehash =
false;
603 bool userecent =
false;
604 std::string ingress_method =
"random";
605 std::vector<std::string>
keys = opts.get_graph_args().get_option_keys();
606 foreach(std::string opt, keys) {
607 if (opt ==
"ingress") {
608 opts.get_graph_args().
get_option(
"ingress", ingress_method);
610 logstream(
LOG_EMPH) <<
"Graph Option: ingress = "
611 << ingress_method << std::endl;
612 }
else if (opt ==
"bufsize") {
613 opts.get_graph_args().
get_option(
"bufsize", bufsize);
615 logstream(
LOG_EMPH) <<
"Graph Option: bufsize = "
616 << bufsize << std::endl;
617 }
else if (opt ==
"usehash") {
618 opts.get_graph_args().
get_option(
"usehash", usehash);
620 logstream(
LOG_EMPH) <<
"Graph Option: usehash = "
621 << usehash << std::endl;
622 }
else if (opt ==
"userecent") {
623 opts.get_graph_args().
get_option(
"userecent", userecent);
625 logstream(
LOG_EMPH) <<
"Graph Option: userecent = "
626 << userecent << std::endl;
627 }
else if (opt ==
"parallel_ingress") {
628 opts.get_graph_args().
get_option(
"parallel_ingress", parallel_ingress);
629 if (!parallel_ingress && rpc.
procid() == 0)
630 logstream(
LOG_EMPH) <<
"Disable parallel ingress. Graph will be streamed through one node."
633 logstream(
LOG_ERROR) <<
"Unexpected Graph Option: " << opt << std::endl;
636 set_ingress_method(ingress_method, bufsize, usehash, userecent);
654 if (finalized)
return;
655 ASSERT_NE(ingress_ptr, NULL);
656 logstream(
LOG_INFO) <<
"Distributed graph: enter finalize" << std::endl;
657 ingress_ptr->finalize();
658 rpc.
barrier();
delete ingress_ptr; ingress_ptr = NULL;
688 __attribute__((noreturn)) {
690 logstream(
LOG_WARNING) <<
"in_edges not implemented. " << std::endl;
// Global-vid overload of out_edges. It is declared noreturn and the visible
// body only logs a "not implemented" warning; it does not produce an edge
// list.
695 edge_list_type out_edges(
const vertex_id_type vid)
const
696 __attribute__((noreturn)) {
// The warning names out_edges so the log identifies the method actually called.
698 logstream(
LOG_WARNING) <<
"out_edges not implemented. " << std::endl;
715 return get_vertex_record(vid).num_in_edges;
729 return get_vertex_record(vid).num_out_edges;
748 const VertexData& vdata = VertexData() ) {
751 <<
"\n\tAttempting to add a vertex to a finalized graph."
752 <<
"\n\tVertices cannot be added to a graph after finalization."
755 if(vid == vertex_id_type(-1)) {
757 <<
"\n\tAdding a vertex with id -1 is not allowed."
758 <<
"\n\tThe -1 vertex id is reserved for internal use."
761 ASSERT_NE(ingress_ptr, NULL);
762 ingress_ptr->add_vertex(vid, vdata);
// Adds a directed edge source->target carrying `edata` (a default-constructed
// EdgeData when omitted) by forwarding it to the current ingress object.
// The visible checks reject the reserved id vertex_id_type(-1) on either
// endpoint and self edges; the first message below also covers adding to a
// finalized graph (its guarding condition is not visible in this excerpt).
779 void add_edge(vertex_id_type source, vertex_id_type target,
780 const EdgeData& edata = EdgeData()) {
// Message emitted when the graph has already been finalized.
783 <<
"\n\tAttempting to add an edge to a finalized graph."
784 <<
"\n\tEdges cannot be added to a graph after finalization."
// vertex_id_type(-1) is reserved, so it may not be used as a source ...
787 if(source == vertex_id_type(-1)) {
789 <<
"\n\tThe source vertex with id vertex_id_type(-1)\n"
790 <<
"\tor unsigned value " << vertex_id_type(-1) <<
" in edge \n"
791 <<
"\t(" << source <<
"->" << target <<
") is not allowed.\n"
792 <<
"\tThe -1 vertex id is reserved for internal use."
// ... nor as a target.
795 if(target == vertex_id_type(-1)) {
797 <<
"\n\tThe target vertex with id vertex_id_type(-1)\n"
798 <<
"\tor unsigned value " << vertex_id_type(-1) <<
" in edge \n"
799 <<
"\t(" << source <<
"->" << target <<
") is not allowed.\n"
800 <<
"\tThe -1 vertex id is reserved for internal use."
// Self edges are rejected.
803 if(source == target) {
805 <<
"\n\tTrying to add self edge (" << source <<
"->" << target <<
")."
806 <<
"\n\tSelf edges are not allowed."
// An ingress object must exist before the edge can be handed to it.
809 ASSERT_NE(ingress_ptr, NULL);
811 ingress_ptr->add_edge(source, target, edata);
// Vertex map-reduce (fragment). Applies `mapfunction` to every local vertex
// that this process owns and that `vset` contains, combines the results, and
// returns the value held by a conditional_addition_wrapper built from the
// combined result and its "set" flag.
876 template <
typename ReductionType,
typename MapFunctionType>
// Error text used when the method is called before finalize().
883 <<
"\n\tAttempting to run graph.map_reduce_vertices(...) "
884 <<
"\n\tbefore calling graph.finalize()."
// global_result_set tracks whether global_result holds a mapped value yet.
889 bool global_result_set =
false;
890 ReductionType global_result = ReductionType();
// result/result_set play the same role for the partial reduction.
895 bool result_set =
false;
896 ReductionType result = ReductionType();
900 for (
int i = 0; i < (int)local_graph.num_vertices(); ++i) {
// Only vertices mastered here and selected by vset take part.
901 if (lvid2record[i].owner == rpc.
procid() &&
902 vset.l_contains((lvid_type)i)) {
// First mapped value is assigned directly ...
905 result = mapfunction(vtx);
// ... later values go through a temporary (the combining statement is not
// visible in this excerpt).
908 else if (result_set){
910 const ReductionType tmp = mapfunction(vtx);
// Fold the partial result into the global one: assign on first use,
// accumulate with += afterwards.
920 if (!global_result_set) {
921 global_result = result;
922 global_result_set =
true;
925 global_result += result;
930 conditional_addition_wrapper<ReductionType>
931 wrapper(global_result, global_result_set);
933 return wrapper.value;
// Edge map-reduce (fragment). Inside an OpenMP parallel region, applies
// `mapfunction` to the in-edges and out-edges of every local vertex contained
// in `vset`, folds each partial result into `global_result` under an OpenMP
// critical section, and returns the value held by a
// conditional_addition_wrapper built from the combined result and its "set"
// flag.
1003 template <
typename ReductionType,
typename MapFunctionType>
// Error text used when the method is called before finalize(); it names the
// edge variant so the failure points at this call.
1011 <<
"\n\tAttempting to run graph.map_reduce_edges(...)"
1012 <<
"\n\tbefore calling graph.finalize()."
// global_result_set tracks whether global_result holds a mapped value yet.
1017 bool global_result_set =
false;
1018 ReductionType global_result = ReductionType();
1020 #pragma omp parallel
// result/result_set play the same role for the partial reduction.
1023 bool result_set =
false;
1024 ReductionType result = ReductionType();
1028 for (
int i = 0; i < (int)local_graph.num_vertices(); ++i) {
1029 if (vset.l_contains((lvid_type)i)) {
// In-edges of the selected vertex.
1031 foreach(
const local_edge_type& e, l_vertex(i).in_edges()) {
1034 result = mapfunction(edge);
1037 else if (result_set){
1039 const ReductionType tmp = mapfunction(edge);
// Out-edges of the selected vertex.
1045 foreach(
const local_edge_type& e, l_vertex(i).out_edges()) {
1048 result = mapfunction(edge);
1051 else if (result_set){
1053 const ReductionType tmp = mapfunction(edge);
// global_result is shared, so it is updated inside a critical section:
// assign on first use, accumulate with += afterwards.
1061 #pragma omp critical
1065 if (!global_result_set) {
1066 global_result = result;
1067 global_result_set =
true;
1070 global_result += result;
1076 conditional_addition_wrapper<ReductionType>
1077 wrapper(global_result, global_result_set);
1079 return wrapper.value;
// Vertex transform (fragment). Calls `transform_functor` on every local
// vertex that this process owns and that `vset` contains; the loop over local
// vertex ids is an OpenMP parallel for.
1133 template <
typename TransformType>
// Error text used when the method is called before the graph is finalized.
1138 <<
"\n\tAttempting to call graph.transform_vertices(...)"
1139 <<
"\n\tbefore finalizing the graph."
1145 #pragma omp parallel for
1147 for (
int i = 0; i < (int)local_graph.num_vertices(); ++i) {
// Skip vertices mastered elsewhere or outside the requested set.
1148 if (lvid2record[i].owner == rpc.
procid() &&
1149 vset.l_contains((lvid_type)i)) {
1151 transform_functor(vtx);
// Edge transform (fragment). For every local vertex contained in `vset`,
// calls `transform_functor` on its in-edges and on its out-edges; the loop
// over local vertex ids is an OpenMP parallel for.
1215 template <
typename TransformType>
// Error text used when the method is called before the graph is finalized.
1221 <<
"\n\tAttempting to call graph.transform_edges(...)"
1222 <<
"\n\tbefore finalizing the graph."
1227 #pragma omp parallel for
1229 for (
int i = 0; i < (int)local_graph.num_vertices(); ++i) {
1230 if (vset.l_contains((lvid_type)i)) {
// In-edges of the selected vertex.
1232 foreach(
const local_edge_type& e, l_vertex(i).in_edges()) {
1234 transform_functor(edge);
// Out-edges of the selected vertex.
1238 foreach(
const local_edge_type& e, l_vertex(i).out_edges()) {
1240 transform_functor(edge);
// Runs a set of vertex functors over the locally owned vertices.
// With n = accfunction.size(), functor i is called on local vertex ids
// i, i+n, i+2n, ..., so each local vertex id is visited by exactly one
// functor. The outer loop over functors is an OpenMP parallel for.
// Requires a finalized graph and at least one functor (both asserted).
1261 template <
typename VertexFunctorType>
1262 void parallel_for_vertices(std::vector<VertexFunctorType>& accfunction) {
1263 ASSERT_TRUE(finalized);
1265 int numaccfunctions = (int)accfunction.size();
1266 ASSERT_GE(numaccfunctions, 1);
1268 #pragma omp parallel for
1270 for (
int i = 0; i < (int)accfunction.size(); ++i) {
// Stride through the local vertex ids assigned to functor i.
1271 for (
int j = i;j < (int)local_graph.num_vertices(); j+=numaccfunctions) {
// Only vertices mastered on this process are passed to the functor.
1272 if (lvid2record[j].owner == rpc.
procid()) {
1273 accfunction[i](vertex_type(l_vertex(j)));
// Runs a set of edge functors over the local edges.
// With n = accfunction.size(), functor i is called on the in-edges of local
// vertex ids i, i+n, i+2n, ...; unlike parallel_for_vertices there is no
// ownership filter on the vertex. The outer loop over functors is an OpenMP
// parallel for. Requires a finalized graph and at least one functor (both
// asserted).
1291 template <
typename EdgeFunctorType>
1292 void parallel_for_edges(std::vector<EdgeFunctorType>& accfunction) {
1293 ASSERT_TRUE(finalized);
1295 int numaccfunctions = (int)accfunction.size();
1296 ASSERT_GE(numaccfunctions, 1);
1298 #pragma omp parallel for
1300 for (
int i = 0; i < (int)accfunction.size(); ++i) {
// Stride through the local vertex ids assigned to functor i.
1301 for (
int j = i;j < (int)local_graph.num_vertices(); j+=numaccfunctions) {
// Each local edge is handed over as an edge_type built from the local edge.
1302 foreach(
const local_edge_type& e, l_vertex(j).in_edges()) {
1303 accfunction[i](edge_type(e));
1313 void load(iarchive& arc) {
1330 void save(oarchive& arc)
const {
1333 <<
"\n\tAttempting to save a graph before calling graph.finalize()."
1351 foreach (vertex_record& vrec, lvid2record)
1353 lvid2record.clear();
1381 std::string fname = prefix +
tostr(rpc.
procid()) +
".bin";
1383 logstream(
LOG_INFO) <<
"Load graph from " << fname << std::endl;
1384 if(boost::starts_with(fname,
"hdfs://")) {
1385 graphlab::hdfs hdfs;
1386 graphlab::hdfs::fstream in_file(hdfs, fname);
1387 boost::iostreams::filtering_stream<boost::iostreams::input> fin;
1388 fin.push(boost::iostreams::gzip_decompressor());
1392 logstream(
LOG_FATAL) <<
"\n\tError opening file: " << fname << std::endl;
1401 std::ifstream in_file(fname.c_str(),
1402 std::ios_base::in | std::ios_base::binary);
1403 boost::iostreams::filtering_stream<boost::iostreams::input> fin;
1404 fin.push(boost::iostreams::gzip_decompressor());
1412 logstream(
LOG_INFO) <<
"Finish loading graph from " << fname << std::endl;
1440 std::string fname = prefix +
tostr(rpc.
procid()) +
".bin";
1441 logstream(
LOG_INFO) <<
"Save graph to " << fname << std::endl;
1442 if(boost::starts_with(fname,
"hdfs://")) {
1443 graphlab::hdfs hdfs;
1444 graphlab::hdfs::fstream out_file(hdfs, fname,
true);
1445 boost::iostreams::filtering_stream<boost::iostreams::output> fout;
1446 fout.push(boost::iostreams::gzip_compressor());
1447 fout.push(out_file);
1449 logstream(
LOG_FATAL) <<
"\n\tError opening file: " << fname << std::endl;
1458 std::ofstream out_file(fname.c_str(),
1459 std::ios_base::out | std::ios_base::binary);
1460 if (!out_file.good()) {
1461 logstream(
LOG_FATAL) <<
"\n\tError opening file: " << fname << std::endl;
1464 boost::iostreams::filtering_stream<boost::iostreams::output> fout;
1465 fout.push(boost::iostreams::gzip_compressor());
1466 fout.push(out_file);
1473 logstream(
LOG_INFO) <<
"Finish saving graph to " << fname << std::endl
1474 <<
"Finished saving binary graph: "
1485 template<
typename Writer>
1488 bool save_vertex =
true,
1489 bool save_edge =
true,
1490 size_t files_per_machine = 4) {
1491 typedef boost::function<void(vertex_type)> vertex_function_type;
1492 typedef boost::function<void(edge_type)> edge_function_type;
1493 typedef std::ofstream base_fstream_type;
1494 typedef boost::iostreams::filtering_stream<boost::iostreams::output>
1499 std::vector<std::string> graph_files;
1500 std::vector<base_fstream_type*> outstreams;
1501 std::vector<boost_fstream_type*> booststreams;
1502 graph_files.resize(files_per_machine);
1503 for(
size_t i = 0; i < files_per_machine; ++i) {
1504 graph_files[i] = prefix +
"_" +
tostr(1 + i + rpc.
procid() * files_per_machine)
1506 if (gzip) graph_files[i] +=
".gz";
1510 std::vector<vertex_function_type> vertex_callbacks(graph_files.size());
1511 std::vector<edge_function_type> edge_callbacks(graph_files.size());
1513 for(
size_t i = 0; i < graph_files.size(); ++i) {
1514 logstream(
LOG_INFO) <<
"Saving to file: " << graph_files[i] << std::endl;
1516 base_fstream_type* out_file =
1517 new base_fstream_type(graph_files[i].c_str(),
1518 std::ios_base::out | std::ios_base::binary);
1520 boost_fstream_type* fout =
new boost_fstream_type;
1522 if (gzip) fout->push(boost::iostreams::gzip_compressor());
1523 fout->push(*out_file);
1525 outstreams.push_back(out_file);
1526 booststreams.push_back(fout);
1529 vertex_callbacks[i] =
1530 boost::bind(&graph_type::template save_vertex_to_stream<boost_fstream_type, Writer>,
1531 this, _1, boost::ref(*fout), boost::ref(writer));
1533 boost::bind(&graph_type::template save_edge_to_stream<boost_fstream_type, Writer>,
1534 this, _1, boost::ref(*fout), boost::ref(writer));
1537 if (save_vertex) parallel_for_vertices(vertex_callbacks);
1538 if (save_edge) parallel_for_edges(edge_callbacks);
1541 for(
size_t i = 0; i < graph_files.size(); ++i) {
1542 booststreams[i]->pop();
1543 if (gzip) booststreams[i]->pop();
1544 delete booststreams[i];
1545 delete outstreams[i];
1547 vertex_callbacks.clear();
1548 edge_callbacks.clear();
1550 booststreams.clear();
1561 template<
typename Writer>
1564 bool save_vertex =
true,
1565 bool save_edge =
true,
1566 size_t files_per_machine = 4) {
1567 typedef boost::function<void(vertex_type)> vertex_function_type;
1568 typedef boost::function<void(edge_type)> edge_function_type;
1569 typedef graphlab::hdfs::fstream base_fstream_type;
1570 typedef boost::iostreams::filtering_stream<boost::iostreams::output>
1575 std::vector<std::string> graph_files;
1576 std::vector<base_fstream_type*> outstreams;
1577 std::vector<boost_fstream_type*> booststreams;
1578 graph_files.resize(files_per_machine);
1579 for(
size_t i = 0; i < files_per_machine; ++i) {
1580 graph_files[i] = prefix +
"_" +
tostr(1 + i + rpc.
procid() * files_per_machine)
1582 if (gzip) graph_files[i] +=
".gz";
1585 if(!hdfs::has_hadoop()) {
1587 <<
"\n\tAttempting to save a graph to HDFS but GraphLab"
1588 <<
"\n\twas built without HDFS."
1591 hdfs& hdfs = hdfs::get_hdfs();
1595 std::vector<vertex_function_type> vertex_callbacks(graph_files.size());
1596 std::vector<edge_function_type> edge_callbacks(graph_files.size());
1598 for(
size_t i = 0; i < graph_files.size(); ++i) {
1599 logstream(
LOG_INFO) <<
"Saving to file: " << graph_files[i] << std::endl;
1601 base_fstream_type* out_file =
new base_fstream_type(hdfs,
1605 boost_fstream_type* fout =
new boost_fstream_type;
1607 if (gzip) fout->push(boost::iostreams::gzip_compressor());
1608 fout->push(*out_file);
1610 outstreams.push_back(out_file);
1611 booststreams.push_back(fout);
1614 vertex_callbacks[i] =
1615 boost::bind(&graph_type::template save_vertex_to_stream<boost_fstream_type, Writer>,
1616 this, _1, boost::ref(*fout), writer);
1618 boost::bind(&graph_type::template save_edge_to_stream<boost_fstream_type, Writer>,
1619 this, _1, boost::ref(*fout), writer);
1622 if (save_vertex) parallel_for_vertices(vertex_callbacks);
1623 if (save_edge) parallel_for_edges(edge_callbacks);
1626 for(
size_t i = 0; i < graph_files.size(); ++i) {
1627 booststreams[i]->pop();
1628 if (gzip) booststreams[i]->pop();
1629 delete booststreams[i];
1630 delete outstreams[i];
1632 vertex_callbacks.clear();
1633 edge_callbacks.clear();
1635 booststreams.clear();
// Saves the graph through a user-supplied Writer, choosing the backend from
// the prefix: prefixes starting with "hdfs://" go to save_to_hdfs, the other
// visible call goes to save_to_posixfs. All arguments are forwarded unchanged.
// Defaults: gzip on, vertices and edges both saved, 4 files per machine.
1739 template<
typename Writer>
1740 void save(
const std::string& prefix, Writer writer,
1741 bool gzip =
true,
bool save_vertex =
true,
bool save_edge =
true,
1742 size_t files_per_machine = 4) {
1743 if(boost::starts_with(prefix,
"hdfs://")) {
1744 save_to_hdfs(prefix, writer, gzip, save_vertex, save_edge, files_per_machine);
1746 save_to_posixfs(prefix, writer, gzip, save_vertex, save_edge, files_per_machine);
// Saves the graph in one of the built-in formats selected by `format`:
//   "snap" / "tsv" -> tsv_writer, edges only (save_vertex=false)
//   "graphjrl"     -> graphjrl_writer, vertices and edges
//   "bin"          -> handled by a statement not visible in this excerpt
//   "bintsv4"      -> save_direct with save_bintsv4_to_stream
// Any other value reaches the "Unrecognized Format" message.
// gzip defaults to true and files_per_machine to 4.
1791 void save_format(
const std::string& prefix,
const std::string& format,
1792 bool gzip =
true,
size_t files_per_machine = 4) {
1793 if (format ==
"snap" || format ==
"tsv") {
1794 save(prefix, builtin_parsers::tsv_writer<distributed_graph>(),
1795 gzip,
false,
true, files_per_machine);
1796 }
else if (format ==
"graphjrl") {
1797 save(prefix, builtin_parsers::graphjrl_writer<distributed_graph>(),
1798 gzip,
true,
true, files_per_machine);
1799 }
else if (format ==
"bin") {
1801 }
else if (format ==
"bintsv4") {
// files_per_machine is not passed on this path.
1802 save_direct(prefix, gzip, &graph_type::save_bintsv4_to_stream);
1805 <<
"Unrecognized Format \"" << format <<
"\"!" << std::endl;
1821 std::string directory_name; std::string original_path(prefix);
1822 boost::filesystem::path path(prefix);
1823 std::string search_prefix;
1824 if (boost::filesystem::is_directory(path)) {
1829 directory_name = path.native();
1832 directory_name = path.parent_path().native();
1833 search_prefix = path.filename().native();
1834 directory_name = (directory_name.empty() ?
"." : directory_name);
1836 std::vector<std::string> graph_files;
1837 fs_util::list_files_with_prefix(directory_name, search_prefix, graph_files);
1838 if (graph_files.size() == 0) {
1839 logstream(
LOG_WARNING) <<
"No files found matching " << original_path << std::endl;
1841 for(
size_t i = 0; i < graph_files.size(); ++i) {
1843 || (!parallel_ingress && (rpc.
procid() == 0))) {
1844 logstream(
LOG_EMPH) <<
"Loading graph from file: " << graph_files[i] << std::endl;
1846 const bool gzip = boost::ends_with(graph_files[i],
".gz");
1848 std::ifstream in_file(graph_files[i].c_str(),
1849 std::ios_base::in | std::ios_base::binary);
1851 boost::iostreams::filtering_stream<boost::iostreams::input> fin;
1853 if (gzip) fin.push(boost::iostreams::gzip_decompressor());
1855 const bool success = load_from_stream(graph_files[i], fin, line_parser);
1858 <<
"\n\tError parsing file: " << graph_files[i] << std::endl;
1861 if (gzip) fin.pop();
1877 std::string path = prefix;
1878 if (path.length() > 0 && path[path.length() - 1] !=
'/') path = path +
"/";
1879 if(!hdfs::has_hadoop()) {
1881 <<
"\n\tAttempting to load a graph from HDFS but GraphLab"
1882 <<
"\n\twas built without HDFS."
1885 hdfs& hdfs = hdfs::get_hdfs();
1886 std::vector<std::string> graph_files;
1887 graph_files = hdfs.list_files(path);
1888 if (graph_files.size() == 0) {
1889 logstream(
LOG_WARNING) <<
"No files found matching " << prefix << std::endl;
1891 for(
size_t i = 0; i < graph_files.size(); ++i) {
1892 if ((parallel_ingress && (i % rpc.
numprocs() == rpc.
procid())) ||
1893 (!parallel_ingress && (rpc.
procid() == 0))) {
1894 logstream(
LOG_EMPH) <<
"Loading graph from file: " << graph_files[i] << std::endl;
1896 const bool gzip = boost::ends_with(graph_files[i],
".gz");
1898 graphlab::hdfs::fstream in_file(hdfs, graph_files[i]);
1899 boost::iostreams::filtering_stream<boost::iostreams::input> fin;
1900 if(gzip) fin.push(boost::iostreams::gzip_decompressor());
1902 const bool success = load_from_stream(graph_files[i], fin, line_parser);
1905 <<
"\n\tError parsing file: " << graph_files[i] << std::endl;
1908 if (gzip) fin.pop();
1986 if (prefix.length() == 0)
return;
1987 if(boost::starts_with(prefix,
"hdfs://")) {
2023 double alpha = 2.1,
size_t truncate = (
size_t)(-1)) {
2025 std::vector<double> prob(std::min(nverts, truncate), 0);
2026 logstream(
LOG_INFO) <<
"constructing pdf" << std::endl;
2027 for(
size_t i = 0; i < prob.size(); ++i)
2028 prob[i] = std::pow(
double(i+1), -alpha);
2029 logstream(
LOG_INFO) <<
"constructing cdf" << std::endl;
2031 logstream(
LOG_INFO) <<
"Building graph" << std::endl;
2032 size_t target_index = rpc.
procid();
2033 size_t addedvtx = 0;
2036 const size_t HASH_OFFSET = 2654435761;
2037 for(
size_t source = rpc.
procid(); source < nverts;
2040 for(
size_t i = 0; i < out_degree; ++i) {
2041 target_index = (target_index + HASH_OFFSET) % nverts;
2042 while (source == target_index) {
2043 target_index = (target_index + HASH_OFFSET) % nverts;
2045 if(in_degree)
add_edge(target_index, source);
2046 else add_edge(source, target_index);
2049 if (addedvtx % 10000000 == 0) {
2050 logstream(
LOG_EMPH) << addedvtx <<
" inserted\n";
// Loads a graph from `path` using one of the built-in formats selected by
// `format`. "snap", "adj", "tsv" and "graphjrl" each pick the matching
// builtin_parsers line parser and call load(path, line_parser); "bintsv4"
// goes through load_direct with load_bintsv4_from_stream; "bin" is handled by
// a statement not visible in this excerpt. Any other value reaches the
// "Unrecognized Format" message.
2063 void load_format(
const std::string& path,
const std::string& format) {
2065 if (format ==
"snap") {
2066 line_parser = builtin_parsers::snap_parser<distributed_graph>;
2067 load(path, line_parser);
2068 }
else if (format ==
"adj") {
2069 line_parser = builtin_parsers::adj_parser<distributed_graph>;
2070 load(path, line_parser);
2071 }
else if (format ==
"tsv") {
2072 line_parser = builtin_parsers::tsv_parser<distributed_graph>;
2073 load(path, line_parser);
2074 }
else if (format ==
"graphjrl") {
2075 line_parser = builtin_parsers::graphjrl_parser<distributed_graph>;
2076 load(path, line_parser);
2077 }
else if (format ==
"bintsv4") {
2078 load_direct(path,&graph_type::load_bintsv4_from_stream);
2079 }
else if (format ==
"bin") {
2083 <<
"Unrecognized Format \"" << format <<
"\"!" << std::endl;
2105 typedef typename json_parser_type::edge_parser_type edge_parser_type;
2106 typedef typename json_parser_type::vertex_parser_type vertex_parser_type;
2107 void load_json (
const std::string& prefix,
bool gzip=
false,
2108 edge_parser_type edge_parser = builtin_parsers::empty_edge_parser<EdgeData>,
2109 vertex_parser_type vertex_parser = builtin_parsers::empty_vertex_parser<VertexData>
2112 json_parser<VertexData, EdgeData> jsonparser(*
this, prefix, gzip, edge_parser, vertex_parser);
2144 ret.make_explicit(*
this);
2146 foreach(
size_t lvid, cur.get_lvid_bitset(*
this)) {
2148 foreach(local_edge_type e, l_vertex(lvid).in_edges()) {
2149 ret.set_lvid_unsync(e.source().id());
2153 foreach(local_edge_type e, l_vertex(lvid).out_edges()) {
2154 ret.set_lvid_unsync(e.target().id());
2158 ret.synchronize_mirrors_to_master_or(*
this, vset_exchange);
2159 ret.synchronize_master_to_mirrors(*
this, vset_exchange);
2200 template <
typename FunctionType>
2205 ret.make_explicit(*
this);
2209 for (
int i = 0; i < (int)local_graph.num_vertices(); ++i) {
2210 if (lvid2record[i].owner == rpc.
procid() &&
2211 vset.l_contains((lvid_type)i)) {
2213 if (select_functor(vtx)) ret.set_lvid(i);
2216 ret.synchronize_master_to_mirrors(*
this, vset_exchange);
2234 for (
int i = 0; i < (int)local_graph.num_vertices(); ++i) {
2235 count += (lvid2record[i].owner == rpc.
procid() &&
2236 vset.l_contains((lvid_type)i));
2250 if (vset.lazy)
return !vset.is_complete_set;
2252 size_t count = vset.get_lvid_bitset(*this).empty();
// Per-vertex bookkeeping record (fragment). Elsewhere in this file its
// members owner, gvid, num_in_edges, num_out_edges and _mirrors are read
// through lvid2record.
2270 struct vertex_record {
// Set of mirrors of this vertex; iterated as process ids in synchronize().
2279 mirror_type _mirrors;
// Constructs a record for global vertex id `vid` (initializer list not
// visible in this excerpt).
2282 vertex_record(
const vertex_id_type& vid) :
// Returns the stored owner.
2284 procid_t get_owner ()
const {
return owner; }
// Read-only access to the mirror set.
2285 const mirror_type& mirrors()
const {
return _mirrors; }
// Number of mirrors, computed as the popcount of the mirror set.
2286 size_t num_mirrors()
const {
return _mirrors.popcount(); }
// Serialization hooks (bodies not visible in this excerpt).
2292 void load(iarchive& arc) {
2301 void save(oarchive& arc)
const {
// Builds a local_vertex_type handle for local vertex id `vid` on this graph.
// No range check is performed here.
2316 local_vertex_type l_vertex(lvid_type vid) {
2317 return local_vertex_type(*
this, vid);
// Returns the stored replica count (nreplicas).
2322 size_t num_replicas()
const {
return nreplicas; }
// Returns the number of vertices held in the local graph.
2326 size_t num_local_vertices()
const {
return local_graph.num_vertices(); }
// Returns the number of edges held in the local graph.
2330 size_t num_local_edges()
const {
return local_graph.num_edges(); }
// Returns the stored count of locally owned vertices (local_own_nverts).
2334 size_t num_local_own_vertices()
const {
return local_own_nverts; }
// Translates a global vertex id into the local vertex id stored in vid2lvid.
// NOTE(review): the iterator is dereferenced without a visible comparison
// against vid2lvid.end(), whereas get_vertex_record() asserts on the same
// lookup — confirm that callers only pass vids present on this process.
2338 lvid_type local_vid (
const vertex_id_type vid)
const {
2341 typename cuckoo_map_type::const_iterator iter = vid2lvid.find(vid);
2342 return iter->second;
// Translates a local vertex id into the global id stored in its record.
// Asserts that lvid indexes into lvid2record.
2347 vertex_id_type global_vid(
const lvid_type lvid)
const {
2348 ASSERT_LT(lvid, lvid2record.size());
2349 return lvid2record[lvid].gvid;
// Wraps the local graph's in-edge list of `lvid` in a local_edge_list_type
// bound to this graph.
2361 local_edge_list_type l_in_edges(
const lvid_type lvid) {
2362 return local_edge_list_type(*
this, local_graph.in_edges(lvid));
// Number of in-edges of `lvid` as reported by the local graph (contrast with
// the num_in_edges field kept in the vertex record).
2372 size_t l_num_in_edges(
const lvid_type lvid)
const {
2373 return local_graph.num_in_edges(lvid);
// Wraps the local graph's out-edge list of `lvid` in a local_edge_list_type
// bound to this graph.
2383 local_edge_list_type l_out_edges(
const lvid_type lvid) {
2384 return local_edge_list_type(*
this, local_graph.out_edges(lvid));
// Number of out-edges of `lvid` as reported by the local graph (contrast with
// the num_out_edges field kept in the vertex record).
2394 size_t l_num_out_edges(
const lvid_type lvid)
const {
2395 return local_graph.num_out_edges(lvid);
2407 distributed_control& dc() {
// Returns the record of the vertex with global id `vid`.
// Asserts that `vid` is present in vid2lvid before indexing lvid2record.
2416 const vertex_record& get_vertex_record(vertex_id_type vid)
const {
2419 typename cuckoo_map_type::const_iterator iter = vid2lvid.find(vid);
2420 ASSERT_TRUE(iter != vid2lvid.end());
2421 return lvid2record[iter->second];
// Mutable access to the record of local vertex `lvid`; asserts the id is in
// range.
2427 vertex_record& l_get_vertex_record(lvid_type lvid) {
2428 ASSERT_LT(lvid, lvid2record.size());
2429 return lvid2record[lvid];
// Read-only access to the record of local vertex `lvid`; asserts the id is in
// range.
2435 const vertex_record& l_get_vertex_record(lvid_type lvid)
const {
2436 ASSERT_LT(lvid, lvid2record.size());
2437 return lvid2record[lvid];
// True when global id `vid` is known locally and this process is its owner.
// A vid missing from vid2lvid yields false instead of an assertion failure.
2444 bool is_master(vertex_id_type vid)
const {
2445 typename cuckoo_map_type::const_iterator iter = vid2lvid.find(vid);
2446 return (iter != vid2lvid.end()) && l_is_master(iter->second);
// True when the record of local vertex `lvid` names this process as owner.
// Asserts the id is in range.
2452 bool l_is_master(lvid_type lvid)
const {
2453 ASSERT_LT(lvid, lvid2record.size());
2454 return lvid2record[lvid].owner == rpc.
procid();
// Returns the owner process recorded for local vertex `lvid`.
// Asserts the id is in range.
2460 procid_t l_master(lvid_type lvid)
const {
2461 ASSERT_LT(lvid, lvid2record.size());
2462 return lvid2record[lvid].owner;
// Pushes vertex data from masters to mirrors (fragment). For every local
// vertex owned by this process, a (global id, vertex data) pair is sent via
// vertex_exchange to each process in the record's mirror set. Incoming
// buffers are drained while sending and again after flush(); the statements
// that consume each received pair are not visible in this excerpt.
2487 void synchronize() {
2488 typedef std::pair<vertex_id_type, vertex_data_type> pair_type;
2489 typename buffered_exchange<pair_type>::buffer_type recv_buffer;
2492 for(lvid_type lvid = 0; lvid < lvid2record.size(); ++lvid) {
2493 const vertex_record& record = lvid2record[lvid];
// Only the owner of a vertex sends its data.
2496 if(record.owner == rpc.
procid()) {
2497 foreach(
size_t proc, record.mirrors()) {
2498 const pair_type pair(record.gvid, local_graph.vertex_data(lvid));
2499 vertex_exchange.send(proc, pair);
// Drain whatever has already arrived.
2503 while(vertex_exchange.recv(sending_proc, recv_buffer)) {
2504 foreach(
const pair_type& pair, recv_buffer) {
2507 recv_buffer.clear();
// Flush the exchange, then drain the remaining buffers.
2510 vertex_exchange.flush();
2511 while(vertex_exchange.recv(sending_proc, recv_buffer)) {
2512 foreach(
const pair_type& pair, recv_buffer) {
2515 recv_buffer.clear();
// Nothing may be left in the exchange once synchronization is done.
2517 ASSERT_TRUE(vertex_exchange.empty());
// Handle to a vertex addressed by its local vertex id (fragment). It stores a
// reference to the owning graph plus the lvid and forwards every query to
// the graph, its local graph, or the vertex record.
2528 struct local_vertex_type {
// Constructor from (graph, lvid); the signature line is not visible here.
2533 graph_ref(graph_ref), lvid(lvid) { }
// Explicit conversion from the global-view vertex_type.
2536 explicit local_vertex_type(vertex_type v) :graph_ref(v.graph_ref),lvid(v.lvid) { }
// Implicit conversion back to the global-view vertex_type.
2538 operator vertex_type()
const {
2539 return vertex_type(graph_ref, lvid);
// Two handles compare equal when their local vertex ids match (graph_ref is
// not compared). The operand is taken by const reference so const handles
// and temporaries can be compared as well.
2542 bool operator==(const local_vertex_type& v)
const {
2543 return lvid == v.lvid;
// Read-only vertex data from the local graph.
2547 const vertex_data_type& data()
const {
2548 return graph_ref.get_local_graph().vertex_data(lvid);
// Mutable vertex data from the local graph.
2552 vertex_data_type& data() {
2553 return graph_ref.get_local_graph().vertex_data(lvid);
// Edge counts as seen by the local graph (signatures not visible here).
2560 return graph_ref.get_local_graph().num_in_edges(lvid);
2567 return graph_ref.get_local_graph().num_out_edges(lvid);
// Global id of this vertex (signature not visible here).
2577 return graph_ref.global_vid(lvid);
// Local in-edge list of this vertex.
2583 local_edge_list_type in_edges() {
2584 return graph_ref.l_in_edges(lvid);
// Local out-edge list of this vertex.
2590 local_edge_list_type out_edges() {
2591 return graph_ref.l_out_edges(lvid);
// Owner recorded for this vertex (signature not visible here).
2597 return graph_ref.l_get_vertex_record(lvid).owner;
// True when the recorded owner is this process.
2602 bool owned()
const {
2603 return graph_ref.l_get_vertex_record(lvid).owner == graph_ref.procid();
// Edge counts taken from the vertex record rather than the local graph.
2609 size_t global_num_in_edges()
const {
2610 return graph_ref.l_get_vertex_record(lvid).num_in_edges;
2617 size_t global_num_out_edges()
const {
2618 return graph_ref.l_get_vertex_record(lvid).num_out_edges;
// Mirror set of this vertex, from its record.
2624 const mirror_type& mirrors()
const {
2625 return graph_ref.l_get_vertex_record(lvid)._mirrors;
2628 size_t num_mirrors()
const {
2629 return graph_ref.l_get_vertex_record(lvid).num_mirrors();
// Direct mutable access to the underlying vertex record.
2635 vertex_record& get_vertex_record() {
2636 return graph_ref.l_get_vertex_record(lvid);
// Handle to an edge of the local graph (fragment). It stores a reference to
// the owning graph plus the local graph's edge object and exposes endpoints
// as local_vertex_type handles.
2643 class local_edge_type {
// The wrapped local-graph edge.
2646 typename local_graph_type::edge_type e;
// Constructor from (graph, local edge); first signature line not visible.
2649 typename local_graph_type::edge_type e):
2650 graph_ref(graph_ref), e(e) { }
// Explicit conversion from the global-view edge_type.
2653 explicit local_edge_type(edge_type ge) :graph_ref(ge.graph_ref),e(ge.e) { }
// Implicit conversion back to the global-view edge_type.
2656 operator edge_type()
const {
2657 return edge_type(graph_ref, e);
// Source endpoint, addressed by the id of the local edge's source.
2661 local_vertex_type source() {
return local_vertex_type(graph_ref, e.source().id()); }
// Target endpoint, addressed by the id of the local edge's target.
2664 local_vertex_type target() {
return local_vertex_type(graph_ref, e.target().id()); }
// Read-only edge data.
2669 const edge_data_type& data()
const {
return e.data(); }
// Mutable edge data.
2672 edge_data_type& data() {
return e.data(); }
// Unary functor that turns a local-graph edge into a local_edge_type bound to
// a given graph (fragment). The argument_type/result_type typedefs describe
// its call signature; it is the functor used by local_edge_list_type's
// transform iterator.
2682 struct make_local_edge_type_functor {
2683 typedef typename local_graph_type::edge_type argument_type;
2684 typedef local_edge_type result_type;
// Constructor storing the graph reference (signature line not visible).
2687 graph_ref(graph_ref) { }
// Wraps `et` together with the stored graph reference.
2688 result_type operator() (
const argument_type et)
const {
2689 return local_edge_type(graph_ref, et);
// View over a local-graph edge list whose elements are presented as
// local_edge_type (fragment). Element access and iteration both pass the
// underlying edge through me_functor.
2697 struct local_edge_list_type {
// Converts underlying edges into local_edge_type.
2698 make_local_edge_type_functor me_functor;
// The wrapped edge list of the local graph (held by value).
2699 typename local_graph_type::edge_list_type elist;
// Iterator that applies me_functor on dereference.
2701 typedef boost::transform_iterator<make_local_edge_type_functor,
2702 typename local_graph_type::edge_list_type::iterator> iterator;
// const_iterator is the same type as iterator.
2703 typedef iterator const_iterator;
// Constructor from (graph, edge list); first signature line not visible.
2706 typename local_graph_type::edge_list_type elist) :
2707 me_functor(graph_ref), elist(elist) { }
// Number of edges in the wrapped list.
2709 size_t size()
const {
return elist.size(); }
// i-th edge, converted by value; no bounds check is performed here.
2712 local_edge_type operator[](
size_t i)
const {
return me_functor(elist[i]); }
// Iterator to the first converted edge.
2728 iterator begin()
const {
return
2729 boost::make_transform_iterator(elist.begin(), me_functor); }
// Iterator one past the last converted edge.
2745 iterator end()
const {
return
2746 boost::make_transform_iterator(elist.end(), me_functor); }
// True when the wrapped list has no edges.
2749 bool empty()
const {
return elist.empty(); }
// RPC object for this graph; used in this file for procid(), numprocs(),
// barrier() and dc(). Declared mutable so const members can use it.
2757 mutable dc_dist_object<distributed_graph> rpc;
// One vertex_record per local vertex, indexed by local vertex id.
2765 std::vector<vertex_record> lvid2record;
// Map type from global vertex id to local vertex id.
2769 typedef cuckoo_map_pow2<vertex_id_type, lvid_type, 3, uint32_t> cuckoo_map_type;
2770 typedef cuckoo_map_type vid2lvid_map_type;
// Global-to-local vertex id map; constructed with -1 in the constructor's
// initializer list.
2772 cuckoo_map_type vid2lvid;
// Vertex and edge counters, initialised to 0 by the constructor.
// NOTE(review): presumably global totals — confirm against the ingress code.
2776 size_t nverts, nedges;
// Count returned by num_local_own_vertices().
2779 size_t local_own_nverts;
// Current ingress implementation; allocated by set_ingress_method() and
// deleted in finalize().
2788 idistributed_ingress<VertexData, EdgeData>* ingress_ptr;
// Exchange used by synchronize() to ship (global id, vertex data) pairs.
2791 buffered_exchange<std::pair<vertex_id_type, vertex_data_type> > vertex_exchange;
// Exchange handed to vertex_set synchronization calls.
2794 buffered_exchange<vertex_id_type> vset_exchange;
// When false, file loading is performed by process 0 only.
2797 bool parallel_ingress;
// Installs the ingress implementation named by `method`.
// Any existing ingress object is deleted first. Recognised names visible
// here are "batch" (uses bufsize, usehash, userecent), "oblivious" (uses
// usehash, userecent), "identity", "grid" and "pds"; the last visible
// allocation creates the random ingress (its branch header is not visible in
// this excerpt). Each choice is logged at LOG_EMPH.
2799 void set_ingress_method(
const std::string& method,
2800 size_t bufsize = 50000,
bool usehash =
false,
bool userecent =
false) {
// Release the previous ingress object, if any, before replacing it.
2801 if(ingress_ptr != NULL) {
delete ingress_ptr; ingress_ptr = NULL; }
2802 if (method ==
"batch") {
// Label/value separators match the "oblivious" log line below.
2803 logstream(
LOG_EMPH) <<
"Use batch ingress, bufsize: " << bufsize
2804 <<
", usehash: " << usehash <<
", userecent: " << userecent << std::endl;
2805 ingress_ptr =
new distributed_batch_ingress<VertexData, EdgeData>(rpc.
dc(), *
this,
2806 bufsize, usehash, userecent);
2807 }
else if (method ==
"oblivious") {
2808 logstream(
LOG_EMPH) <<
"Use oblivious ingress, usehash: " << usehash
2809 <<
", userecent: " << userecent << std::endl;
2810 ingress_ptr =
new distributed_oblivious_ingress<VertexData, EdgeData>(rpc.
dc(), *
this,
2811 usehash, userecent);
2812 }
else if (method ==
"identity") {
2813 logstream(
LOG_EMPH) <<
"Use identity ingress" << std::endl;
2814 ingress_ptr =
new distributed_identity_ingress<VertexData, EdgeData>(rpc.
dc(), *
this);
2815 }
else if (method ==
"grid") {
// "grid" and "pds" share one ingress class, selected by the string argument.
2816 logstream(
LOG_EMPH) <<
"Use random grid ingress" << std::endl;
2817 ingress_ptr =
new distributed_constrained_random_ingress<VertexData, EdgeData>(rpc.
dc(), *
this,
"grid");
2818 }
else if (method ==
"pds") {
2819 logstream(
LOG_EMPH) <<
"Use random pds ingress" << std::endl;
2820 ingress_ptr =
new distributed_constrained_random_ingress<VertexData, EdgeData>(rpc.
dc(), *
this,
"pds");
2822 logstream(
LOG_EMPH) <<
"Use random ingress" << std::endl;
2823 ingress_ptr =
new distributed_random_ingress<VertexData, EdgeData>(rpc.
dc(), *
this);
// Reads `fin` line by line and hands every non-empty line to `line_parser`
// together with this graph and `filename` (fragment; the remaining
// parameters and the return statements are not visible in this excerpt).
// A parser failure reaches the "Error parsing line" message; progress is
// logged when the timer exceeds 5 seconds.
2832 template<
typename Fstream>
2833 bool load_from_stream(std::string filename, Fstream& fin,
2835 size_t linecount = 0;
2836 timer ti; ti.start();
2837 while(fin.good() && !fin.eof()) {
2839 std::getline(fin, line);
// Empty lines are skipped before the stream state is examined.
2840 if(line.empty())
continue;
2841 if(fin.fail())
break;
2842 const bool success = line_parser(*
this, filename, line);
2845 <<
"Error parsing line " << linecount <<
" in "
2846 << filename <<
": " << std::endl
2847 <<
"\t\"" << line <<
"\"" << std::endl;
// Periodic progress report.
2851 if (ti.current_time() > 5.0) {
2852 logstream(
LOG_INFO) << linecount <<
" Lines read" << std::endl;
// Writes the result of writer.save_vertex(vertex) to `fout`.
// `writer` is received by value, so the call operates on a copy.
2860 template<
typename Fstream,
typename Writer>
2861 void save_vertex_to_stream(vertex_type&
vertex, Fstream& fout, Writer writer) {
2862 fout << writer.save_vertex(vertex);
// Serializes `edge` with writer.save_edge(edge) (fragment; the statements
// that use the resulting string are not visible in this excerpt).
// `writer` is received by value, so the call operates on a copy.
2866 template<
typename Fstream,
typename Writer>
2867 void save_edge_to_stream(edge_type& edge, Fstream& fout, Writer writer) {
2868 std::string ret = writer.save_edge(edge);
// Writes the local graph to `out` as raw binary records of two 4-byte
// values each, copied straight from uint32_t locals (host byte order).
// For every local vertex one (src, dest) record is written per out-edge.
// In addition, a vertex owned by this process that has no in- or out-edges
// is written as (src, (uint32_t)-1).
// NOTE(review): global_id() is stored into uint32_t, so ids that do not fit
// in 32 bits would be truncated — confirm the width of vertex_id_type.
2873 void save_bintsv4_to_stream(std::ostream& out) {
2874 for (
int i = 0; i < (int)local_graph.num_vertices(); ++i) {
2875 uint32_t src = l_vertex(i).global_id();
2876 foreach(local_edge_type e, l_vertex(i).out_edges()) {
2877 uint32_t dest = e.target().global_id();
2878 out.write(reinterpret_cast<char*>(&src), 4);
2879 out.write(reinterpret_cast<char*>(&dest), 4);
// Isolated vertices are recorded by their owner only.
2881 if (l_vertex(i).owner() == rpc.
procid()) {
2882 vertex_type gv = vertex_type(l_vertex(i));
2884 if (gv.num_in_edges() == 0 && gv.num_out_edges() == 0) {
2885 out.write(reinterpret_cast<char*>(&src), 4);
// (uint32_t)-1 as destination marks a vertex without edges.
2886 uint32_t dest = (uint32_t)(-1);
2887 out.write(reinterpret_cast<char*>(&dest), 4);
// Reads binary records of two 4-byte values (`src`, then `dest`) from `in`.
//
// This is the record layout written by save_bintsv4_to_stream, including the
// (uint32_t)(-1) marker in the second slot.
// NOTE(review): the enclosing loop, the declarations of `src` and `dest`, the
// bodies of both branches and the return value are not visible here.
2893 bool load_bintsv4_from_stream(std::istream& in) {
2896 in.read(reinterpret_cast<char*>(&src), 4);
2897 in.read(reinterpret_cast<char*>(&dest), 4);
// A failed read (for example a truncated record or end of input) stops processing.
2898 if (in.fail())
break;
// Marker value: the record carries no real target id.
2899 if (dest == (uint32_t)(-1)) {
// Saves the graph by handing an output stream to `saver`.
//
// prefix: leading part of the output file name; a name starting with
//         "hdfs://" is written through graphlab::hdfs, anything else through
//         std::ofstream.
// gzip:   when true, ".gz" is appended to the file name and a gzip
//         compressor is placed in front of the file in the stream chain.
// saver:  callback invoked with this graph and the output stream.
2920 void save_direct(
const std::string& prefix,
bool gzip,
2921 boost::function<
void (
graph_type*, std::ostream&)> saver) {
// Measures the duration reported in the final log message.
2924 timer savetime; savetime.start();
// File name: <prefix>_<procid + 1>_of_<...>.
// NOTE(review): the last operand of this expression is not visible here.
2925 std::string fname = prefix +
"_" +
tostr(rpc.
procid() + 1) +
"_of_" +
2927 if (gzip) fname = fname +
".gz";
2928 logstream(
LOG_INFO) <<
"Save graph to " << fname << std::endl;
// HDFS destination.
2929 if(boost::starts_with(fname,
"hdfs://")) {
2930 graphlab::hdfs hdfs;
// NOTE(review): the trailing `true` presumably opens the file for
// writing -- confirm against graphlab::hdfs::fstream.
2931 graphlab::hdfs::fstream out_file(hdfs, fname,
true);
2932 boost::iostreams::filtering_stream<boost::iostreams::output> fout;
// The compressor must be pushed before the file so data is compressed on its way out.
2933 if (gzip) fout.push(boost::iostreams::gzip_compressor());
2934 fout.push(out_file);
// NOTE(review): the condition guarding this fatal message is not visible here.
2936 logstream(
LOG_FATAL) <<
"\n\tError opening file: " << fname << std::endl;
2939 saver(
this, boost::ref(fout));
// NOTE(review): the chain is popped only when compressing, presumably to
// finish the gzip stream before the file closes -- confirm.
2941 if (gzip) fout.pop();
// Local file destination, opened in binary mode.
2944 std::ofstream out_file(fname.c_str(),
2945 std::ios_base::out | std::ios_base::binary);
2946 if (!out_file.good()) {
2947 logstream(
LOG_FATAL) <<
"\n\tError opening file: " << fname << std::endl;
2950 boost::iostreams::filtering_stream<boost::iostreams::output> fout;
2951 if (gzip) fout.push(boost::iostreams::gzip_compressor());
2952 fout.push(out_file);
2953 saver(
this, boost::ref(fout));
2955 if (gzip) fout.pop();
// NOTE(review): the message names "bintsv4" although `saver` is an arbitrary
// callback and need not write that format.
2958 logstream(
LOG_INFO) <<
"Finish saving graph to " << fname << std::endl
2959 <<
"Finished saving bintsv4 graph: "
2960 << savetime.current_time() << std::endl;
// Loads graph files from the local file system by passing each file's input
// stream to `parser`.
//
// prefix: either a directory or a path whose final component is used as a
//         file-name prefix inside its parent directory.
// parser: callback invoked with this graph and the (optionally gzip-decoded)
//         input stream; it returns a bool that is stored in `success`.
2972 void load_direct_from_posixfs(std::string prefix,
2973 boost::function<
bool (
graph_type*, std::istream&)> parser) {
// `original_path` keeps the caller's string for the warning message below.
2974 std::string directory_name; std::string original_path(prefix);
2975 boost::filesystem::path path(prefix);
2976 std::string search_prefix;
// Directory case: search that directory; `search_prefix` is not assigned
// in the visible part of this branch.
2977 if (boost::filesystem::is_directory(path)) {
2982 directory_name = path.native();
// Otherwise split the path into parent directory and file-name prefix.
// NOTE(review): the `else` joining the two cases is not visible here.
2985 directory_name = path.parent_path().native();
2986 search_prefix = path.filename().native();
// A bare file name has an empty parent path; search "." in that case.
2987 directory_name = (directory_name.empty() ?
"." : directory_name);
2989 std::vector<std::string> graph_files;
2990 fs_util::list_files_with_prefix(directory_name, search_prefix, graph_files);
2991 if (graph_files.size() == 0) {
2992 logstream(
LOG_WARNING) <<
"No files found matching " << original_path << std::endl;
2994 for(
size_t i = 0; i < graph_files.size(); ++i) {
2996 logstream(
LOG_EMPH) <<
"Loading graph from file: " << graph_files[i] << std::endl;
// Compression is detected purely from the ".gz" file-name suffix.
2998 const bool gzip = boost::ends_with(graph_files[i],
".gz");
3000 std::ifstream in_file(graph_files[i].c_str(),
3001 std::ios_base::in | std::ios_base::binary);
3003 boost::iostreams::filtering_stream<boost::iostreams::input> fin;
// NOTE(review): the statement that pushes `in_file` onto `fin` is not
// visible here; presumably it follows the decompressor -- confirm.
3005 if (gzip) fin.push(boost::iostreams::gzip_decompressor());
3007 const bool success = parser(
this, boost::ref(fin));
// Diagnostic text naming the file that failed to parse.
3010 <<
"\n\tError parsing file: " << graph_files[i] << std::endl;
3013 if (gzip) fin.pop();
// Loads graph files from HDFS by passing each file's input stream to `parser`.
//
// prefix: HDFS location whose files are listed via hdfs::list_files.
// parser: callback invoked with this graph and the (optionally gzip-decoded)
//         input stream; it returns a bool that is stored in `success`.
3025 void load_direct_from_hdfs(std::string prefix,
3026 boost::function<
bool (
graph_type*, std::istream&)> parser) {
// Append a trailing '/' to a non-empty path that lacks one.
3030 std::string path = prefix;
3031 if (path.length() > 0 && path[path.length() - 1] !=
'/') path = path +
"/";
// Report builds that lack HDFS support.
// NOTE(review): the log level and what follows the message are not visible here.
3032 if(!hdfs::has_hadoop()) {
3034 <<
"\n\tAttempting to load a graph from HDFS but GraphLab"
3035 <<
"\n\twas built without HDFS."
3038 hdfs& hdfs = hdfs::get_hdfs();
3039 std::vector<std::string> graph_files;
3040 graph_files = hdfs.list_files(path);
3041 if (graph_files.size() == 0) {
3042 logstream(
LOG_WARNING) <<
"No files found matching " << prefix << std::endl;
3044 for(
size_t i = 0; i < graph_files.size(); ++i) {
3046 logstream(
LOG_EMPH) <<
"Loading graph from file: " << graph_files[i] << std::endl;
// Compression is detected purely from the ".gz" file-name suffix.
3048 const bool gzip = boost::ends_with(graph_files[i],
".gz");
3050 graphlab::hdfs::fstream in_file(hdfs, graph_files[i]);
3051 boost::iostreams::filtering_stream<boost::iostreams::input> fin;
// NOTE(review): the statement that pushes `in_file` onto `fin` is not
// visible here; presumably it follows the decompressor -- confirm.
3052 if(gzip) fin.push(boost::iostreams::gzip_decompressor());
3054 const bool success = parser(
this, boost::ref(fin));
// Diagnostic text naming the file that failed to parse.
3057 <<
"\n\tError parsing file: " << graph_files[i] << std::endl;
3060 if (gzip) fin.pop();
// Entry point for stream-based loading: selects the HDFS or the local
// file-system loader and forwards `prefix` and `parser` unchanged.
//
// prefix: location of the graph files; a value starting with "hdfs://" is
//         handled by load_direct_from_hdfs.
// parser: callback forwarded to the selected loader.
3066 void load_direct(std::string prefix,
3067 boost::function<
bool (
graph_type*, std::istream&)> parser) {
3069 if(boost::starts_with(prefix,
"hdfs://")) {
3070 load_direct_from_hdfs(prefix, parser);
// NOTE(review): the `else` introducing the local file-system case is not
// visible here.
3072 load_direct_from_posixfs(prefix, parser);
3081 #include <graphlab/macros_undef.hpp>