GraphLab: Distributed Graph-Parallel API 2.1
rpc_call_perf_test.cpp
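// Throughput benchmark for the GraphLab RPC layer: process 0 issues
// SEND_LIMIT worth of remote calls (integer, vector, and string payloads)
// to process 1 and reports the achieved send and receive bandwidth.
// Requires exactly two MPI processes, e.g. launched with mpiexec -n 2.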
#include <graphlab/rpc/dc.hpp>
#include <graphlab/rpc/dc_dist_object.hpp>
#include <graphlab/rpc/dc_init_from_mpi.hpp>
#include <graphlab/util/mpi_tools.hpp>
#include <graphlab/util/timer.hpp>
using namespace graphlab;

#define SEND_LIMIT (64 * 1024 * 1024)
#define SEND_LIMIT_PRINT "64MB"
struct teststruct {
  // RMI object used to issue remote calls and barriers for this instance
  dc_dist_object<teststruct> rmi;

  teststruct(distributed_control &dc):rmi(dc, this) {
    dc.barrier();
  }

  /**
   * Receiver
   */

  atomic<size_t> ctr;
  void receive_ints(size_t i0, size_t i1, size_t i2, size_t i3) {
    ctr.inc();
  }

  void receive_vector(const std::vector<size_t> &s) {
    ctr.inc();
  }

  void receive_string(const std::string &s) {
    ctr.inc();
  }

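  // Senders: each perform_* routine below issues `number` calls to process 1
  // (payloads: four size_t values, a vector<size_t>, or a std::string of the
  // given length).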
  /**
   * Short Sends With Remote Call
   */

  void perform_short_sends_0(size_t number) {
    for (size_t i = 0; i < number; ++i) {
      rmi.remote_call(1, &teststruct::receive_ints, 100, 100, 1000, 5000000);
    }
  }

  void perform_short_pod_sends_0(size_t number) {
    for (size_t i = 0; i < number; ++i) {
      rmi.pod_call(1, &teststruct::receive_ints, 100, 100, 1000, 5000000);
    }
  }

  void perform_long_sends_0(size_t length, size_t number) {
    std::vector<size_t> v(length, 5000000);
    for (size_t i = 0; i < number; ++i) {
      rmi.remote_call(1, &teststruct::receive_vector, v);
    }
  }

  void perform_string_sends_0(size_t length, size_t number) {
    std::string s(length, 1);
    for (size_t i = 0; i < number; ++i) {
      rmi.remote_call(1, &teststruct::receive_string, s);
    }
  }

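  // Reports bandwidth as SEND_LIMIT divided by elapsed time: t1 is the time
  // to issue all calls locally, t3 the time until the receiver has completed
  // processing (t2, the post-flush time, is collected but not printed).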
  void print_res(double t1, double t2, double t3) {
    std::cout << "Calls Sent at ";
    std::cout << SEND_LIMIT / t1 / 1024 / 1024 << " MB/s\n";
    std::cout << "Receive Completed at ";
    std::cout << SEND_LIMIT / t3 / 1024 / 1024 << " MB/s\n\n";
  }
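
  // Driver pattern for each run_* test: process 1 simply enters the full
  // barrier and waits as the receiver, while process 0 times three points:
  // t1 after issuing all calls, t2 after flushing the send buffers, and
  // t3 after the full barrier, once all calls have been received remotely.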
  void run_short_sends_0() {
    if (rmi.procid() == 1) {
      rmi.full_barrier();
      return;
    }
    timer ti;
    std::cout << "Single Threaded " << SEND_LIMIT_PRINT << " sends, 4 integer blocks\n";
    ti.start();
    size_t numsends = SEND_LIMIT / (sizeof(size_t) * 4);
    perform_short_sends_0(numsends);
    double t1 = ti.current_time();
    rmi.dc().flush();
    double t2 = ti.current_time();
    rmi.full_barrier();
    double t3 = ti.current_time();
    print_res(t1, t2, t3);
  }

  void run_threaded_short_sends_0(size_t numthreads) {
    if (rmi.procid() == 1) {
      rmi.full_barrier();
      return;
    }
    timer ti;
    std::cout << numthreads << " threaded " << SEND_LIMIT_PRINT << " sends, 4 integer blocks\n";
    ti.start();
    thread_group thrgrp;
    size_t numsends = SEND_LIMIT / (sizeof(size_t) * 4 * numthreads);
    for (size_t i = 0; i < numthreads; ++i) {
      thrgrp.launch(boost::bind(&teststruct::perform_short_sends_0, this, numsends));
    }
    thrgrp.join();
    double t1 = ti.current_time();
    rmi.dc().flush();
    double t2 = ti.current_time();
    rmi.full_barrier();
    double t3 = ti.current_time();
    print_res(t1, t2, t3);
  }

  void run_short_pod_sends_0() {
    if (rmi.procid() == 1) {
      rmi.full_barrier();
      return;
    }
    timer ti;
    std::cout << "Single Threaded " << SEND_LIMIT_PRINT << " POD sends, 4 integers\n";
    ti.start();
    size_t numsends = SEND_LIMIT / (sizeof(size_t) * 4);
    perform_short_pod_sends_0(numsends);
    double t1 = ti.current_time();
    rmi.dc().flush();
    double t2 = ti.current_time();
    rmi.full_barrier();
    double t3 = ti.current_time();
    print_res(t1, t2, t3);
  }

  void run_threaded_short_pod_sends_0(size_t numthreads) {
    if (rmi.procid() == 1) {
      rmi.full_barrier();
      return;
    }
    timer ti;
    std::cout << numthreads << " threaded " << SEND_LIMIT_PRINT << " POD sends, 4 integers\n";
    size_t numsends = SEND_LIMIT / (sizeof(size_t) * 4 * numthreads);
    ti.start();
    thread_group thrgrp;
    for (size_t i = 0; i < numthreads; ++i) {
      thrgrp.launch(boost::bind(&teststruct::perform_short_pod_sends_0, this, numsends));
    }
    thrgrp.join();
    double t1 = ti.current_time();
    rmi.dc().flush();
    double t2 = ti.current_time();
    rmi.full_barrier();
    double t3 = ti.current_time();
    print_res(t1, t2, t3);
  }

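  // The string-send tests additionally stop the local RPC handler threads
  // around the timed send loop (restarting them before the flush) and use
  // rdtsc to report an approximate cycle count per issued call.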
  void run_string_sends_0(size_t length) {
    if (rmi.procid() == 1) {
      rmi.full_barrier();
      return;
    }
    rmi.dc().stop_handler_threads(1, 1);
    timer ti;
    std::cout << "Single Threaded " << SEND_LIMIT_PRINT << " sends, " << length << " bytes\n";
    ti.start();
    size_t numsends = SEND_LIMIT / (length);
    size_t rd = rdtsc();
    perform_string_sends_0(length, numsends);
    size_t rd2 = rdtsc();
    std::cout << (rd2 - rd) / numsends << " cycles per call\n";
    double t1 = ti.current_time();
    rmi.dc().start_handler_threads(1, 1);
    rmi.dc().flush();
    double t2 = ti.current_time();
    rmi.full_barrier();
    double t3 = ti.current_time();
    print_res(t1, t2, t3);
  }

  void run_threaded_string_sends_0(size_t length, size_t numthreads) {
    if (rmi.procid() == 1) {
      rmi.full_barrier();
      return;
    }
    timer ti;
    std::cout << numthreads << " threaded " << SEND_LIMIT_PRINT << " sends, "
              << length << " bytes\n";
    ti.start();
    size_t numsends = SEND_LIMIT / (length * numthreads);
    rmi.dc().stop_handler_threads(1, 1);
    size_t rd = rdtsc();
    thread_group thrgrp;
    for (size_t i = 0; i < numthreads; ++i) {
      thrgrp.launch(boost::bind(&teststruct::perform_string_sends_0, this, length, numsends));
    }
    thrgrp.join();
    size_t rd2 = rdtsc();
    std::cout << (rd2 - rd) / (numthreads * numsends) << " cycles per call\n";
    double t1 = ti.current_time();
    rmi.dc().start_handler_threads(1, 1);
    rmi.dc().flush();
    double t2 = ti.current_time();
    rmi.full_barrier();
    double t3 = ti.current_time();
    print_res(t1, t2, t3);
  }
};

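// Runs the benchmark on exactly two MPI processes: process 0 acts as the
// sender and process 1 as the receiver.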
int main(int argc, char** argv) {
  // init MPI
  mpi_tools::init(argc, argv);
  // construct the distributed control object (set up from the MPI environment)
  distributed_control dc;

  if (dc.numprocs() != 2) {
    std::cout << "Run with exactly 2 MPI nodes.\n";
    return 0;
  }
  dc.barrier();
  teststruct ts(dc);
  /*
  ts.run_short_sends_0();
  ts.run_threaded_short_sends_0(2);
  ts.run_threaded_short_sends_0(4);
  ts.run_threaded_short_sends_0(8);
  ts.run_threaded_short_sends_0(16);
  ts.run_short_pod_sends_0();
  ts.run_threaded_short_pod_sends_0(2);
  ts.run_threaded_short_pod_sends_0(4);
  ts.run_threaded_short_pod_sends_0(8);
  ts.run_threaded_short_pod_sends_0(16);
  ts.run_long_sends_0(1024);
  ts.run_threaded_long_sends_0(1024, 2);
  ts.run_threaded_long_sends_0(1024, 4);
  ts.run_threaded_long_sends_0(1024, 8);
  ts.run_threaded_long_sends_0(1024, 16);
  ts.run_long_sends_0(10240);
  ts.run_threaded_long_sends_0(10240, 2);
  ts.run_threaded_long_sends_0(10240, 4);
  ts.run_threaded_long_sends_0(10240, 8);
  ts.run_threaded_long_sends_0(10240, 16);
  */
  for (size_t i = 4; i < 24; ++i) {
    ts.run_string_sends_0(1 << i);
  }

  ts.run_threaded_string_sends_0(16, 1);
  ts.run_threaded_string_sends_0(16, 2);
  ts.run_threaded_string_sends_0(16, 4);
  ts.run_threaded_string_sends_0(16, 8);
  ts.run_threaded_string_sends_0(16, 16);
  dc.barrier();
  mpi_tools::finalize();
}