RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/zmq_tcp_bench.cc
00001 /* zmq_tcp_bench.cc
00002    Wolfgang Sourdeau - April 2013 */
00003 
00004 #define BOOST_TEST_MAIN
00005 #define BOOST_TEST_DYN_LINK
00006 
00007 #include <fcntl.h>
00008 #include <unistd.h>
00009 #include <sys/epoll.h>
00010 #include <sys/types.h>
00011 #include <sys/socket.h>
00012 
00013 #include <string>
00014 
00015 #include <boost/test/unit_test.hpp>
00016 
00017 #include "jml/arch/exception.h"
00018 #include "jml/arch/futex.h"
00019 
00020 #include "soa/service/service_base.h"
00021 #include "soa/service/rest_service_endpoint.h"
00022 
/* Number of messages each benchmark below pushes from its client to its
   server before the elapsed time is printed. */
constexpr int NbrMsgs = 1000000;
00024 
00025 using namespace std;
00026 using namespace ML;
00027 
00028 using namespace Datacratic;
00029 
00030 #if 1
00031 BOOST_AUTO_TEST_CASE( test_zmq )
00032 {
00033     MessageLoop mainLoop;
00034     
00035     auto proxies = make_shared<ServiceProxies>();
00036     int recvMsgs(0), sendMsgs(0);
00037     struct timeval start, end;
00038 
00039     ZmqNamedEndpoint server(proxies->zmqContext);
00040     server.init(proxies->config, ZMQ_XREP, "server");
00041     auto onServerMessage = [&] (vector<zmq::message_t> && messages) {
00042         const zmq::message_t & msg = messages[1];
00043         string message((const char *)msg.data(),
00044                        ((const char *)msg.data()) + msg.size());
00045         string expected("test" + to_string(recvMsgs));
00046         ExcAssertEqual(message, expected);
00047         recvMsgs++;
00048         if (recvMsgs == sendMsgs) {
00049             futex_wake(recvMsgs);
00050         }
00051     };
00052     server.rawMessageHandler = onServerMessage;
00053     server.bindTcp();
00054     mainLoop.addSource("server", server);
00055 
00056     proxies->config->dump(cerr);
00057 
00058     ZmqNamedProxy client(proxies->zmqContext);
00059     client.init(proxies->config, ZMQ_XREQ, "client");
00060     mainLoop.addSource("client", client);
00061 
00062     client.connect("server");
00063 
00064     cerr << "awaiting connection\n";
00065     while (!client.isConnected()) {
00066         ML::sleep(0.1);
00067     }
00068 
00069     mainLoop.start();
00070     cerr << "connected and sending\n";
00071 
00072     gettimeofday(&start, NULL);
00073 
00074     for (int i = 0; i < NbrMsgs; i++) {
00075         client.sendMessage("test" + to_string(sendMsgs));
00076         sendMsgs++;
00077     }
00078 
00079     while (recvMsgs < sendMsgs) {
00080         // cerr << "awaiting end of messages: " << recvMsgs << "\n";
00081         ML::futex_wait(recvMsgs, recvMsgs);
00082     }
00083     cerr << "zmq test: received messages: " << recvMsgs << "\n";
00084 
00085     gettimeofday(&end, NULL);
00086 
00087     int delta_sec = (end.tv_sec - start.tv_sec);
00088     if (start.tv_usec > end.tv_usec) {
00089         delta_sec--;
00090         end.tv_usec += 1000000;
00091     }
00092     printf ("delta: %d.%.6ld\n", delta_sec, (end.tv_usec - start.tv_usec));
00093 }
00094 #endif
00095 
00096 #if 1
00097 #include "tcpsockets.h"
00098 
00099 BOOST_AUTO_TEST_CASE( test_unix_tcp )
00100 {
00101     auto proxies = make_shared<ServiceProxies>();
00102     MessageLoop mainLoop;
00103     int recvMsgs(0), sendMsgs(0);
00104     struct timeval start, end;
00105 
00106     TcpNamedEndpoint server;
00107     server.init(proxies->config, "server");
00108     auto onServerMessage = [&] (const string & message) {
00109         // cerr << "received tcp message: " << message << endl;
00110         string expected("test" + to_string(recvMsgs));
00111         ExcAssertEqual(message, expected);
00112         recvMsgs++;
00113         if (recvMsgs == sendMsgs) {
00114             futex_wake(recvMsgs);
00115         }
00116     };
00117     server.onMessage_ = onServerMessage;
00118     server.bindTcp(9876);
00119     ML::sleep(1);
00120 
00121     TcpNamedProxy client;
00122     client.init(proxies->config);
00123 
00124     mainLoop.addSource("server", server);
00125     mainLoop.addSource("client", client);
00126     mainLoop.start();
00127 
00128     client.connectTo("127.0.0.1", 9876);
00129 
00130     cerr << "awaiting connection\n";
00131     while (!client.isConnected()) {
00132         ML::sleep(0.1);
00133     }
00134     cerr << "connected and sending\n";
00135 
00136     gettimeofday(&start, NULL);
00137 
00138     for (sendMsgs = 0; sendMsgs < NbrMsgs;) {
00139         if (client.sendMessage("test" + to_string(sendMsgs))) {
00140             sendMsgs++;
00141         }
00142         // else {
00143         //     ML::sleep(0.1);
00144         // }
00145     }
00146 
00147     cerr << "sent " << sendMsgs << " messages\n";
00148     while (recvMsgs < sendMsgs) {
00149         // cerr << "awaiting end of messages: " << recvMsgs << "\n";
00150         ML::futex_wait(recvMsgs, recvMsgs);
00151     }
00152     cerr << "tcp test: received messages: " << recvMsgs << "\n";
00153 
00154     gettimeofday(&end, NULL);
00155 
00156     int delta_sec = (end.tv_sec - start.tv_sec);
00157     if (start.tv_usec > end.tv_usec) {
00158         delta_sec--;
00159         end.tv_usec += 1000000;
00160     }
00161     printf ("delta: %d.%.6ld\n", delta_sec, (end.tv_usec - start.tv_usec));
00162 }
00163 #endif
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator