1 #include <graphlab/rpc/dc.hpp>
2 #include <graphlab/rpc/dc_dist_object.hpp>
3 #include <graphlab/rpc/dc_init_from_mpi.hpp>
4 #include <graphlab/util/mpi_tools.hpp>
5 #include <graphlab/util/timer.hpp>
6 using namespace graphlab;
// Total payload volume (in bytes) that each benchmark variant tries to push
// across the wire per run; SEND_LIMIT_PRINT is the matching human-readable
// label used in the console banners below. Keep the two in sync.
8 #define SEND_LIMIT (64 * 1024 * 1024)
9 #define SEND_LIMIT_PRINT "64MB"
// RPC sink for the "4 integer" benchmarks: the remote side invokes this via
// remote_call/pod_call with four size_t arguments (see perform_short_sends_0).
// NOTE(review): the body is not visible in this copy of the file — presumably
// it only counts/acknowledges receipt; confirm against the original source.
22 void receive_ints(
size_t i0,
size_t i1,
size_t i2,
size_t i3) {
// RPC sink for the long-message benchmark: receives the vector sent by
// perform_long_sends_0. Body not visible in this copy of the file.
27 void receive_vector(
const std::vector<size_t> &s) {
// RPC sink for the string benchmarks: receives the payload sent by
// perform_string_sends_0. Body not visible in this copy of the file.
31 void receive_string(
const std::string &s) {
// Sender loop for the short-message benchmark: issues `number` remote_calls
// to proc 1, each carrying four size_t arguments (fixed dummy values).
// Measures per-call overhead of the regular (serializing) call path.
40 void perform_short_sends_0(
size_t number) {
41 for (
size_t i = 0;i < number; ++i) {
42 rmi.remote_call(1, &teststruct::receive_ints, 100,100,1000,5000000);
// Same send loop as perform_short_sends_0, but through rmi.pod_call —
// presumably the POD fast path that skips full serialization; verify against
// the dc_dist_object API. Payload and target (proc 1) are identical so the
// two benchmarks are directly comparable.
46 void perform_short_pod_sends_0(
size_t number) {
47 for (
size_t i = 0;i < number; ++i) {
48 rmi.pod_call(1, &teststruct::receive_ints, 100,100,1000,5000000);
// Sender loop for the long-message benchmark: builds one vector of `length`
// size_t elements (all 5000000) and remote_calls it to proc 1 `number`
// times. The vector is constructed once, outside the loop, so only the
// call/serialization cost is measured per iteration.
53 void perform_long_sends_0(
size_t length,
size_t number) {
54 std::vector<size_t> v(length, 5000000);
55 for (
size_t i = 0;i < number; ++i) {
56 rmi.remote_call(1, &teststruct::receive_vector, v);
// Sender loop for the string benchmark: builds one `length`-byte string
// (every byte = 0x01, content is irrelevant) and remote_calls it to proc 1
// `number` times. As with the vector variant, payload construction is
// hoisted out of the timed loop.
61 void perform_string_sends_0(
size_t length,
size_t number) {
62 std::string s(length, 1);
63 for (
size_t i = 0;i < number; ++i) {
64 rmi.remote_call(1, &teststruct::receive_string, s);
// Prints throughput for a completed run: t1 = seconds to issue all calls,
// t3 = seconds until receive completed. Rates are SEND_LIMIT bytes divided
// by the elapsed time, scaled to MB/s.
// NOTE(review): t2 is not used in the visible lines — either it is consumed
// on a line missing from this copy, or it is a vestigial parameter; confirm
// against the original source before removing it.
70 void print_res(
double t1,
double t2,
double t3) {
71 std::cout <<
"Calls Sent at ";
72 std::cout << SEND_LIMIT / t1 / 1024 / 1024 <<
" MB/s\n";
73 std::cout <<
"Receive Completed at ";
74 std::cout << SEND_LIMIT / t3 / 1024 / 1024 <<
" MB/s\n\n";
// Driver for the single-threaded short-send benchmark. numsends is chosen so
// that numsends * (4 * sizeof(size_t)) == SEND_LIMIT, i.e. the argument
// payload totals SEND_LIMIT bytes.
// NOTE(review): the branch body for procid() == 1 (the receiver role) and
// the timing/print code are on lines missing from this copy — only the
// sender-side banner and send loop are visible.
77 void run_short_sends_0() {
78 if (rmi.procid() == 1) {
83 std::cout <<
"Single Threaded " << SEND_LIMIT_PRINT <<
" sends, 4 integer blocks\n";
85 size_t numsends = SEND_LIMIT / (
sizeof(size_t) * 4);
86 perform_short_sends_0(numsends);
// Driver for the multi-threaded short-send benchmark: launches `numthreads`
// threads via thrgrp, each running perform_short_sends_0. numsends is scaled
// down by numthreads so the aggregate payload stays at SEND_LIMIT bytes,
// keeping results comparable with the single-threaded run.
// NOTE(review): the procid() == 1 branch body, the thread join, and the
// timing/print code are on lines missing from this copy.
96 void run_threaded_short_sends_0(
size_t numthreads) {
97 if (rmi.procid() == 1) {
102 std::cout << numthreads <<
" threaded " << SEND_LIMIT_PRINT <<
" sends, 4 integer blocks\n";
105 size_t numsends = SEND_LIMIT / (
sizeof(size_t) * 4 * numthreads);
106 for (
size_t i = 0; i < numthreads; ++i) {
107 thrgrp.
launch(boost::bind(&teststruct::perform_short_sends_0,
this, numsends));
// Driver for the single-threaded POD-call benchmark; mirrors
// run_short_sends_0 exactly (same numsends formula, same SEND_LIMIT budget)
// but exercises the pod_call path via perform_short_pod_sends_0.
// NOTE(review): the procid() == 1 branch body and the timing/print code are
// on lines missing from this copy.
119 void run_short_pod_sends_0() {
120 if (rmi.procid() == 1) {
125 std::cout <<
"Single Threaded "<< SEND_LIMIT_PRINT <<
" POD sends, 4 integers\n";
127 size_t numsends = SEND_LIMIT / (
sizeof(size_t) * 4);
128 perform_short_pod_sends_0(numsends);
// Driver for the multi-threaded POD-call benchmark; mirrors
// run_threaded_short_sends_0 (per-thread numsends keeps the aggregate at
// SEND_LIMIT bytes) but launches perform_short_pod_sends_0 on each thread.
// NOTE(review): the procid() == 1 branch body, the thread join, and the
// timing/print code are on lines missing from this copy.
138 void run_threaded_short_pod_sends_0(
size_t numthreads) {
139 if (rmi.procid() == 1) {
144 std::cout << numthreads <<
" threaded "<< SEND_LIMIT_PRINT <<
" POD sends, 4 integers\n";
145 size_t numsends = SEND_LIMIT / (
sizeof(size_t) * 4 * numthreads);
148 for (
size_t i = 0; i < numthreads; ++i) {
149 thrgrp.
launch(boost::bind(&teststruct::perform_short_pod_sends_0,
this, numsends));
// Driver for the single-threaded string benchmark at a given payload size.
// Handler threads are stopped before the send loop and restarted after —
// presumably so the cycle count measures only the caller-side cost without
// handler-thread interference; confirm against the dc API.
// numsends * length == SEND_LIMIT, so total payload is fixed across sizes.
// NOTE(review): `rd` is captured by rdtsc() on a line missing from this
// copy (before the send loop); the per-call figure is (rd2 - rd)/numsends
// TSC cycles.
163 void run_string_sends_0(
size_t length) {
164 if (rmi.procid() == 1) {
168 rmi.dc().stop_handler_threads(1,1);
170 std::cout <<
"Single Threaded " << SEND_LIMIT_PRINT <<
" sends, " << length <<
" bytes\n";
172 size_t numsends = SEND_LIMIT / (length);
174 perform_string_sends_0(length, numsends);
175 size_t rd2 = rdtsc();
176 std::cout << (rd2 - rd) / numsends <<
" cycles per call\n";
178 rmi.dc().start_handler_threads(1,1);
// Driver for the multi-threaded string benchmark: `numthreads` threads each
// send `numsends` strings of `length` bytes to proc 1, with the aggregate
// payload again capped at SEND_LIMIT bytes. Handler threads are paused for
// the duration of the measurement, as in run_string_sends_0.
// NOTE(review): `rd` is captured by rdtsc() and the threads are joined on
// lines missing from this copy; the printed figure is average TSC cycles
// per call across all threads.
187 void run_threaded_string_sends_0(
size_t length,
size_t numthreads) {
188 if (rmi.procid() == 1) {
193 std::cout << numthreads <<
" threaded " << SEND_LIMIT_PRINT <<
" sends, "
194 << length <<
" bytes\n";
196 size_t numsends = SEND_LIMIT / (length * numthreads);
197 rmi.dc().stop_handler_threads(1,1);
200 for (
size_t i = 0; i < numthreads; ++i) {
201 thrgrp.
launch(boost::bind(&teststruct::perform_string_sends_0,
this, length, numsends));
204 size_t rd2 = rdtsc();
205 std::cout << (rd2 - rd) / (numthreads * numsends) <<
" cycles per call\n";
207 rmi.dc().start_handler_threads(1,1);
// Benchmark entry point. Initializes MPI, then (on the visible lines) sweeps
// the single-threaded string benchmark over payload sizes 2^4..2^23 bytes,
// followed by 16-byte threaded runs with 1/2/4/8/16 threads, and finalizes
// MPI. The "exactly 2 MPI nodes" message reflects the benchmark's fixed
// sender/receiver pairing (all sends target proc 1).
// NOTE(review): the construction of `ts`, the node-count check that guards
// the warning, and the short/POD benchmark invocations (original lines
// 225-250, 253-255) are missing from this copy.
218 int main(
int argc,
char** argv) {
220 mpi_tools::init(argc, argv);
224 std::cout <<
"Run with exactly 2 MPI nodes.\n";
251 for (
size_t i = 4; i < 24; ++i) {
252 ts.run_string_sends_0(1<<i);
256 ts.run_threaded_string_sends_0(16, 1);
257 ts.run_threaded_string_sends_0(16, 2);
258 ts.run_threaded_string_sends_0(16, 4);
259 ts.run_threaded_string_sends_0(16, 8);
260 ts.run_threaded_string_sends_0(16, 16);
262 mpi_tools::finalize();