GraphLab: Distributed Graph-Parallel API  2.1
mpi_tools.hpp
/**
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */

#ifndef GRAPHLAB_MPI_TOOLS
#define GRAPHLAB_MPI_TOOLS

#ifdef HAS_MPI
#include <mpi.h>
#endif

#include <cassert>
#include <limits>
#include <set>
#include <vector>

#include <boost/iostreams/device/array.hpp>
#include <boost/iostreams/stream.hpp>

#include <graphlab/serialization/serialization_includes.hpp>
#include <graphlab/util/charstream.hpp>
#include <graphlab/util/net_util.hpp>

#include <graphlab/macros_def.hpp>

namespace graphlab {
  namespace mpi_tools {

    /**
     * Initialize MPI. This must be called before any other MPI
     * routine; it also strips the MPI-specific arguments from the
     * command line.
     */
    inline void init(int& argc, char**& argv) {
#ifdef HAS_MPI
      const int required(MPI_THREAD_SINGLE);
      int provided(-1);
      int error = MPI_Init_thread(&argc, &argv, required, &provided);
      assert(provided >= required);
      assert(error == MPI_SUCCESS);
#else
      logstream(LOG_EMPH) << "MPI Support was not compiled." << std::endl;
#endif
    } // end of init

    /**
     * Shut down MPI. This should be the last MPI call made by the
     * program.
     */
    inline void finalize() {
#ifdef HAS_MPI
      int error = MPI_Finalize();
      assert(error == MPI_SUCCESS);
#endif
    } // end of finalize

    /**
     * Returns true if MPI has been initialized (always false when
     * MPI support was not compiled in).
     */
    inline bool initialized() {
#ifdef HAS_MPI
      int ret_value = 0;
      int error = MPI_Initialized(&ret_value);
      assert(error == MPI_SUCCESS);
      return ret_value != 0;
#else
      return false;
#endif
    } // end of initialized

    /**
     * Returns the rank of this process in MPI_COMM_WORLD (0 when MPI
     * support was not compiled in).
     */
    inline size_t rank() {
#ifdef HAS_MPI
      int mpi_rank(-1);
      MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
      assert(mpi_rank >= 0);
      return size_t(mpi_rank);
#else
      return 0;
#endif
    } // end of rank

    /**
     * Returns the number of processes in MPI_COMM_WORLD (1 when MPI
     * support was not compiled in).
     */
    inline size_t size() {
#ifdef HAS_MPI
      int mpi_size(-1);
      MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
      assert(mpi_size >= 0);
      return size_t(mpi_size);
#else
      return 1;
#endif
    } // end of size
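
    // Illustrative usage sketch (not part of the original header), assuming
    // the header is included as <graphlab/util/mpi_tools.hpp> like the
    // neighboring util headers.  A minimal program built on the helpers
    // above:
    //
    //   #include <iostream>
    //   #include <graphlab/util/mpi_tools.hpp>
    //
    //   int main(int argc, char** argv) {
    //     graphlab::mpi_tools::init(argc, argv);   // strips MPI arguments
    //     std::cout << "process " << graphlab::mpi_tools::rank()
    //               << " of " << graphlab::mpi_tools::size() << std::endl;
    //     graphlab::mpi_tools::finalize();         // last MPI call
    //     return 0;
    //   }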

    /**
     * Gathers one serializable element from every process and places
     * the results, indexed by rank, in the results vector on every
     * process.
     */
    template<typename T>
    void all_gather(const T& elem, std::vector<T>& results) {
#ifdef HAS_MPI
      // Get the mpi size
      size_t mpi_size(size());
      if(results.size() != mpi_size) results.resize(mpi_size);

      // Serialize the local element
      graphlab::charstream cstrm(128);
      graphlab::oarchive oarc(cstrm);
      oarc << elem;
      cstrm.flush();
      char* send_buffer = cstrm->c_str();
      int send_buffer_size = (int)cstrm->size();
      assert(send_buffer_size >= 0);

      // Exchange the buffer sizes
      std::vector<int> recv_sizes(mpi_size, -1);
      int error = MPI_Allgather(&send_buffer_size, // send buffer
                                1,                 // send count
                                MPI_INT,           // send type
                                &(recv_sizes[0]),  // recv buffer
                                1,                 // recv count
                                MPI_INT,           // recv type
                                MPI_COMM_WORLD);
      assert(error == MPI_SUCCESS);
      for(size_t i = 0; i < recv_sizes.size(); ++i)
        assert(recv_sizes[i] >= 0);

      // Construct offsets
      std::vector<int> recv_offsets(recv_sizes);
      int sum = 0, tmp = 0;
      for(size_t i = 0; i < recv_offsets.size(); ++i) {
        tmp = recv_offsets[i]; recv_offsets[i] = sum; sum += tmp;
      }

      // Allocate the receive buffer
      std::vector<char> recv_buffer(sum);

      // Gather the serialized elements from all processes
      error = MPI_Allgatherv(send_buffer,         // send buffer
                             send_buffer_size,    // how much to send
                             MPI_BYTE,            // send type
                             &(recv_buffer[0]),   // recv buffer
                             &(recv_sizes[0]),    // amount to recv
                                                  // from each process
                             &(recv_offsets[0]),  // where to place data
                             MPI_BYTE,
                             MPI_COMM_WORLD);
      assert(error == MPI_SUCCESS);
      // Deserialize one element per process
      namespace bio = boost::iostreams;
      typedef bio::stream<bio::array_source> icharstream;
      icharstream strm(&(recv_buffer[0]), recv_buffer.size());
      graphlab::iarchive iarc(strm);
      for(size_t i = 0; i < results.size(); ++i) {
        iarc >> results[i];
      }
#else
      logstream(LOG_FATAL) << "MPI not installed!" << std::endl;
#endif
    } // end of mpi all gather
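
    // Illustrative usage sketch (not part of the original header): every
    // process contributes one value and receives the full, rank-indexed
    // vector.  The payload below is hypothetical.
    //
    //   const size_t my_value = mpi_tools::rank() * 10;   // hypothetical payload
    //   std::vector<size_t> all_values;
    //   mpi_tools::all_gather(my_value, all_values);
    //   // all_values[r] now holds the value contributed by rank r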


    /**
     * Each process provides one serializable element destined for
     * every other process (send_data[i] goes to process i).  On
     * return, recv_data[i] holds the element sent by process i.
     */
    template<typename T>
    void all2all(const std::vector<T>& send_data,
                 std::vector<T>& recv_data) {
#ifdef HAS_MPI
      // Get the mpi size
      size_t mpi_size(size());
      ASSERT_EQ(send_data.size(), mpi_size);
      if(recv_data.size() != mpi_size) recv_data.resize(mpi_size);

      // Serialize the output data and compute buffer sizes
      graphlab::charstream cstrm(128);
      graphlab::oarchive oarc(cstrm);
      std::vector<int> send_buffer_sizes(mpi_size);
      for(size_t i = 0; i < mpi_size; ++i) {
        const size_t OLD_SIZE(cstrm->size());
        oarc << send_data[i];
        cstrm.flush();
        const size_t ELEM_SIZE(cstrm->size() - OLD_SIZE);
        send_buffer_sizes[i] = ELEM_SIZE;
      }
      cstrm.flush();
      char* send_buffer = cstrm->c_str();
      std::vector<int> send_offsets(send_buffer_sizes);
      int total_send = 0;
      for(size_t i = 0; i < send_offsets.size(); ++i) {
        const int tmp = send_offsets[i];
        send_offsets[i] = total_send;
        total_send += tmp;
      }

      // All-to-all exchange of the buffer sizes
      std::vector<int> recv_buffer_sizes(mpi_size);
      int error = MPI_Alltoall(&(send_buffer_sizes[0]),
                               1,
                               MPI_INT,
                               &(recv_buffer_sizes[0]),
                               1,
                               MPI_INT,
                               MPI_COMM_WORLD);
      ASSERT_EQ(error, MPI_SUCCESS);

      // Construct offsets
      std::vector<int> recv_offsets(recv_buffer_sizes);
      int total_recv = 0;
      for(size_t i = 0; i < recv_offsets.size(); ++i) {
        const int tmp = recv_offsets[i];
        recv_offsets[i] = total_recv;
        total_recv += tmp;
      }
      // Do the actual data exchange
      std::vector<char> recv_buffer(total_recv);
      error = MPI_Alltoallv(send_buffer,
                            &(send_buffer_sizes[0]),
                            &(send_offsets[0]),
                            MPI_BYTE,
                            &(recv_buffer[0]),
                            &(recv_buffer_sizes[0]),
                            &(recv_offsets[0]),
                            MPI_BYTE,
                            MPI_COMM_WORLD);
      ASSERT_EQ(error, MPI_SUCCESS);

      // Deserialize the result
      namespace bio = boost::iostreams;
      typedef bio::stream<bio::array_source> icharstream;
      icharstream strm(&(recv_buffer[0]), recv_buffer.size());
      graphlab::iarchive iarc(strm);
      for(size_t i = 0; i < recv_data.size(); ++i) {
        iarc >> recv_data[i];
      }
#else
      logstream(LOG_FATAL) << "MPI not installed!" << std::endl;
#endif
    } // end of mpi all to all
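
    // Illustrative usage sketch (not part of the original header): each
    // process builds one message per destination and all2all delivers them.
    // The payload values below are hypothetical.
    //
    //   const size_t nprocs = mpi_tools::size();
    //   std::vector<int> outgoing(nprocs), incoming;
    //   for(size_t i = 0; i < nprocs; ++i)
    //     outgoing[i] = int(mpi_tools::rank() * 100 + i);  // hypothetical payload
    //   mpi_tools::all2all(outgoing, incoming);
    //   // incoming[i] now holds the value that process i addressed to us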



    /**
     * Called on every process except the root: sends elem to the
     * root.  Must be matched by a call to
     * gather(const T& elem, std::vector<T>& results) on the root.
     */
    template<typename T>
    void gather(size_t root, const T& elem) {
#ifdef HAS_MPI
      // Check and convert the root rank
      assert(root < size_t(std::numeric_limits<int>::max()));
      int mpi_root(root);

      // Serialize the local element
      graphlab::charstream cstrm(128);
      graphlab::oarchive oarc(cstrm);
      oarc << elem;
      cstrm.flush();
      char* send_buffer = cstrm->c_str();
      int send_buffer_size = cstrm->size();
      assert(send_buffer_size >= 0);

      // Send the buffer size to the root (non-root processes do not
      // receive anything, so the recv arguments are NULL)
      int error = MPI_Gather(&send_buffer_size, // send buffer
                             1,                 // send count
                             MPI_INT,           // send type
                             NULL,              // recv buffer
                             1,                 // recv count
                             MPI_INT,           // recv type
                             mpi_root,          // root rank
                             MPI_COMM_WORLD);
      assert(error == MPI_SUCCESS);

      // Send the serialized element to the root
      error = MPI_Gatherv(send_buffer,       // send buffer
                          send_buffer_size,  // how much to send
                          MPI_BYTE,          // send type
                          NULL,              // recv buffer
                          NULL,              // amount to recv
                                             // from each process
                          NULL,              // where to place data
                          MPI_BYTE,
                          mpi_root,          // root rank
                          MPI_COMM_WORLD);
      assert(error == MPI_SUCCESS);
#else
      logstream(LOG_FATAL) << "MPI not installed!" << std::endl;
#endif
    } // end of gather



    /**
     * Called on the root: gathers one element from every process
     * (including the root) into results, indexed by rank.  Must be
     * matched by a call to gather(root, elem) on all other processes.
     */
    template<typename T>
    void gather(const T& elem, std::vector<T>& results) {
#ifdef HAS_MPI
      // Get the mpi rank and size
      size_t mpi_size(size());
      int mpi_rank(rank());
      if(results.size() != mpi_size) results.resize(mpi_size);

      // Serialize the local element
      graphlab::charstream cstrm(128);
      graphlab::oarchive oarc(cstrm);
      oarc << elem;
      cstrm.flush();
      char* send_buffer = cstrm->c_str();
      int send_buffer_size = cstrm->size();
      assert(send_buffer_size >= 0);

      // Gather the buffer sizes
      std::vector<int> recv_sizes(mpi_size, -1);
      int error = MPI_Gather(&send_buffer_size, // send buffer
                             1,                 // send count
                             MPI_INT,           // send type
                             &(recv_sizes[0]),  // recv buffer
                             1,                 // recv count
                             MPI_INT,           // recv type
                             mpi_rank,          // root rank (this process)
                             MPI_COMM_WORLD);
      assert(error == MPI_SUCCESS);
      for(size_t i = 0; i < recv_sizes.size(); ++i)
        assert(recv_sizes[i] >= 0);

      // Construct offsets
      std::vector<int> recv_offsets(recv_sizes);
      int sum = 0, tmp = 0;
      for(size_t i = 0; i < recv_offsets.size(); ++i) {
        tmp = recv_offsets[i]; recv_offsets[i] = sum; sum += tmp;
      }

      // Allocate the receive buffer
      std::vector<char> recv_buffer(sum);

      // Gather the serialized elements from all processes
      error = MPI_Gatherv(send_buffer,         // send buffer
                          send_buffer_size,    // how much to send
                          MPI_BYTE,            // send type
                          &(recv_buffer[0]),   // recv buffer
                          &(recv_sizes[0]),    // amount to recv
                                               // from each process
                          &(recv_offsets[0]),  // where to place data
                          MPI_BYTE,
                          mpi_rank,            // root rank (this process)
                          MPI_COMM_WORLD);
      assert(error == MPI_SUCCESS);
      // Deserialize one element per process
      namespace bio = boost::iostreams;
      typedef bio::stream<bio::array_source> icharstream;
      icharstream strm(&(recv_buffer[0]), recv_buffer.size());
      graphlab::iarchive iarc(strm);
      for(size_t i = 0; i < results.size(); ++i) {
        iarc >> results[i];
      }
#else
      logstream(LOG_FATAL) << "MPI not installed!" << std::endl;
#endif
    } // end of gather
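
    // Illustrative usage sketch (not part of the original header): the two
    // gather overloads are paired, one side on the root and the other on
    // every remaining process.  Rank 0 as root and compute_local_stat()
    // are hypothetical choices.
    //
    //   const size_t root = 0;
    //   const double local_stat = compute_local_stat();  // hypothetical helper
    //   if(mpi_tools::rank() == root) {
    //     std::vector<double> stats;
    //     mpi_tools::gather(local_stat, stats);   // root side: fills stats
    //   } else {
    //     mpi_tools::gather(root, local_stat);    // non-root side: sends only
    //   }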


    /**
     * Broadcasts elem from the root process to every other process.
     * Must be called on all processes: on the root, elem is the value
     * to send; on the others, elem is overwritten with the received
     * value.
     */
    template<typename T>
    void bcast(const size_t& root, T& elem) {
#ifdef HAS_MPI
      if(mpi_tools::rank() == root) {
        // serialize the object
        graphlab::charstream cstrm(128);
        graphlab::oarchive oarc(cstrm);
        oarc << elem;
        cstrm.flush();
        char* send_buffer = cstrm->c_str();
        int send_buffer_size = cstrm->size();
        assert(send_buffer_size >= 0);

        // send the amount of data to expect
        int error = MPI_Bcast(&send_buffer_size, // send buffer
                              1,                 // send count
                              MPI_INT,           // send type
                              root,              // root rank
                              MPI_COMM_WORLD);
        assert(error == MPI_SUCCESS);

        // send the actual data
        error = MPI_Bcast(send_buffer,       // send buffer
                          send_buffer_size,  // send count
                          MPI_BYTE,          // send type
                          root,              // root rank
                          MPI_COMM_WORLD);
        assert(error == MPI_SUCCESS);

      } else {
        int recv_buffer_size(-1);
        // recv the required buffer size
        int error = MPI_Bcast(&recv_buffer_size, // recv buffer
                              1,                 // recv count
                              MPI_INT,           // recv type
                              root,              // root rank
                              MPI_COMM_WORLD);
        assert(error == MPI_SUCCESS);
        assert(recv_buffer_size >= 0);

        std::vector<char> recv_buffer(recv_buffer_size);
        error = MPI_Bcast(&(recv_buffer[0]), // recv buffer
                          recv_buffer_size,  // recv count
                          MPI_BYTE,          // recv type
                          root,              // root rank
                          MPI_COMM_WORLD);
        assert(error == MPI_SUCCESS);
        // construct the local element
        namespace bio = boost::iostreams;
        typedef bio::stream<bio::array_source> icharstream;
        icharstream strm(&(recv_buffer[0]), recv_buffer.size());
        graphlab::iarchive iarc(strm);
        iarc >> elem;

      }
#else
      logstream(LOG_FATAL) << "MPI not installed!" << std::endl;
#endif
    } // end of bcast
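
    // Illustrative usage sketch (not part of the original header): every
    // process calls bcast with the same root; only the root's value
    // survives.  The options_map type and parse_command_line() helper are
    // hypothetical.
    //
    //   options_map config;
    //   if(mpi_tools::rank() == 0) config = parse_command_line(argc, argv);
    //   mpi_tools::bcast(0, config);  // now every process holds the root's config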


    /**
     * Blocking point-to-point send of a serializable object to
     * process id.
     */
    template<typename T>
    void send(const T& elem, const size_t id, const int tag = 0) {
#ifdef HAS_MPI
      // Validate the destination rank
      assert(id < size());
      // Serialize the local element
      graphlab::charstream cstrm(128);
      graphlab::oarchive oarc(cstrm);
      oarc << elem;
      cstrm.flush();
      char* send_buffer = cstrm->c_str();
      int send_buffer_size = cstrm->size();
      assert(send_buffer_size >= 0);

      int dest(id);
      // send the size
      int error = MPI_Send(&send_buffer_size, // send buffer
                           1,                 // send count
                           MPI_INT,           // send type
                           dest,              // destination
                           tag,               // tag
                           MPI_COMM_WORLD);
      assert(error == MPI_SUCCESS);

      // send the actual content
      error = MPI_Send(send_buffer,       // send buffer
                       send_buffer_size,  // how much to send
                       MPI_BYTE,          // send type
                       dest,
                       tag,
                       MPI_COMM_WORLD);
      assert(error == MPI_SUCCESS);
#else
      logstream(LOG_FATAL) << "MPI not installed!" << std::endl;
#endif
    } // end of send


    /**
     * Blocking point-to-point receive of a serializable object sent
     * by process id with a matching tag.
     */
    template<typename T>
    void recv(T& elem, const size_t id, const int tag = 0) {
#ifdef HAS_MPI
      // Validate the source rank
      assert(id < size());

      int recv_buffer_size(-1);
      int src(id);
      MPI_Status status;
      // recv the size
      int error = MPI_Recv(&recv_buffer_size,
                           1,
                           MPI_INT,
                           src,
                           tag,
                           MPI_COMM_WORLD,
                           &status);
      assert(error == MPI_SUCCESS);
      assert(recv_buffer_size > 0);

      std::vector<char> recv_buffer(recv_buffer_size);
      // recv the actual content
      error = MPI_Recv(&(recv_buffer[0]),
                       recv_buffer_size,
                       MPI_BYTE,
                       src,
                       tag,
                       MPI_COMM_WORLD,
                       &status);
      assert(error == MPI_SUCCESS);
      // deserialize into the local element
      namespace bio = boost::iostreams;
      typedef bio::stream<bio::array_source> icharstream;
      icharstream strm(&(recv_buffer[0]), recv_buffer.size());
      graphlab::iarchive iarc(strm);
      iarc >> elem;
#else
      logstream(LOG_FATAL) << "MPI not installed!" << std::endl;
#endif
    } // end of recv
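
    // Illustrative usage sketch (not part of the original header): a simple
    // ping from rank 0 to rank 1 using the blocking send/recv pair above.
    //
    //   if(mpi_tools::rank() == 0) {
    //     std::string msg("ping");               // hypothetical payload
    //     mpi_tools::send(msg, 1 /* dest */);
    //   } else if(mpi_tools::rank() == 1) {
    //     std::string msg;
    //     mpi_tools::recv(msg, 0 /* source */);  // blocks until rank 0 sends
    //   }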


    void get_master_ranks(std::set<size_t>& master_ranks);


  }; // end of namespace mpi_tools
}; // end of graphlab namespace

#include <graphlab/macros_undef.hpp>
#endif