GraphLab: Distributed Graph-Parallel API  2.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
hdfs.hpp
1 /**
2  * Copyright (c) 2009 Carnegie Mellon University.
3  * All rights reserved.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing,
12  * software distributed under the License is distributed on an "AS
13  * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14  * express or implied. See the License for the specific language
15  * governing permissions and limitations under the License.
16  *
17  * For more about this software visit:
18  *
19  * http://www.graphlab.ml.cmu.edu
20  *
21  */
22 
23 #ifndef GRAPHLAB_HDFS_HPP
24 #define GRAPHLAB_HDFS_HPP
25 
26 // Requires the hdfs library
27 #ifdef HAS_HADOOP
28 extern "C" {
29  #include <hdfs.h>
30 }
31 #endif
32 
33 #include <vector>
34 #include <boost/iostreams/stream.hpp>
35 
36 
37 #include <graphlab/logger/assertions.hpp>
38 
39 
40 namespace graphlab {
41 
42 #ifdef HAS_HADOOP
43  class hdfs {
44  private:
45  /** the primary filesystem object */
46  hdfsFS filesystem;
47  public:
48  /** hdfs file source is used to construct boost iostreams */
49  class hdfs_device {
50  public: // boost iostream concepts
51  typedef char char_type;
52  struct category :
53  public boost::iostreams::bidirectional_device_tag,
54  public boost::iostreams::multichar_tag,
55  public boost::iostreams::closable_tag { };
56  private:
57  hdfsFS filesystem;
58 
59  hdfsFile file;
60 
61  public:
62  hdfs_device() : filesystem(NULL), file(NULL) { }
63  hdfs_device(const hdfs& hdfs_fs, const std::string& filename,
64  const bool write = false) :
65  filesystem(hdfs_fs.filesystem) {
66  ASSERT_TRUE(filesystem != NULL);
67  // open the file
68  const int flags = write? O_WRONLY : O_RDONLY;
69  const int buffer_size = 0; // use default
70  const short replication = 0; // use default
71  const tSize block_size = 0; // use default;
72  file = hdfsOpenFile(filesystem, filename.c_str(), flags, buffer_size,
73  replication, block_size);
74  }
75  // ~hdfs_device() { if(file != NULL) close(); }
76 
77  void close(std::ios_base::openmode mode = std::ios_base::openmode() ) {
78  if(file == NULL) return;
79  if(file->type == OUTPUT) {
80  const int flush_error = hdfsFlush(filesystem, file);
81  ASSERT_EQ(flush_error, 0);
82  }
83  const int close_error = hdfsCloseFile(filesystem, file);
84  ASSERT_EQ(close_error, 0);
85  file = NULL;
86  }
87 
88  /** the optimal buffer size is 0. */
89  inline std::streamsize optimal_buffer_size() const { return 0; }
90 
91  std::streamsize read(char* strm_ptr, std::streamsize n) {
92  return hdfsRead(filesystem, file, strm_ptr, n);
93  } // end of read
94  std::streamsize write(const char* strm_ptr, std::streamsize n) {
95  return hdfsWrite(filesystem, file, strm_ptr, n);
96  }
97  bool good() const { return file != NULL; }
98  }; // end of hdfs device
99 
100  /**
101  * The basic file type has constructor matching the hdfs device.
102  */
103  typedef boost::iostreams::stream<hdfs_device> fstream;
104 
105  /**
106  * Open a connection to the filesystem. The default arguments
107  * should be sufficient for most uses
108  */
109  hdfs(const std::string& host = "default", tPort port = 0) {
110  filesystem = hdfsConnect(host.c_str(), port);
111  ASSERT_TRUE(filesystem != NULL);
112  } // end of constructor
113 
114  ~hdfs() {
115  const int error = hdfsDisconnect(filesystem);
116  ASSERT_EQ(error, 0);
117  } // end of ~hdfs
118 
119  inline std::vector<std::string> list_files(const std::string& path) {
120  int num_files = 0;
121  hdfsFileInfo* hdfs_file_list_ptr =
122  hdfsListDirectory(filesystem, path.c_str(), &num_files);
123  // copy the file list to the string array
124  std::vector<std::string> files(num_files);
125  for(int i = 0; i < num_files; ++i)
126  files[i] = std::string(hdfs_file_list_ptr[i].mName);
127  // free the file list pointer
128  hdfsFreeFileInfo(hdfs_file_list_ptr, num_files);
129  return files;
130  } // end of list_files
131 
132  inline static bool has_hadoop() { return true; }
133 
134  static hdfs& get_hdfs();
135  }; // end of class hdfs
136 #else
137 
138 
139 
140  class hdfs {
141  public:
142  /** hdfs file source is used to construct boost iostreams */
143  class hdfs_device {
144  public: // boost iostream concepts
145  typedef char char_type;
146  typedef boost::iostreams::bidirectional_device_tag category;
147  public:
148  hdfs_device(const hdfs& hdfs_fs, const std::string& filename,
149  const bool write = false) {
150  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
151  << std::endl;
152  }
153  void close() { }
154  std::streamsize read(char* strm_ptr, std::streamsize n) {
155  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
156  << std::endl;
157  return 0;
158  } // end of read
159  std::streamsize write(const char* strm_ptr, std::streamsize n) {
160  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
161  << std::endl;
162  return 0;
163  }
164  bool good() const { return false; }
165  }; // end of hdfs device
166 
167  /**
168  * The basic file type has constructor matching the hdfs device.
169  */
170  typedef boost::iostreams::stream<hdfs_device> fstream;
171 
172  /**
173  * Open a connection to the filesystem. The default arguments
174  * should be sufficient for most uses
175  */
176  hdfs(const std::string& host = "default", int port = 0) {
177  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
178  << std::endl;
179  } // end of constructor
180 
181 
182 
183  inline std::vector<std::string> list_files(const std::string& path) {
184  logstream(LOG_FATAL) << "Libhdfs is not installed on this system."
185  << std::endl;
186  return std::vector<std::string>();;
187  } // end of list_files
188 
189  // No hadoop available
190  inline static bool has_hadoop() { return false; }
191 
192  static hdfs& get_hdfs();
193  }; // end of class hdfs
194 
195 
196 #endif
197 
198 }; // end of namespace graphlab
199 #endif