22 #ifndef GRAPHLAB_GRAPH_JSON_PARSER_HPP
23 #define GRAPHLAB_GRAPH_JSON_PARSER_HPP
30 #if defined(__clang) || (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 6)))
31 #pragma GCC diagnostic push
34 #pragma GCC diagnostic ignored "-Wreorder"
35 #include <libjson/libjson.h>
37 #if defined(__clang) || (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 6)))
38 #pragma GCC diagnostic pop
41 #include <boost/functional.hpp>
42 #include <graphlab/util/stl_util.hpp>
43 #include <graphlab/util/hdfs.hpp>
45 #include <graphlab/serialization/serialization_includes.hpp>
46 #include <graphlab/graph/distributed_graph.hpp>
47 #include <graphlab/graph/ingress/distributed_identity_ingress.hpp>
// Built-in line parsers. json_parser's constructor uses
// empty_edge_parser<EdgeData> and empty_vertex_parser<VertexData> as the
// default values of its edge_parser / vertex_parser arguments.
51 namespace builtin_parsers {
// Edge-data parser: receives the edge object `e` and the raw text `line`,
// and returns a bool.
// NOTE(review): the body is not visible in this excerpt; presumably it
// ignores `line` and reports success -- confirm.
52 template <
typename EdgeData>
53 bool empty_edge_parser(EdgeData& e,
const std::string& line) {
// Vertex-data counterpart of empty_edge_parser, with the same signature shape.
// NOTE(review): body not visible in this excerpt -- confirm its return value.
57 template <
typename VertexData>
58 bool empty_vertex_parser(VertexData& v,
const std::string& line) {
// Forward declaration of the graph class template that json_parser fills in.
66 template<
typename VertexData,
typename EdgeData>
67 class distributed_graph;
// Template header and type aliases of the JSON graph loader.
// NOTE(review): the class-head line itself is not visible in this excerpt.
70 template <
typename VertexData,
typename EdgeData>
// Graph type populated by the loader, and its edge / vertex payload types.
74 typedef distributed_graph<VertexData, EdgeData>
graph_type;
75 typedef EdgeData edge_data_type;
76 typedef VertexData vertex_data_type;
// Local vertex id type, taken from the graph type.
79 typedef typename graph_type::lvid_type
lvid_type;
// User-supplied callbacks: fill an edge / vertex data object from a string
// and return a bool.
81 typedef boost::function<bool(edge_data_type&, const std::string&)> edge_parser_type;
82 typedef boost::function<bool(vertex_data_type&, const std::string&)> vertex_parser_type;
// Per-line callback: load_from_stream calls it with the graph and each
// non-empty input line.
83 typedef boost::function<bool(graph_type&, const std::string&)> line_parser_type;
// Constructor: stores the target graph, the path prefix of the JSON files,
// the gzip flag, and the edge / vertex data parsers (defaulting to the
// builtin_parsers templates).
// NOTE(review): the lines between the constructor's opening brace and the
// statements below are not visible in this excerpt; the load sequence that
// follows may belong to a separate method -- confirm.
87 json_parser (
graph_type& graph,
const std::string& prefix,
bool gzip=
false,
88 edge_parser_type edge_parser=builtin_parsers::empty_edge_parser<EdgeData>,
89 vertex_parser_type vertex_parser=builtin_parsers::empty_vertex_parser<VertexData>) :
90 graph(graph), prefix(prefix), gzip(gzip), edge_parser(edge_parser), vertex_parser(vertex_parser) {
// One line parser per input file. The edata / vrecord parsers bind the
// user-supplied data parsers as their third argument.
95 line_parser_type graph_structure_parser = parse_graph_structure_from_json;
96 line_parser_type vid2lvid_parser = parse_vid2lvid_from_json;
97 line_parser_type edata_parser = boost::bind(parse_edatalist_from_json, _1, _2, edge_parser);
98 line_parser_type vrecord_parser = boost::bind(parse_vrecord_from_json, _1, _2, vertex_parser);
// Parse the four files in order. Bitwise `&` does not short-circuit, so
// every file is still parsed after an earlier one has failed.
100 bool success = parse_by_line(graphfilename(), graph_structure_parser);
101 success = success & parse_by_line(vid2lvidfilename(), vid2lvid_parser);
102 success = success & parse_by_line(edatafilename(), edata_parser);
103 success = success & parse_by_line(vrecordfilename(), vrecord_parser);
// NOTE(review): the condition guarding this fatal log is not visible;
// presumably it is `!success` -- confirm.
106 logstream(
LOG_FATAL) <<
"Fail parsing graph json" << std::endl;
// Mark the local graph as finalized, then check the loaded tables against
// the local vertex count.
110 graph.local_graph.finalized =
true;
113 ASSERT_GE(graph.local_graph.num_vertices(), graph.local_graph.gstore.num_vertices);
114 ASSERT_EQ(graph.vid2lvid.size(), graph.local_graph.num_vertices());
115 ASSERT_EQ(graph.lvid2record.size(), graph.local_graph.num_vertices());
119 logstream(
LOG_INFO) <<
"Finished loading graph" << graph.procid()
120 <<
"\n\tnverts: " << graph.num_local_own_vertices()
121 <<
"\n\tnreplicas: " << graph.local_graph.num_vertices()
122 <<
"\n\tnedges: " << graph.local_graph.num_edges()
// Create an identity ingress if the graph has none, use it to exchange
// global info, then delete it.
// NOTE(review): no reset of graph.ingress_ptr after the delete is visible
// here, and the delete also appears to apply to a pre-existing ingress
// object -- confirm against the full source.
126 if (graph.ingress_ptr == NULL) {
127 graph.ingress_ptr =
new distributed_identity_ingress<VertexData, EdgeData>(graph.rpc.dc(), graph);
130 graph.ingress_ptr->exchange_global_info();
131 delete graph.ingress_ptr;
134 graph.finalized =
true;
// Opens `srcfilename` under `prefix` and passes it to load_from_stream along
// with `line_parser`. Prefixes starting with "hdfs://" are read through
// graphlab::hdfs; any other prefix is read with std::ifstream. A gzip
// decompressor is pushed onto the filtering stream when `gzip` is set.
// NOTE(review): the declaration of `fname`, the `else` lines, the push of the
// file stream into `fin` and the return statements are not visible in this
// excerpt.
138 bool parse_by_line (
const std::string& srcfilename, line_parser_type line_parser) {
// Join prefix and file name, adding a "/" only when prefix lacks one.
141 if(!boost::ends_with(prefix,
"/"))
142 fname = prefix +
"/" + srcfilename;
144 fname = prefix + srcfilename;
146 logstream(
LOG_INFO) <<
"Load graph json from " << fname << std::endl;
148 boost::iostreams::filtering_stream<boost::iostreams::input> fin;
// HDFS source.
150 if (boost::starts_with(prefix,
"hdfs://")) {
151 graphlab::hdfs& hdfs = hdfs::get_hdfs();
152 graphlab::hdfs::fstream in_file(hdfs, fname);
153 if (!in_file.good()) {
154 logstream(
LOG_FATAL) <<
"Fail to open file " << fname << std::endl;
158 if (gzip) fin.push(boost::iostreams::gzip_decompressor());
162 logstream(
LOG_FATAL) <<
"Fail to read from stream " << fname << std::endl;
// NOTE(review): this call is a plain statement, so its bool result is not
// used here -- confirm how parse failures reach the caller.
166 load_from_stream(fname, fin, line_parser);
// Local file source, opened in binary mode.
171 std::ifstream in_file(fname.c_str(),
172 std::ios_base::in | std::ios_base::binary);
174 if (!in_file.good()) {
175 logstream(
LOG_FATAL) <<
"Fail to open file " << fname << std::endl;
179 if (gzip) fin.push(boost::iostreams::gzip_decompressor());
183 logstream(
LOG_FATAL) <<
"Fail to read from stream " << fname << std::endl;
// NOTE(review): as in the HDFS branch, the result is not used here.
187 load_from_stream(fname, fin, line_parser);
// Reads `fin` line by line and calls `line_parser(graph, line)` on every
// non-empty line. `filename` is used only in the error message. A progress
// message with the line count is logged when the timer exceeds 5.0.
// NOTE(review): the declaration of `line`, the update of `linecount`, the
// handling of a failed parse and the return statement are not visible in this
// excerpt.
200 template<
typename Fstream>
201 bool load_from_stream(std::string filename, Fstream& fin,
202 line_parser_type& line_parser) {
203 size_t linecount = 0;
204 timer ti; ti.start();
205 while(fin.good() && !fin.eof()) {
207 std::getline(fin, line);
// Skip empty lines; stop when the read failed.
208 if(line.empty())
continue;
209 if(fin.fail())
break;
210 const bool success = line_parser(graph, line);
// NOTE(review): the head of this log statement (and presumably a
// `!success` guard) is not visible -- confirm.
213 <<
"Error parsing line " << linecount <<
" in "
214 << filename <<
": " << std::endl
215 <<
"\t\"" << line <<
"\"" << std::endl;
219 if (ti.current_time() > 5.0) {
220 logstream(
LOG_INFO) << linecount <<
" Lines read" << std::endl;
// Parses one JSON document describing the local graph structure and writes
// it into graph.get_local_graph().gstore. Top-level node names handled:
//   "numEdges"    -> gstore.num_edges
//   "numVertices" -> gstore.num_vertices
//   "csr"         -> "rowIndex" into CSR_src, "colIndex" into CSR_dst
//   "csc"         -> "rowIndex" into CSC_dst, "colIndex" into CSC_src
//   "c2rMap"      -> gstore.c2r_map
// Any other name is logged at LOG_ERROR.
// NOTE(review): the declarations of `csr` / `csc`, the iterator increments,
// the `else` lines and the return statement are not visible in this excerpt.
// The bool results of the parse_vid_array calls are not used on the lines
// shown.
230 static bool parse_graph_structure_from_json (
graph_type& graph,
const std::string& str) {
231 JSONNode n = libjson::parse(str);
232 JSONNode::const_iterator i = n.begin();
234 local_graph_type& local_graph = graph.get_local_graph();
235 while(i != n.end()) {
236 if (i->name() ==
"numEdges") {
237 local_graph.gstore.num_edges = i->as_int();
238 }
else if (i->name() ==
"numVertices") {
239 local_graph.gstore.num_vertices= i->as_int();
240 }
else if (i->name() ==
"csr") {
244 JSONNode::const_iterator j = csr.begin();
245 while (j != csr.end()) {
246 if (j->name() ==
"rowIndex") {
247 parse_vid_array (local_graph.gstore.CSR_src, *j);
248 }
else if (j->name() ==
"colIndex") {
249 parse_vid_array (local_graph.gstore.CSR_dst, *j);
251 logstream(
LOG_ERROR) <<
"Error parsing json into graph. Unknown json node name:"
252 <<
"CSR:" << j->name() << std::endl;
256 }
else if (i->name() ==
"csc") {
260 JSONNode::const_iterator j = csc.begin();
262 while (j != csc.end()) {
263 if (j->name() ==
"rowIndex") {
264 parse_vid_array (local_graph.gstore.CSC_dst, *j);
265 }
else if (j->name() ==
"colIndex") {
266 parse_vid_array (local_graph.gstore.CSC_src, *j);
268 logstream(
LOG_ERROR) <<
"Error parsing json into graph. Unknown json node name:"
269 <<
"CSC:"<<j->name() << std::endl;
273 }
else if (i->name() ==
"c2rMap") {
274 parse_vid_array (local_graph.gstore.c2r_map, *i);
276 logstream(
LOG_ERROR) <<
"Error parsing json into graph. Unknown json node name:" <<
277 i->name() << std::endl;
// Each of these arrays must hold exactly num_edges entries.
283 ASSERT_EQ(local_graph.gstore.num_edges, local_graph.gstore.c2r_map.size());
284 ASSERT_EQ(local_graph.gstore.num_edges, local_graph.gstore.CSR_dst.size());
285 ASSERT_EQ(local_graph.gstore.num_edges, local_graph.gstore.CSC_src.size());
// Size the per-vertex record table and the local graph to num_vertices.
287 graph.lvid2record.reserve(local_graph.gstore.num_vertices);
288 graph.lvid2record.resize(local_graph.gstore.num_vertices);
289 local_graph.reserve(local_graph.gstore.num_vertices);
// Parses one JSON document and fills graph.vid2lvid from its "vid2lvid"
// node: each child's name is converted to a vertex_id_type key, and its
// integer value is stored as the lvid_type mapped value.
// NOTE(review): the iterator increments, the handling of other node names
// and the return statement are not visible in this excerpt.
295 static bool parse_vid2lvid_from_json (
graph_type& graph,
const std::string& str) {
296 JSONNode n = libjson::parse(str);
297 JSONNode::const_iterator i = n.begin();
299 while(i != n.end()) {
300 if (i->name() ==
"vid2lvid") {
301 JSONNode::const_iterator j = i->begin();
304 while(j != i->end()) {
305 graph.vid2lvid[boost::lexical_cast<
vertex_id_type>(j->name())] = (boost::lexical_cast<lvid_type>)(j->as_int());
// Parses one JSON document and appends the entries of its "edataList" node
// to local_graph.gstore.edge_data_list. Each child's string value is passed
// to `edge_parser`, and the resulting edge object is pushed onto the list.
// NOTE(review): the declaration of `e`, the iterator increments and the
// return statement are not visible in this excerpt. The bool result of
// edge_parser is not used on the line shown.
318 static bool parse_edatalist_from_json (
graph_type& graph,
const std::string& str,
319 edge_parser_type edge_parser) {
320 JSONNode n = libjson::parse(str);
321 JSONNode::const_iterator i = n.begin();
323 local_graph_type& local_graph = graph.get_local_graph();
324 while(i != n.end()) {
325 if (i->name() ==
"edataList") {
327 JSONNode edatanode= *i;
328 JSONNode::const_iterator j = edatanode.begin();
329 std::vector<edge_data_type>& edatalist = local_graph.gstore.edge_data_list;
// Reserve room for num_edges entries before appending.
331 edatalist.reserve(local_graph.gstore.num_edges);
333 while (j != edatanode.end()) {
334 edge_parser(e, j->as_string());
335 edatalist.push_back(e);
// Parses one JSON document describing a single vertex record and adds the
// vertex to the graph. Node names handled:
//   "mirrors"    -> sets one bit in vrecord._mirrors per listed integer
//   "inEdges"    -> vrecord.num_in_edges
//   "outEdges"   -> vrecord.num_out_edges
//   "gvid"       -> (assignment not visible in this excerpt)
//   "owner"      -> vrecord.owner
//   "VertexData" -> passed to `vertex_parser` unless the node is JSON_NULL
// Any other name is logged at LOG_ERROR.
// A gvid that is missing from graph.vid2lvid triggers a "Singleton node"
// warning. Otherwise (NOTE(review): the `else` is not visible -- confirm)
// the record is stored at its lvid, the vertex is added to the local graph,
// and graph.local_own_nverts is incremented when this process is the owner.
// NOTE(review): the iterator increments and the return statement are not
// visible in this excerpt.
349 static bool parse_vrecord_from_json (
graph_type& graph,
const std::string& str,
350 vertex_parser_type vertex_parser) {
353 local_graph_type& local_graph = graph.get_local_graph();
355 JSONNode n = libjson::parse(str);
356 JSONNode::const_iterator i = n.begin();
358 vertex_data_type vdata;
359 typename graph_type::vertex_record vrecord;
361 while (i != n.end()) {
362 if (i->name() ==
"mirrors") {
363 JSONNode::const_iterator j = (*i).begin();
364 while (j != (*i).end()) {
365 int mirror = j->as_int();
366 vrecord._mirrors.set_bit((
procid_t)mirror);
369 }
else if (i->name() ==
"inEdges") {
370 vrecord.num_in_edges = i->as_int();
371 }
else if (i->name() ==
"outEdges") {
372 vrecord.num_out_edges = i->as_int();
373 }
else if (i->name() ==
"gvid") {
376 }
else if (i->name() ==
"owner") {
377 vrecord.owner = (
procid_t)i->as_int();
378 }
else if (i->name() ==
"VertexData") {
379 if (!(i->type() == JSON_NULL))
380 vertex_parser(vdata, i->as_string());
382 logstream(
LOG_ERROR) <<
"Error parsing json into vrecord. Unknown json node name:" <<
383 i->name() << std::endl;
388 if (graph.vid2lvid.find(vrecord.gvid) == graph.vid2lvid.end()) {
391 logstream(
LOG_WARNING) <<
"Singleton node detected: gvid = " << vrecord.gvid <<
". Ignored" << std::endl;
393 lvid_type lvid = graph.vid2lvid[vrecord.gvid];
394 graph.lvid2record[lvid] = vrecord;
395 local_graph.add_vertex(lvid, vdata);
396 if (vrecord.owner == graph.procid()) ++graph.local_own_nverts;
// Appends the integer value of every element of JSON array `n` to `to`.
// Returns false without touching `to` when `n` is not a JSON_ARRAY.
// NOTE(review): the iterator increment and the final return statement are
// not visible in this excerpt.
406 static bool parse_vid_array (std::vector<vertex_id_type>& to,
const JSONNode& n) {
407 if (n.type() != JSON_ARRAY)
return false;
409 JSONNode::const_iterator i = n.begin();
410 while (i != n.end()) {
411 to.push_back(i->as_int());
// Formats `s` into a stream with field width `width` and fill character '0',
// i.e. pads it with leading zeros. Asserts that `s` is no longer than `width`.
// NOTE(review): the return statement is not visible; presumably it returns
// ss.str() -- confirm. The assertion compares an unsigned length with an int.
418 std::string zeropadding(
const std::string& s,
int width) {
419 ASSERT_LE(s.length(), width);
420 std::ostringstream ss;
421 ss << std::setw(width) << std::setfill(
'0') << s;
// Relative name of the graph-structure file:
// "graph/graph<pid>-r-<pid zero-padded to 5>", plus ".gz" when gzip is set.
// NOTE(review): the declaration of `pid` is not visible in this excerpt;
// presumably it is this process's id -- confirm.
426 const std::string graphfilename() {
428 std::string suffix = gzip ?
".gz" :
"";
429 return "graph/graph"+
tostr(pid)+
"-r-"+zeropadding(
tostr(pid), 5)+suffix;
// Relative name of the vid-to-lvid mapping file:
// "graph/vid2lvid<pid>-r-<pid zero-padded to 5>", plus ".gz" when gzip is set.
// NOTE(review): the declaration of `pid` is not visible in this excerpt.
433 const std::string vid2lvidfilename() {
435 std::string suffix = gzip ?
".gz" :
"";
436 return "graph/vid2lvid"+
tostr(pid)+
"-r-"+zeropadding(
tostr(pid), 5)+suffix;
// Relative name of the edge-data file:
// "graph/edata<pid>-r-<pid zero-padded to 5>", plus ".gz" when gzip is set.
// NOTE(review): the declaration of `pid` is not visible in this excerpt.
440 const std::string edatafilename() {
442 std::string suffix = gzip ?
".gz" :
"";
443 return "graph/edata"+
tostr(pid)+
"-r-"+zeropadding(
tostr(pid),5)+suffix;
// Relative name of the vertex-record file:
// "vrecord/vdata<pid>-r-<pid zero-padded to 5>", plus ".gz" when gzip is set.
// Unlike the other three files, it lives under "vrecord/", not "graph/".
// NOTE(review): the declaration of `pid` is not visible in this excerpt.
447 const std::string vrecordfilename() {
449 std::string suffix = gzip ?
".gz" :
"";
450 return "vrecord/vdata"+
tostr(pid)+
"-r-"+zeropadding(
tostr(pid),5)+suffix;
// User-supplied data parsers, set by the constructor's initializer list.
// NOTE(review): the declarations of the other members initialised there
// (graph, prefix, gzip) are not visible in this excerpt.
458 edge_parser_type edge_parser;
459 vertex_parser_type vertex_parser;