24 #ifndef GRAPHLAB_MPI_TOOLS
25 #define GRAPHLAB_MPI_TOOLS
33 #include <boost/iostreams/device/array.hpp>
34 #include <boost/iostreams/stream.hpp>
36 #include <graphlab/serialization/serialization_includes.hpp>
37 #include <graphlab/util/charstream.hpp>
38 #include <graphlab/util/net_util.hpp>
45 #include <graphlab/macros_def.hpp>
/**
 * Initializes MPI through MPI_Init_thread, requesting the
 * MPI_THREAD_SINGLE threading level, and asserts that the call
 * succeeded and that the granted level is at least the requested one.
 *
 * \param argc argument count; its address is handed to MPI_Init_thread
 * \param argv argument vector; its address is handed to MPI_Init_thread
 *
 * NOTE(review): the declaration of `provided` and any preprocessor
 * guards separating the MPI path from the log-only path are not
 * visible in this excerpt -- confirm against the full file.
 */
55 inline void init(
int& argc,
char**& argv) {
// Threading level requested from MPI; compared against `provided` below.
57 const int required(MPI_THREAD_SINGLE);
59 int error = MPI_Init_thread(&argc, &argv, required, &provided);
60 assert(provided >= required);
61 assert(error == MPI_SUCCESS);
// NOTE(review): presumably the branch taken when the build has no MPI
// support; the guard that selects it is not visible here.
63 logstream(
LOG_EMPH) <<
"MPI Support was not compiled." << std::endl;
/**
 * Shuts MPI down by calling MPI_Finalize and asserts that the call
 * reported MPI_SUCCESS. Takes no arguments and returns nothing.
 */
67 inline void finalize() {
69 int error = MPI_Finalize();
70 assert(error == MPI_SUCCESS);
/**
 * Asks MPI_Initialized for the initialization state and asserts that
 * the query itself succeeded.
 *
 * NOTE(review): the declaration of `ret_value` and the return
 * statement are not visible in this excerpt; presumably the flag
 * written by MPI_Initialized is what gets returned -- confirm.
 */
75 inline bool initialized() {
78 int error = MPI_Initialized(&ret_value);
79 assert(error == MPI_SUCCESS);
/**
 * Returns the rank of the calling process within MPI_COMM_WORLD,
 * converted to size_t.
 *
 * NOTE(review): the return code of MPI_Comm_rank is not inspected;
 * only the resulting rank is asserted to be non-negative. The
 * declaration of `mpi_rank` is not visible in this excerpt.
 */
86 inline size_t rank() {
89 MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
// Guards the signed-to-unsigned conversion on the next line.
90 assert(mpi_rank >= 0);
91 return size_t(mpi_rank);
/**
 * Returns the number of processes in MPI_COMM_WORLD, converted to
 * size_t.
 *
 * NOTE(review): the return code of MPI_Comm_size is not inspected;
 * only the resulting count is asserted to be non-negative. The
 * declaration of `mpi_size` is not visible in this excerpt.
 */
97 inline size_t size() {
100 MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
// Guards the signed-to-unsigned conversion on the next line.
101 assert(mpi_size >= 0);
102 return size_t(mpi_size);
/**
 * Collects one element from every MPI process into `results`.
 *
 * Visible flow: resize `results` to one slot per process, exchange the
 * per-process byte counts with MPI_Allgather, convert the counts into
 * start offsets, exchange the bytes with MPI_Allgatherv into a single
 * buffer, and wrap that buffer in an input stream.
 *
 * \param elem    the local contribution (the step that writes it into
 *                `cstrm` is not visible in this excerpt -- TODO confirm)
 * \param results resized to size() entries; the body of the loop that
 *                fills it is not visible here
 */
111 void all_gather(
const T& elem, std::vector<T>& results) {
114 size_t mpi_size(size());
// One result slot per process.
115 if(results.size() != mpi_size) results.resize(mpi_size);
122 char* send_buffer = cstrm->c_str();
// The stream size is cast to int and then checked for a negative value.
123 int send_buffer_size = (int)cstrm->size();
124 assert(send_buffer_size >= 0);
// Slots start at -1 so the asserts after the exchange can detect
// entries that were never written.
127 std::vector<int> recv_sizes(mpi_size, -1);
129 int error = MPI_Allgather(&send_buffer_size,
136 assert(error == MPI_SUCCESS);
137 for(
size_t i = 0; i < recv_sizes.size(); ++i)
138 assert(recv_sizes[i] >= 0);
// Start from a copy of the sizes and rewrite it in place as an
// exclusive prefix sum: entry i becomes the start of block i and
// `sum` ends up as the total byte count.
142 std::vector<int> recv_offsets(recv_sizes);
143 int sum = 0, tmp = 0;
144 for(
size_t i = 0; i < recv_offsets.size(); ++i) {
145 tmp = recv_offsets[i]; recv_offsets[i] = sum; sum += tmp;
// Single buffer sized to hold the bytes of all processes.
149 std::vector<char> recv_buffer(sum);
152 error = MPI_Allgatherv(send_buffer,
161 assert(error == MPI_SUCCESS);
// Wrap the received bytes in a boost::iostreams array_source stream.
// NOTE(review): if `sum` is 0 the vector is empty and recv_buffer[0]
// indexes an empty vector.
163 namespace bio = boost::iostreams;
164 typedef bio::stream<bio::array_source> icharstream;
165 icharstream strm(&(recv_buffer[0]), recv_buffer.size());
167 for(
size_t i = 0; i < results.size(); ++i) {
// NOTE(review): presumably the branch for builds without MPI; the
// guard that selects it is not visible in this excerpt.
171 logstream(
LOG_FATAL) <<
"MPI not installed!" << std::endl;
/**
 * Exchanges one element with every MPI process: entry i of
 * `send_data` is serialized for process i, and `recv_data` is filled
 * with one deserialized element per process.
 *
 * Visible flow: serialize all outgoing elements into one stream while
 * recording the byte count of each, exchange the counts with
 * MPI_Alltoall, exchange the bytes with MPI_Alltoallv, then read the
 * incoming elements back from a stream over the receive buffer.
 *
 * \param send_data must contain exactly size() entries (checked with
 *                  ASSERT_EQ)
 * \param recv_data resized to size() entries and overwritten
 */
179 void all2all(
const std::vector<T>& send_data,
180 std::vector<T>& recv_data) {
183 size_t mpi_size(size());
184 ASSERT_EQ(send_data.size(), mpi_size);
185 if(recv_data.size() != mpi_size) recv_data.resize(mpi_size);
190 std::vector<int> send_buffer_sizes(mpi_size);
191 for(
size_t i = 0; i < mpi_size; ++i) {
// The byte count of element i is the growth of the shared stream
// caused by writing it: size after minus size before.
192 const size_t OLD_SIZE(cstrm->size());
193 oarc << send_data[i];
195 const size_t ELEM_SIZE(cstrm->size() - OLD_SIZE);
// NOTE(review): size_t is narrowed to int here without a range check.
196 send_buffer_sizes[i] = ELEM_SIZE;
199 char* send_buffer = cstrm->c_str();
// Copy of the sizes that the loop rewrites in place into start
// offsets (the accumulation into total_send is not visible in this
// excerpt).
200 std::vector<int> send_offsets(send_buffer_sizes);
202 for(
size_t i = 0; i < send_offsets.size(); ++i) {
203 const int tmp = send_offsets[i];
204 send_offsets[i] = total_send;
// First exchange: the outgoing byte counts go out and the incoming
// byte counts land in recv_buffer_sizes.
209 std::vector<int> recv_buffer_sizes(mpi_size);
210 int error = MPI_Alltoall(&(send_buffer_sizes[0]),
213 &(recv_buffer_sizes[0]),
217 ASSERT_EQ(error, MPI_SUCCESS);
// Same size-to-offset rewrite for the receive side (the accumulation
// into total_recv is not visible in this excerpt).
220 std::vector<int> recv_offsets(recv_buffer_sizes);
222 for(
size_t i = 0; i < recv_offsets.size(); ++i){
223 const int tmp = recv_offsets[i];
224 recv_offsets[i] = total_recv;
// Second exchange: the serialized bytes themselves.
228 std::vector<char> recv_buffer(total_recv);
229 error = MPI_Alltoallv(send_buffer,
230 &(send_buffer_sizes[0]),
234 &(recv_buffer_sizes[0]),
238 ASSERT_EQ(error, MPI_SUCCESS);
// Wrap the received bytes in a boost::iostreams array_source stream.
// NOTE(review): if total_recv is 0 the vector is empty and
// recv_buffer[0] indexes an empty vector.
241 namespace bio = boost::iostreams;
242 typedef bio::stream<bio::array_source> icharstream;
243 icharstream strm(&(recv_buffer[0]), recv_buffer.size());
// Elements are read back in index order from the input archive.
245 for(
size_t i = 0; i < recv_data.size(); ++i) {
246 iarc >> recv_data[i];
// NOTE(review): presumably the branch for builds without MPI; the
// guard that selects it is not visible in this excerpt.
249 logstream(
LOG_FATAL) <<
"MPI not installed!" << std::endl;
/**
 * Contributing side of a gather: takes a root rank and a local
 * element but no output container, and issues an MPI_Gather of the
 * byte count followed by an MPI_Gatherv of the bytes.
 *
 * \param root rank that collects the data; asserted to be smaller
 *             than the largest int
 * \param elem the local contribution (the step that writes it into
 *             `cstrm` is not visible in this excerpt -- TODO confirm)
 */
263 void gather(
size_t root,
const T& elem) {
// NOTE(review): presumably required because MPI takes the root rank
// as an int; the call arguments are not visible here.
266 assert(root <
size_t(std::numeric_limits<int>::max()));
274 char* send_buffer = cstrm->c_str();
// Stream size stored as int and then checked for a negative value.
275 int send_buffer_size = cstrm->size();
276 assert(send_buffer_size >= 0);
// Step 1: contribute the byte count.
280 int error = MPI_Gather(&send_buffer_size,
288 assert(error == MPI_SUCCESS);
// Step 2: contribute the bytes.
292 error = MPI_Gatherv(send_buffer,
302 assert(error == MPI_SUCCESS);
// NOTE(review): presumably the branch for builds without MPI; the
// guard that selects it is not visible in this excerpt.
304 logstream(
LOG_FATAL) <<
"MPI not installed!" << std::endl;
/**
 * Collecting side of a gather: contributes the local element and
 * receives the elements of all processes into `results`.
 *
 * Visible flow: resize `results` to one slot per process, gather the
 * per-process byte counts with MPI_Gather, convert the counts into
 * start offsets, gather the bytes with MPI_Gatherv into a single
 * buffer, and wrap that buffer in an input stream.
 *
 * \param elem    the local contribution (the step that writes it into
 *                `cstrm` is not visible in this excerpt -- TODO confirm)
 * \param results resized to size() entries; the body of the loop that
 *                fills it is not visible here
 *
 * NOTE(review): presumably the calling process acts as the root of
 * the gather, using `mpi_rank`; the call arguments are not visible.
 */
317 void gather(
const T& elem, std::vector<T>& results) {
320 size_t mpi_size(size());
// rank() returns size_t; it is stored as an int here.
321 int mpi_rank(rank());
322 if(results.size() != mpi_size) results.resize(mpi_size);
329 char* send_buffer = cstrm->c_str();
330 int send_buffer_size = cstrm->size();
331 assert(send_buffer_size >= 0);
// Slots start at -1 so the asserts after the exchange can detect
// entries that were never written.
334 std::vector<int> recv_sizes(mpi_size, -1);
336 int error = MPI_Gather(&send_buffer_size,
344 assert(error == MPI_SUCCESS);
345 for(
size_t i = 0; i < recv_sizes.size(); ++i)
346 assert(recv_sizes[i] >= 0);
// Start from a copy of the sizes and rewrite it in place as an
// exclusive prefix sum: entry i becomes the start of block i and
// `sum` ends up as the total byte count.
350 std::vector<int> recv_offsets(recv_sizes);
351 int sum = 0, tmp = 0;
352 for(
size_t i = 0; i < recv_offsets.size(); ++i) {
353 tmp = recv_offsets[i]; recv_offsets[i] = sum; sum += tmp;
// Single buffer sized to hold the bytes of all processes.
357 std::vector<char> recv_buffer(sum);
360 error = MPI_Gatherv(send_buffer,
370 assert(error == MPI_SUCCESS);
// Wrap the received bytes in a boost::iostreams array_source stream.
// NOTE(review): if `sum` is 0 the vector is empty and recv_buffer[0]
// indexes an empty vector.
372 namespace bio = boost::iostreams;
373 typedef bio::stream<bio::array_source> icharstream;
374 icharstream strm(&(recv_buffer[0]), recv_buffer.size());
376 for(
size_t i = 0; i < results.size(); ++i) {
// NOTE(review): presumably the branch for builds without MPI; the
// guard that selects it is not visible in this excerpt.
380 logstream(
LOG_FATAL) <<
"MPI not installed!" << std::endl;
/**
 * Broadcasts `elem` from the process whose rank equals `root`.
 *
 * Visible flow: the root path broadcasts the byte count and then the
 * bytes with two MPI_Bcast calls; the other path receives the byte
 * count, allocates a buffer of that size, receives the bytes, and
 * wraps them in an input stream.
 *
 * \param root rank of the sending process, compared with
 *             mpi_tools::rank()
 * \param elem value to send on the root (the step that writes it into
 *             `cstrm` is not visible in this excerpt); presumably
 *             overwritten on the other processes -- TODO confirm, the
 *             read from the stream is not visible here
 */
390 void bcast(
const size_t& root, T& elem) {
// Sending path.
393 if(mpi_tools::rank() == root) {
399 char* send_buffer = cstrm->c_str();
400 int send_buffer_size = cstrm->size();
401 assert(send_buffer_size >= 0);
// The byte count goes out first so receivers can size their buffer.
404 int error = MPI_Bcast(&send_buffer_size,
409 assert(error == MPI_SUCCESS);
412 error = MPI_Bcast(send_buffer,
417 assert(error == MPI_SUCCESS);
// Receiving path (`error` is declared again, so this is a separate
// scope; presumably the else branch, whose opening is not visible).
// -1 is a sentinel checked by the assert after the first broadcast.
420 int recv_buffer_size(-1);
422 int error = MPI_Bcast(&recv_buffer_size,
427 assert(error == MPI_SUCCESS);
428 assert(recv_buffer_size >= 0);
// NOTE(review): a byte count of 0 passes the assert above, after
// which recv_buffer[0] indexes an empty vector.
430 std::vector<char> recv_buffer(recv_buffer_size);
431 error = MPI_Bcast(&(recv_buffer[0]),
436 assert(error == MPI_SUCCESS);
// Wrap the received bytes in a boost::iostreams array_source stream.
438 namespace bio = boost::iostreams;
439 typedef bio::stream<bio::array_source> icharstream;
440 icharstream strm(&(recv_buffer[0]), recv_buffer.size());
// NOTE(review): presumably the branch for builds without MPI; the
// guard that selects it is not visible in this excerpt.
446 logstream(
LOG_FATAL) <<
"MPI not installed!" << std::endl;
/**
 * Sends `elem` with two MPI_Send calls: first the byte count, then
 * the bytes.
 *
 * \param elem value to send (the step that writes it into `cstrm` is
 *             not visible in this excerpt -- TODO confirm)
 * \param id   presumably the rank of the destination process; the
 *             call arguments are not visible here -- TODO confirm
 * \param tag  presumably the MPI message tag; defaults to 0
 */
453 void send(
const T& elem,
const size_t id,
const int tag = 0) {
462 char* send_buffer = cstrm->c_str();
// Stream size stored as int and then checked for a negative value.
463 int send_buffer_size = cstrm->size();
464 assert(send_buffer_size >= 0);
// Message 1: the byte count.
468 int error = MPI_Send(&send_buffer_size,
474 assert(error == MPI_SUCCESS);
// Message 2: the bytes.
477 error = MPI_Send(send_buffer,
483 assert(error == MPI_SUCCESS);
// NOTE(review): presumably the branch for builds without MPI; the
// guard that selects it is not visible in this excerpt.
485 logstream(
LOG_FATAL) <<
"MPI not installed!" << std::endl;
/**
 * Receives a value with two MPI_Recv calls: first the byte count,
 * then the bytes into a buffer of that size, which is wrapped in an
 * input stream.
 *
 * \param elem presumably overwritten with the received value; the
 *             read from the stream is not visible in this excerpt --
 *             TODO confirm
 * \param id   presumably the rank of the sending process; the call
 *             arguments are not visible here -- TODO confirm
 * \param tag  presumably the MPI message tag; defaults to 0
 */
492 void recv(T& elem,
const size_t id,
const int tag = 0) {
// -1 is a sentinel checked by the assert after the first receive.
497 int recv_buffer_size(-1);
501 int error = MPI_Recv(&recv_buffer_size,
508 assert(error == MPI_SUCCESS);
// NOTE(review): this requires a strictly positive size, while send()
// only asserts that its size is non-negative; a zero-byte message
// accepted by send() would fail here.
509 assert(recv_buffer_size > 0);
511 std::vector<char> recv_buffer(recv_buffer_size);
513 error = MPI_Recv(&(recv_buffer[0]),
520 assert(error == MPI_SUCCESS);
// Wrap the received bytes in a boost::iostreams array_source stream.
523 namespace bio = boost::iostreams;
524 typedef bio::stream<bio::array_source> icharstream;
525 icharstream strm(&(recv_buffer[0]), recv_buffer.size());
// NOTE(review): presumably the branch for builds without MPI; the
// guard that selects it is not visible in this excerpt.
529 logstream(
LOG_FATAL) <<
"MPI not installed!" << std::endl;
/**
 * Declaration only; the definition is not in this excerpt.
 *
 * \param master_ranks output set of ranks written by the function.
 *        NOTE(review): presumably one representative rank per
 *        machine -- confirm against the definition.
 */
536 void get_master_ranks(std::set<size_t>& master_ranks);
544 #include <graphlab/macros_undef.hpp>