GraphLab: Distributed Graph-Parallel API 2.1
rpc_example7.cpp
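This example demonstrates a distributed object built on GraphLab's RPC layer: each instance of distributed_vector carries its own dc_dist_object<> RMI channel, keys are partitioned across machines by i % numprocs(), and accesses to remotely owned keys are forwarded to their owner with blocking remote_request() calls.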
/*
 * Copyright (c) 2009 Carnegie Mellon University.
 * All rights reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an "AS
 * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 *
 * For more about this software visit:
 *
 * http://www.graphlab.ml.cmu.edu
 *
 */

#include <iostream>
#include <cstdio>
#include <graphlab/serialization/serialization_includes.hpp>
#include <graphlab/parallel/pthread_tools.hpp>
#include <graphlab/rpc/dc.hpp>
#include <graphlab/rpc/dc_dist_object.hpp>
using namespace graphlab;

template <typename T>
class distributed_vector {
 private:
  dc_dist_object<distributed_vector<T> > rmi; // the local RMI object
  std::map<size_t, T> data;  // storage
  mutex lock;                // protects the storage
 public:
  distributed_vector(distributed_control &dc):rmi(dc, this) { }

  /// Reads the value at key i
  T get(size_t i) {
    // find the owning machine
    procid_t owningmachine = i % rmi.dc().numprocs();

    if (owningmachine == rmi.dc().procid()) {
      // if I own the data, just read and return it
      T ret;
      lock.lock();
      ret = data[i];
      lock.unlock();
      return ret;
    }
    else {
      // otherwise issue a blocking request to the machine that owns it
      return rmi.remote_request(owningmachine,
                                &distributed_vector<T>::get,
                                i);
    }
  }
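  // On the remote path, remote_request() blocks until the owning
  // machine has executed get(i) (taking the local branch above there)
  // and shipped the result back to the caller.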

  /// Sets the value at key i
  void set(size_t i, const T& val) {
    // find the owning machine
    procid_t owningmachine = i % rmi.dc().numprocs();

    if (owningmachine == rmi.dc().procid()) {
      // if I own the data, set it locally
      lock.lock();
      data[i] = val;
      lock.unlock();
    }
    else {
      // otherwise forward the write to the owning machine and wait
      // for it to complete
      rmi.remote_request(owningmachine,
                         &distributed_vector<T>::set,
                         i,
                         val);
    }
  }
};

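// A possible variation (a sketch, not part of the original example):
// set() uses remote_request(), which blocks until the remote write
// has completed. GraphLab's RPC layer also provides the non-blocking
// remote_call(); assuming it accepts the same member-function-pointer
// convention, a hypothetical fire-and-forget setter could look like:
//
//   void set_async(size_t i, const T& val) {
//     procid_t owningmachine = i % rmi.dc().numprocs();
//     if (owningmachine == rmi.dc().procid()) {
//       lock.lock();
//       data[i] = val;
//       lock.unlock();
//     } else {
//       rmi.remote_call(owningmachine,
//                       &distributed_vector<T>::set_async,
//                       i, val);
//     }
//   }
//
// A caller would then need a barrier (as main() below already uses)
// before reading, since the write may still be in flight.
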
int main(int argc, char ** argv) {
  mpi_tools::init(argc, argv);
  // set up the RPC layer; one distributed_control per process
  distributed_control dc;

  if (dc.numprocs() != 2) {
    std::cout << "RPC Example 7: Distributed Object\n";
    std::cout << "Run with exactly 2 MPI nodes.\n";
    mpi_tools::finalize();
    return 0;
  }

  size_t i = 10;
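  // all_reduce combines i across all processes (summing); with two
  // processes each contributing 10, both should print 20 below.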
  dc.all_reduce(i);
  std::cout << i << "\n";
  // create a distributed vector
  distributed_vector<std::string> vec(dc);
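  // The barrier ensures both processes have constructed vec before
  // either one issues a remote request against it.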
  dc.barrier();
  if (dc.procid() == 0) {
    vec.set(10, "set from 0");
    vec.set(11, "set from 0");
  }
  else {
    vec.set(1, "set from 1");
    vec.set(2, "set from 1");
  }
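  // This barrier ensures both processes' writes have completed
  // before the reads below begin.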
  dc.barrier();
  if (dc.procid() == 0) {
    std::cout << vec.get(1) << "\n";
    std::cout << vec.get(2) << "\n";
    std::cout << vec.get(10) << "\n";
    std::cout << vec.get(11) << std::endl;
  }
  dc.barrier();
  if (dc.procid() == 1) {
    std::cout << vec.get(1) << "\n";
    std::cout << vec.get(2) << "\n";
    std::cout << vec.get(10) << "\n";
    std::cout << vec.get(11) << std::endl;
  }
  dc.barrier();

  mpi_tools::finalize();
  return 0;
}
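
The example is meant to be launched under MPI with exactly two processes, e.g. something like mpiexec -n 2 ./rpc_example7 (assuming the built binary is named rpc_example7). Both processes should print 20 (the all_reduce of 10 + 10), and each then prints the four stored strings, showing that every key is readable from every machine regardless of which process wrote it.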