RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/message_channel_test.cc
00001 /* message_channel_test.cc                                         -*- C++ -*-
00002    Jeremy Barnes, 24 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Test for message channel ()
00006 */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
00011 #include <boost/test/unit_test.hpp>
00012 #include <boost/make_shared.hpp>
00013 #include "soa/service/named_endpoint.h"
00014 #include "soa/service/message_loop.h"
00015 #include "soa/service/typed_message_channel.h"
00016 #include <sys/socket.h>
00017 #include "jml/utils/guard.h"
00018 #include "jml/arch/exception_handler.h"
00019 #include "jml/utils/testing/watchdog.h"
00020 #include "jml/utils/testing/fd_exhauster.h"
00021 #include "jml/utils/vector_utils.h"
00022 #include "jml/arch/timers.h"
00023 #include <thread>
00024 #include "soa/service/zmq_utils.h"
00025 #include <boost/thread/thread.hpp>
00026 #include "jml/utils/testing/watchdog.h"
00027 
00028 
00029 using namespace std;
00030 using namespace ML;
00031 using namespace Datacratic;
00032 
00033 
00034 
00035 BOOST_AUTO_TEST_CASE( test_message_channel )
00036 {
00037     TypedMessageSink<std::string> sink(1000);
00038     
00039     int numSent = 0;
00040     int numReceived = 0;
00041     
00042     
00043     sink.onEvent = [&] (const std::string & str)
00044         {
00045             ML::atomic_inc(numReceived);
00046         };
00047 
00048     volatile bool finished = false;
00049 
00050     auto pushThread = [&] ()
00051         {
00052             for (unsigned i = 0;  i < 1000;  ++i) {
00053                 sink.push("hello");
00054                 ML::atomic_inc(numSent);
00055             }
00056         };
00057 
00058     auto processThread = [&] ()
00059         {
00060             while (!finished) {
00061                 sink.processOne();
00062             }
00063         };
00064 
00065     int numPushThreads = 2;
00066     int numProcessThreads = 1;
00067 
00068     for (unsigned i = 0;  i < 100;  ++i) {
00069         // Test for PLAT-106; the expected behaviour is no deadlock.
00070         ML::Watchdog watchdog(2.0);
00071 
00072         finished = false;
00073 
00074         boost::thread_group pushThreads;
00075         for (unsigned i = 0;  i < numPushThreads;  ++i)
00076             pushThreads.create_thread(pushThread);
00077 
00078         boost::thread_group processThreads;
00079         for (unsigned i = 0;  i < numProcessThreads;  ++i)
00080             processThreads.create_thread(processThread);
00081     
00082         pushThreads.join_all();
00083 
00084         cerr << "finished push threads" << endl;
00085     
00086         finished = true;
00087 
00088         processThreads.join_all();
00089     }
00090 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator