GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
json_parser.hpp
1  /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 #ifndef GRAPHLAB_GRAPH_JSON_PARSER_HPP
23 #define GRAPHLAB_GRAPH_JSON_PARSER_HPP
24 
25 #include <string>
26 #include <sstream>
27 #include <iostream>
28 #include <vector>
29 
30 #if defined(__clang) || (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 6)))
31 #pragma GCC diagnostic push
32 #endif
33 
34 #pragma GCC diagnostic ignored "-Wreorder"
35 #include <libjson/libjson.h>
36 
37 #if defined(__clang) || (defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 6)))
38 #pragma GCC diagnostic pop
39 #endif
40 
41 #include <boost/functional.hpp>
42 #include <graphlab/util/stl_util.hpp>
43 #include <graphlab/util/hdfs.hpp>
45 #include <graphlab/serialization/serialization_includes.hpp>
46 #include <graphlab/graph/distributed_graph.hpp>
47 #include <graphlab/graph/ingress/distributed_identity_ingress.hpp>
48 
49 namespace graphlab {
50 
namespace builtin_parsers {

  /**
   * \brief Edge parser that ignores its input.
   *
   * Leaves the edge data untouched and always reports success.
   */
  template <typename EdgeData>
  bool empty_edge_parser(EdgeData& /* e */, const std::string& /* line */) {
    const bool parsed = true;
    return parsed;
  }

  /**
   * \brief Vertex parser that ignores its input.
   *
   * Leaves the vertex data untouched and always reports success.
   */
  template <typename VertexData>
  bool empty_vertex_parser(VertexData& /* v */, const std::string& /* line */) {
    const bool parsed = true;
    return parsed;
  }

} // namespace builtin_parsers
63 
64 
65 
66 template<typename VertexData, typename EdgeData>
67 class distributed_graph;
68 
69 
70 template <typename VertexData, typename EdgeData>
71 class json_parser {
72 
73  public:
74  typedef distributed_graph<VertexData, EdgeData> graph_type; // graph populated by this parser
75  typedef EdgeData edge_data_type; // user data attached to each edge
76  typedef VertexData vertex_data_type; // user data attached to each vertex
77 
78  typedef typename graph_type::vertex_id_type vertex_id_type; // vertex id type taken from the graph; used for "gvid" and vid2lvid keys
79  typedef typename graph_type::lvid_type lvid_type; // local vertex id type taken from the graph; used to index lvid2record
80 
81  typedef boost::function<bool(edge_data_type&, const std::string&)> edge_parser_type; // fills an edge datum from a string, returns a bool status
82  typedef boost::function<bool(vertex_data_type&, const std::string&)> vertex_parser_type; // fills a vertex datum from a string, returns a bool status
83  typedef boost::function<bool(graph_type&, const std::string&)> line_parser_type; // consumes one line of an input file; false means the line failed to parse
84 
85 
86  public:
87  json_parser (graph_type& graph, const std::string& prefix, bool gzip=false,
88  edge_parser_type edge_parser=builtin_parsers::empty_edge_parser<EdgeData>,
89  vertex_parser_type vertex_parser=builtin_parsers::empty_vertex_parser<VertexData>) :
90  graph(graph), prefix(prefix), gzip(gzip), edge_parser(edge_parser), vertex_parser(vertex_parser) {
91  }
92 
93 
94  bool load() {
95  line_parser_type graph_structure_parser = parse_graph_structure_from_json;
96  line_parser_type vid2lvid_parser = parse_vid2lvid_from_json;
97  line_parser_type edata_parser = boost::bind(parse_edatalist_from_json, _1, _2, edge_parser);
98  line_parser_type vrecord_parser = boost::bind(parse_vrecord_from_json, _1, _2, vertex_parser);
99 
100  bool success = parse_by_line(graphfilename(), graph_structure_parser);
101  success = success & parse_by_line(vid2lvidfilename(), vid2lvid_parser);
102  success = success & parse_by_line(edatafilename(), edata_parser);
103  success = success & parse_by_line(vrecordfilename(), vrecord_parser);
104 
105  if (!success) {
106  logstream(LOG_FATAL) << "Fail parsing graph json" << std::endl;
107  return false;
108  }
109 
110  graph.local_graph.finalized = true;
111 
112 
113  ASSERT_GE(graph.local_graph.num_vertices(), graph.local_graph.gstore.num_vertices);
114  ASSERT_EQ(graph.vid2lvid.size(), graph.local_graph.num_vertices());
115  ASSERT_EQ(graph.lvid2record.size(), graph.local_graph.num_vertices());
116 
117 
118 
119  logstream(LOG_INFO) << "Finished loading graph" << graph.procid()
120  << "\n\tnverts: " << graph.num_local_own_vertices()
121  << "\n\tnreplicas: " << graph.local_graph.num_vertices()
122  << "\n\tnedges: " << graph.local_graph.num_edges()
123  << std::endl;
124 
125 
126  if (graph.ingress_ptr == NULL) {
127  graph.ingress_ptr = new distributed_identity_ingress<VertexData, EdgeData>(graph.rpc.dc(), graph);
128  }
129 
130  graph.ingress_ptr->exchange_global_info();
131  delete graph.ingress_ptr;
132 
133 
134  graph.finalized = true;
135  return success;
136  }
137 
138  bool parse_by_line (const std::string& srcfilename, line_parser_type line_parser) {
139  std::string fname;
140  //check for "/" ending in directory"
141  if(!boost::ends_with(prefix,"/"))
142  fname = prefix + "/" + srcfilename;
143  else
144  fname = prefix + srcfilename;
145 
146  logstream(LOG_INFO) << "Load graph json from " << fname << std::endl;
147 
148  boost::iostreams::filtering_stream<boost::iostreams::input> fin;
149  // loading from hdfs
150  if (boost::starts_with(prefix, "hdfs://")) {
151  graphlab::hdfs& hdfs = hdfs::get_hdfs();
152  graphlab::hdfs::fstream in_file(hdfs, fname);
153  if (!in_file.good()) {
154  logstream(LOG_FATAL) << "Fail to open file " << fname << std::endl;
155  return false;
156  }
157 
158  if (gzip) fin.push(boost::iostreams::gzip_decompressor());
159  fin.push(in_file);
160 
161  if (!fin.good()) {
162  logstream(LOG_FATAL) << "Fail to read from stream " << fname << std::endl;
163  return false;
164  }
165 
166  load_from_stream(fname, fin, line_parser);
167 
168  if (gzip) fin.pop();
169  fin.pop();
170  } else { // loading from disk
171  std::ifstream in_file(fname.c_str(),
172  std::ios_base::in | std::ios_base::binary);
173 
174  if (!in_file.good()) {
175  logstream(LOG_FATAL) << "Fail to open file " << fname << std::endl;
176  return false;
177  }
178 
179  if (gzip) fin.push(boost::iostreams::gzip_decompressor());
180  fin.push(in_file);
181 
182  if (!fin.good()) {
183  logstream(LOG_FATAL) << "Fail to read from stream " << fname << std::endl;
184  return false;
185  }
186 
187  load_from_stream(fname, fin, line_parser);
188 
189  if (gzip) fin.pop();
190  fin.pop();
191  }
192 
193 
194  return true;
195  }
196  /**
197  \internal
198  This internal function is used to load a single line from an input stream
199  */
200  template<typename Fstream>
201  bool load_from_stream(std::string filename, Fstream& fin,
202  line_parser_type& line_parser) {
203  size_t linecount = 0;
204  timer ti; ti.start();
205  while(fin.good() && !fin.eof()) {
206  std::string line;
207  std::getline(fin, line);
208  if(line.empty()) continue;
209  if(fin.fail()) break;
210  const bool success = line_parser(graph, line);
211  if (!success) {
212  logstream(LOG_WARNING)
213  << "Error parsing line " << linecount << " in "
214  << filename << ": " << std::endl
215  << "\t\"" << line << "\"" << std::endl;
216  return false;
217  }
218  ++linecount;
219  if (ti.current_time() > 5.0) {
220  logstream(LOG_INFO) << linecount << " Lines read" << std::endl;
221  ti.start();
222  }
223  }
224  return true;
225  } // end of load from stream
226 
227 
228 
229  /* Parse the graph structure from json */
230  static bool parse_graph_structure_from_json (graph_type& graph, const std::string& str) {
231  JSONNode n = libjson::parse(str);
232  JSONNode::const_iterator i = n.begin();
233  typedef typename graph_type::local_graph_type local_graph_type;
234  local_graph_type& local_graph = graph.get_local_graph();
235  while(i != n.end()) {
236  if (i->name() == "numEdges") {
237  local_graph.gstore.num_edges = i->as_int();
238  } else if (i->name() == "numVertices") {
239  local_graph.gstore.num_vertices= i->as_int();
240  } else if (i->name() == "csr") {
241  // parse rowIndex -> graph.local_graph.gstore.csr_source
242  // parse colIndex -> graph.local_graph.gstore.csr_target
243  JSONNode csr = *i;
244  JSONNode::const_iterator j = csr.begin();
245  while (j != csr.end()) {
246  if (j->name() == "rowIndex") {
247  parse_vid_array (local_graph.gstore.CSR_src, *j);
248  } else if (j->name() == "colIndex") {
249  parse_vid_array (local_graph.gstore.CSR_dst, *j);
250  } else {
251  logstream(LOG_ERROR) << "Error parsing json into graph. Unknown json node name:"
252  << "CSR:" << j->name() << std::endl;
253  }
254  ++j;
255  }
256  } else if (i->name() == "csc") {
257  // parse rowIndex -> graph.local_graph.gstore.csc_target
258  // parse colIndex -> graph.local_graph.gstore.csc_source
259  JSONNode csc = *i;
260  JSONNode::const_iterator j = csc.begin();
261 
262  while (j != csc.end()) {
263  if (j->name() == "rowIndex") {
264  parse_vid_array (local_graph.gstore.CSC_dst, *j);
265  } else if (j->name() == "colIndex") {
266  parse_vid_array (local_graph.gstore.CSC_src, *j);
267  } else {
268  logstream(LOG_ERROR) << "Error parsing json into graph. Unknown json node name:"
269  << "CSC:"<<j->name() << std::endl;
270  }
271  ++j;
272  }
273  } else if (i->name() == "c2rMap") {
274  parse_vid_array (local_graph.gstore.c2r_map, *i);
275  } else {
276  logstream(LOG_ERROR) << "Error parsing json into graph. Unknown json node name:" <<
277  i->name() << std::endl;
278  }
279  ++i;
280  } // end while
281 
282 
283  ASSERT_EQ(local_graph.gstore.num_edges, local_graph.gstore.c2r_map.size());
284  ASSERT_EQ(local_graph.gstore.num_edges, local_graph.gstore.CSR_dst.size());
285  ASSERT_EQ(local_graph.gstore.num_edges, local_graph.gstore.CSC_src.size());
286 
287  graph.lvid2record.reserve(local_graph.gstore.num_vertices);
288  graph.lvid2record.resize(local_graph.gstore.num_vertices);
289  local_graph.reserve(local_graph.gstore.num_vertices);
290  // local_graph.finalized = true;
291  return true;
292  }
293 
294  /* Parse the vid2lvid map from json */
295  static bool parse_vid2lvid_from_json (graph_type& graph, const std::string& str) {
296  JSONNode n = libjson::parse(str);
297  JSONNode::const_iterator i = n.begin();
298  typedef typename graph_type::local_graph_type local_graph_type;
299  while(i != n.end()) {
300  if (i->name() == "vid2lvid") {
301  JSONNode::const_iterator j = i->begin();
302  typename graph_type::vid2lvid_map_type & map = graph.vid2lvid;
303  map.clear();
304  while(j != i->end()) {
305  graph.vid2lvid[boost::lexical_cast<vertex_id_type>(j->name())] = (boost::lexical_cast<lvid_type>)(j->as_int());
306  ++j;
307  }
308  } else {
309  // report error
310  return false;
311  }
312  ++i;
313  }
314  return true;
315  }
316 
317  /* Parse the edata list from json */
318  static bool parse_edatalist_from_json (graph_type& graph, const std::string& str,
319  edge_parser_type edge_parser) {
320  JSONNode n = libjson::parse(str);
321  JSONNode::const_iterator i = n.begin();
322  typedef typename graph_type::local_graph_type local_graph_type;
323  local_graph_type& local_graph = graph.get_local_graph();
324  while(i != n.end()) {
325  if (i->name() == "edataList") {
326  // parse edatalist -> graph.local_graph.gstore.edata
327  JSONNode edatanode= *i;
328  JSONNode::const_iterator j = edatanode.begin();
329  std::vector<edge_data_type>& edatalist = local_graph.gstore.edge_data_list;
330  edatalist.clear();
331  edatalist.reserve(local_graph.gstore.num_edges);
332  edge_data_type e;
333  while (j != edatanode.end()) {
334  edge_parser(e, j->as_string());
335  edatalist.push_back(e);
336  ++j;
337  }
338  } else {
339  return false;
340  // report error
341  }
342  ++i;
343  }
344  return true;
345  }
346 
347 
348  /* Parse the vertex record list from json */
349  static bool parse_vrecord_from_json (graph_type& graph, const std::string& str,
350  vertex_parser_type vertex_parser) {
351 
352  typedef typename graph_type::local_graph_type local_graph_type;
353  local_graph_type& local_graph = graph.get_local_graph();
354 
355  JSONNode n = libjson::parse(str);
356  JSONNode::const_iterator i = n.begin();
357 
358  vertex_data_type vdata;
359  typename graph_type::vertex_record vrecord;
360 
361  while (i != n.end()) {
362  if (i->name() == "mirrors") {
363  JSONNode::const_iterator j = (*i).begin();
364  while (j != (*i).end()) {
365  int mirror = j->as_int();
366  vrecord._mirrors.set_bit((procid_t)mirror);
367  ++j;
368  }
369  } else if (i->name() == "inEdges") {
370  vrecord.num_in_edges = i->as_int();
371  } else if (i->name() == "outEdges") {
372  vrecord.num_out_edges = i->as_int();
373  } else if (i->name() == "gvid") {
374  // Check unsafe
375  vrecord.gvid = boost::lexical_cast<vertex_id_type>(i->as_int());
376  } else if (i->name() == "owner") {
377  vrecord.owner = (procid_t)i->as_int();
378  } else if (i->name() == "VertexData") {
379  if (!(i->type() == JSON_NULL))
380  vertex_parser(vdata, i->as_string());
381  } else {
382  logstream(LOG_ERROR) << "Error parsing json into vrecord. Unknown json node name:" <<
383  i->name() << std::endl;
384  }
385  ++i;
386  }
387 
388  if (graph.vid2lvid.find(vrecord.gvid) == graph.vid2lvid.end()) {
389  // Check if this a singlton node
390  // ignore for now
391  logstream(LOG_WARNING) << "Singleton node detected: gvid = " << vrecord.gvid << ". Ignored" << std::endl;
392  } else {
393  lvid_type lvid = graph.vid2lvid[vrecord.gvid];
394  graph.lvid2record[lvid] = vrecord;
395  local_graph.add_vertex(lvid, vdata);
396  if (vrecord.owner == graph.procid()) ++graph.local_own_nverts;
397  }
398 
399  return true;
400  }
401 
402 
403  /* Helper functions start here */
404  private:
405  /* Parse an json integer array and copy to a int vector */
406  static bool parse_vid_array (std::vector<vertex_id_type>& to, const JSONNode& n) {
407  if (n.type() != JSON_ARRAY) return false;
408  to.clear();
409  JSONNode::const_iterator i = n.begin();
410  while (i != n.end()) {
411  to.push_back(i->as_int());
412  ++i;
413  }
414  return true;
415  }
416 
417 
418  std::string zeropadding(const std::string& s, int width) {
419  ASSERT_LE(s.length(), width);
420  std::ostringstream ss;
421  ss << std::setw(width) << std::setfill('0') << s;
422  return ss.str();
423  }
424 
425 
426  const std::string graphfilename() {
427  procid_t pid = graph.procid();
428  std::string suffix = gzip ? ".gz" : "";
429  return "graph/graph"+tostr(pid)+"-r-"+zeropadding(tostr(pid), 5)+suffix;
430  // return "graph/graph"+tostr(pid)+"-r-00000"+suffix;
431  }
432 
433  const std::string vid2lvidfilename() {
434  procid_t pid = graph.procid();
435  std::string suffix = gzip ? ".gz" : "";
436  return "graph/vid2lvid"+tostr(pid)+"-r-"+zeropadding(tostr(pid), 5)+suffix;
437  // return "graph/vid2lvid"+tostr(pid)+"-r-00000"+suffix;
438  }
439 
440  const std::string edatafilename() {
441  procid_t pid = graph.procid();
442  std::string suffix = gzip ? ".gz" : "";
443  return "graph/edata"+tostr(pid)+"-r-"+zeropadding(tostr(pid),5)+suffix;
444  // return "graph/edata"+tostr(pid)+"-r-00000"+suffix;
445  }
446 
447  const std::string vrecordfilename() {
448  procid_t pid = graph.procid();
449  std::string suffix = gzip ? ".gz" : "";
450  return "vrecord/vdata"+tostr(pid)+"-r-"+zeropadding(tostr(pid),5)+suffix;
451  // return "vrecord/vdata"+tostr(pid)+"-r-00000"+ suffix;
452  }
453 
454  private:
455  graph_type& graph; // graph being populated; bound by reference in the constructor
456  std::string prefix; // directory holding the input files; "hdfs://" prefixes are read through hdfs
457  bool gzip; // when true, files carry a ".gz" suffix and are read through a gzip decompressor
458  edge_parser_type edge_parser; // applied to each element of "edataList"
459  vertex_parser_type vertex_parser; // applied to non-null "VertexData" nodes
460 }; // json_parser
461 
462 
463 } // namespace graphlab
464 #endif