RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* zmq_tcp_bench.cc 00002 Wolfgang Sourdeau - April 2013 */ 00003 00004 #define BOOST_TEST_MAIN 00005 #define BOOST_TEST_DYN_LINK 00006 00007 #include <fcntl.h> 00008 #include <unistd.h> 00009 #include <sys/epoll.h> 00010 #include <sys/types.h> 00011 #include <sys/socket.h> 00012 00013 #include <string> 00014 00015 #include <boost/test/unit_test.hpp> 00016 00017 #include "jml/arch/exception.h" 00018 #include "jml/arch/futex.h" 00019 00020 #include "soa/service/service_base.h" 00021 #include "soa/service/rest_service_endpoint.h" 00022 00023 const int NbrMsgs = 1000000; 00024 00025 using namespace std; 00026 using namespace ML; 00027 00028 using namespace Datacratic; 00029 00030 #if 1 00031 BOOST_AUTO_TEST_CASE( test_zmq ) 00032 { 00033 MessageLoop mainLoop; 00034 00035 auto proxies = make_shared<ServiceProxies>(); 00036 int recvMsgs(0), sendMsgs(0); 00037 struct timeval start, end; 00038 00039 ZmqNamedEndpoint server(proxies->zmqContext); 00040 server.init(proxies->config, ZMQ_XREP, "server"); 00041 auto onServerMessage = [&] (vector<zmq::message_t> && messages) { 00042 const zmq::message_t & msg = messages[1]; 00043 string message((const char *)msg.data(), 00044 ((const char *)msg.data()) + msg.size()); 00045 string expected("test" + to_string(recvMsgs)); 00046 ExcAssertEqual(message, expected); 00047 recvMsgs++; 00048 if (recvMsgs == sendMsgs) { 00049 futex_wake(recvMsgs); 00050 } 00051 }; 00052 server.rawMessageHandler = onServerMessage; 00053 server.bindTcp(); 00054 mainLoop.addSource("server", server); 00055 00056 proxies->config->dump(cerr); 00057 00058 ZmqNamedProxy client(proxies->zmqContext); 00059 client.init(proxies->config, ZMQ_XREQ, "client"); 00060 mainLoop.addSource("client", client); 00061 00062 client.connect("server"); 00063 00064 cerr << "awaiting connection\n"; 00065 while (!client.isConnected()) { 00066 ML::sleep(0.1); 00067 } 00068 00069 mainLoop.start(); 00070 cerr << "connected and sending\n"; 00071 00072 gettimeofday(&start, NULL); 00073 00074 for (int i = 0; i < NbrMsgs; i++) { 00075 client.sendMessage("test" + to_string(sendMsgs)); 00076 sendMsgs++; 00077 } 00078 00079 while (recvMsgs < sendMsgs) { 00080 // cerr << "awaiting end of messages: " << recvMsgs << "\n"; 00081 ML::futex_wait(recvMsgs, recvMsgs); 00082 } 00083 cerr << "zmq test: received messages: " << recvMsgs << "\n"; 00084 00085 gettimeofday(&end, NULL); 00086 00087 int delta_sec = (end.tv_sec - start.tv_sec); 00088 if (start.tv_usec > end.tv_usec) { 00089 delta_sec--; 00090 end.tv_usec += 1000000; 00091 } 00092 printf ("delta: %d.%.6ld\n", delta_sec, (end.tv_usec - start.tv_usec)); 00093 } 00094 #endif 00095 00096 #if 1 00097 #include "tcpsockets.h" 00098 00099 BOOST_AUTO_TEST_CASE( test_unix_tcp ) 00100 { 00101 auto proxies = make_shared<ServiceProxies>(); 00102 MessageLoop mainLoop; 00103 int recvMsgs(0), sendMsgs(0); 00104 struct timeval start, end; 00105 00106 TcpNamedEndpoint server; 00107 server.init(proxies->config, "server"); 00108 auto onServerMessage = [&] (const string & message) { 00109 // cerr << "received tcp message: " << message << endl; 00110 string expected("test" + to_string(recvMsgs)); 00111 ExcAssertEqual(message, expected); 00112 recvMsgs++; 00113 if (recvMsgs == sendMsgs) { 00114 futex_wake(recvMsgs); 00115 } 00116 }; 00117 server.onMessage_ = onServerMessage; 00118 server.bindTcp(9876); 00119 ML::sleep(1); 00120 00121 TcpNamedProxy client; 00122 client.init(proxies->config); 00123 00124 mainLoop.addSource("server", server); 00125 mainLoop.addSource("client", client); 00126 mainLoop.start(); 00127 00128 client.connectTo("127.0.0.1", 9876); 00129 00130 cerr << "awaiting connection\n"; 00131 while (!client.isConnected()) { 00132 ML::sleep(0.1); 00133 } 00134 cerr << "connected and sending\n"; 00135 00136 gettimeofday(&start, NULL); 00137 00138 for (sendMsgs = 0; sendMsgs < NbrMsgs;) { 00139 if (client.sendMessage("test" + to_string(sendMsgs))) { 00140 sendMsgs++; 00141 } 00142 // else { 00143 // ML::sleep(0.1); 00144 // } 00145 } 00146 00147 cerr << "sent " << sendMsgs << " messages\n"; 00148 while (recvMsgs < sendMsgs) { 00149 // cerr << "awaiting end of messages: " << recvMsgs << "\n"; 00150 ML::futex_wait(recvMsgs, recvMsgs); 00151 } 00152 cerr << "tcp test: received messages: " << recvMsgs << "\n"; 00153 00154 gettimeofday(&end, NULL); 00155 00156 int delta_sec = (end.tv_sec - start.tv_sec); 00157 if (start.tv_usec > end.tv_usec) { 00158 delta_sec--; 00159 end.tv_usec += 1000000; 00160 } 00161 printf ("delta: %d.%.6ld\n", delta_sec, (end.tv_usec - start.tv_usec)); 00162 } 00163 #endif