23 #ifndef GRAPHLAB_HDFS_HPP
24 #define GRAPHLAB_HDFS_HPP
34 #include <boost/iostreams/stream.hpp>
37 #include <graphlab/logger/assertions.hpp>
// Character type of this device: plain char.
51 typedef char char_type;
// Base-class list and empty body of a tag type: it derives publicly from the
// boost::iostreams bidirectional_device_tag, multichar_tag and closable_tag.
// NOTE(review): the line that names this type is not visible in this view --
// presumably the device's `category`; confirm against the full header.
53 public boost::iostreams::bidirectional_device_tag,
54 public boost::iostreams::multichar_tag,
55 public boost::iostreams::closable_tag { };
62 hdfs_device() : filesystem(NULL), file(NULL) { }
// Opens `filename` on the filesystem held by `hdfs_fs`.
//   hdfs_fs  - connection whose `filesystem` handle is copied into this device;
//              the copied handle is asserted to be non-NULL.
//   filename - path handed to hdfsOpenFile.
//   write    - true opens with O_WRONLY, false (the default) with O_RDONLY.
// The handle returned by hdfsOpenFile is stored in `file`; the visible lines do
// not check it.
// NOTE(review): the closing brace of this constructor is not visible in this
// view.
63 hdfs_device(
const hdfs& hdfs_fs,
const std::string& filename,
64 const bool write =
false) :
65 filesystem(hdfs_fs.filesystem) {
66 ASSERT_TRUE(filesystem != NULL);
// Pick the open mode from the `write` flag.
68 const int flags = write? O_WRONLY : O_RDONLY;
// Buffer size, replication and block size are all passed as 0; the libhdfs
// API documents 0 as "use the configured default" for these arguments.
69 const int buffer_size = 0;
70 const short replication = 0;
71 const tSize block_size = 0;
72 file = hdfsOpenFile(filesystem, filename.c_str(), flags, buffer_size,
73 replication, block_size);
// Flushes (for output files) and closes the open file.
//   mode - not referenced in the visible lines.
// Returns immediately when `file` is NULL. The status codes returned by
// hdfsFlush and hdfsCloseFile are each asserted to be 0.
// NOTE(review): the closing braces are not visible in this view, so the exact
// nesting of the hdfsCloseFile call relative to the OUTPUT check cannot be
// confirmed from here.
77 void close(std::ios_base::openmode mode = std::ios_base::openmode() ) {
78 if(file == NULL)
return;
// Only a file whose type is OUTPUT is flushed.
79 if(file->type == OUTPUT) {
80 const int flush_error = hdfsFlush(filesystem, file);
81 ASSERT_EQ(flush_error, 0);
83 const int close_error = hdfsCloseFile(filesystem, file);
84 ASSERT_EQ(close_error, 0);
89 inline std::streamsize optimal_buffer_size()
const {
return 0; }
// Reads from the open file by forwarding to hdfsRead.
//   strm_ptr - destination buffer handed to hdfsRead.
//   n        - length handed to hdfsRead.
// Returns hdfsRead's result converted to std::streamsize.
// NOTE(review): the closing brace is not visible in this view.
91 std::streamsize read(
char* strm_ptr, std::streamsize n) {
92 return hdfsRead(filesystem, file, strm_ptr, n);
// Writes to the open file by forwarding to hdfsWrite.
//   strm_ptr - source buffer handed to hdfsWrite.
//   n        - length handed to hdfsWrite.
// Returns hdfsWrite's result converted to std::streamsize.
// NOTE(review): the closing brace is not visible in this view.
94 std::streamsize write(
const char* strm_ptr, std::streamsize n) {
95 return hdfsWrite(filesystem, file, strm_ptr, n);
97 bool good()
const {
return file != NULL; }
// Stream type for this wrapper: a boost::iostreams::stream over hdfs_device.
103 typedef boost::iostreams::stream<hdfs_device> fstream;
// Connects through hdfsConnect and stores the resulting handle in `filesystem`.
//   host - name passed to hdfsConnect; defaults to "default".
//   port - port passed to hdfsConnect; defaults to 0.
// The returned handle is asserted to be non-NULL.
// NOTE(review): the closing brace is not visible in this view.
109 hdfs(
const std::string& host =
"default", tPort port = 0) {
110 filesystem = hdfsConnect(host.c_str(), port);
111 ASSERT_TRUE(filesystem != NULL);
// Disconnects `filesystem` via hdfsDisconnect and keeps the status in `error`.
// NOTE(review): the enclosing function header is not visible in this view --
// presumably the hdfs destructor; confirm against the full header.
115 const int error = hdfsDisconnect(filesystem);
// Collects the names of the entries found under directory `path`.
//   path - directory handed to hdfsListDirectory.
// Builds a vector `files` with one string per listed entry, copied from each
// entry's mName field, then frees the listing.
// NOTE(review): the declaration of `num_files`, the return statement and the
// closing brace are not visible in this view.
119 inline std::vector<std::string> list_files(
const std::string& path) {
// hdfsListDirectory receives &num_files as an out-argument.
121 hdfsFileInfo* hdfs_file_list_ptr =
122 hdfsListDirectory(filesystem, path.c_str(), &num_files);
// Copy every entry name into an owned std::string.
124 std::vector<std::string> files(num_files);
125 for(
int i = 0; i < num_files; ++i)
126 files[i] = std::string(hdfs_file_list_ptr[i].mName);
// The names have been copied, so the listing can be released.
128 hdfsFreeFileInfo(hdfs_file_list_ptr, num_files);
// Availability query for HDFS support; this variant always answers true.
inline static bool has_hadoop() {
  const bool hadoop_available = true;
  return hadoop_available;
}
// Declaration only: static function returning a reference to an hdfs object.
// The definition is not part of this view.
134 static hdfs& get_hdfs();
// Device traits: the character type is char and the category is
// boost::iostreams::bidirectional_device_tag.
145 typedef char char_type;
146 typedef boost::iostreams::bidirectional_device_tag category;
// Constructor that opens nothing: the visible lines only stream the message
// "Libhdfs is not installed on this system." to logstream(LOG_FATAL).
//   hdfs_fs, filename, write - not referenced in the visible lines.
// NOTE(review): presumably the fallback used when libhdfs is unavailable; the
// end of the log statement and the closing brace are not visible in this view.
148 hdfs_device(
const hdfs& hdfs_fs,
const std::string& filename,
149 const bool write =
false) {
150 logstream(
LOG_FATAL) <<
"Libhdfs is not installed on this system."
// Read entry point that performs no I/O in the visible lines: it streams
// "Libhdfs is not installed on this system." to logstream(LOG_FATAL).
//   strm_ptr, n - not referenced in the visible lines.
// NOTE(review): the end of the log statement, any return statement and the
// closing brace are not visible in this view.
154 std::streamsize read(
char* strm_ptr, std::streamsize n) {
155 logstream(
LOG_FATAL) <<
"Libhdfs is not installed on this system."
// Write entry point that performs no I/O in the visible lines: it streams
// "Libhdfs is not installed on this system." to logstream(LOG_FATAL).
//   strm_ptr, n - not referenced in the visible lines.
// NOTE(review): the end of the log statement, any return statement and the
// closing brace are not visible in this view.
159 std::streamsize write(
const char* strm_ptr, std::streamsize n) {
160 logstream(
LOG_FATAL) <<
"Libhdfs is not installed on this system."
164 bool good()
const {
return false; }
// Stream type for this wrapper: a boost::iostreams::stream over hdfs_device.
170 typedef boost::iostreams::stream<hdfs_device> fstream;
// Constructor that makes no connection in the visible lines: it streams
// "Libhdfs is not installed on this system." to logstream(LOG_FATAL).
//   host - defaults to "default"; not referenced in the visible lines.
//   port - defaults to 0; not referenced in the visible lines.
// NOTE(review): the end of the log statement and the closing brace are not
// visible in this view.
176 hdfs(
const std::string& host =
"default",
int port = 0) {
177 logstream(
LOG_FATAL) <<
"Libhdfs is not installed on this system."
// Directory listing that performs no lookup: it streams
// "Libhdfs is not installed on this system." to logstream(LOG_FATAL) and
// returns an empty vector.
//   path - not referenced in the visible lines.
// NOTE(review): the end of the log statement and the closing brace are not
// visible in this view. The second ';' on the return line is an empty
// statement.
183 inline std::vector<std::string> list_files(
const std::string& path) {
184 logstream(
LOG_FATAL) <<
"Libhdfs is not installed on this system."
186 return std::vector<std::string>();;
// Availability query for HDFS support; this variant always answers false.
inline static bool has_hadoop() {
  const bool hadoop_available = false;
  return hadoop_available;
}
// Declaration only: static function returning a reference to an hdfs object.
// The definition is not part of this view.
192 static hdfs& get_hdfs();