RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* message_channel_test.cc -*- C++ -*- 00002 Jeremy Barnes, 24 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Test for message channel () 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include <boost/make_shared.hpp> 00013 #include "soa/service/named_endpoint.h" 00014 #include "soa/service/message_loop.h" 00015 #include "soa/service/typed_message_channel.h" 00016 #include <sys/socket.h> 00017 #include "jml/utils/guard.h" 00018 #include "jml/arch/exception_handler.h" 00019 #include "jml/utils/testing/watchdog.h" 00020 #include "jml/utils/testing/fd_exhauster.h" 00021 #include "jml/utils/vector_utils.h" 00022 #include "jml/arch/timers.h" 00023 #include <thread> 00024 #include "soa/service/zmq_utils.h" 00025 #include <boost/thread/thread.hpp> 00026 #include "jml/utils/testing/watchdog.h" 00027 00028 00029 using namespace std; 00030 using namespace ML; 00031 using namespace Datacratic; 00032 00033 00034 00035 BOOST_AUTO_TEST_CASE( test_message_channel ) 00036 { 00037 TypedMessageSink<std::string> sink(1000); 00038 00039 int numSent = 0; 00040 int numReceived = 0; 00041 00042 00043 sink.onEvent = [&] (const std::string & str) 00044 { 00045 ML::atomic_inc(numReceived); 00046 }; 00047 00048 volatile bool finished = false; 00049 00050 auto pushThread = [&] () 00051 { 00052 for (unsigned i = 0; i < 1000; ++i) { 00053 sink.push("hello"); 00054 ML::atomic_inc(numSent); 00055 } 00056 }; 00057 00058 auto processThread = [&] () 00059 { 00060 while (!finished) { 00061 sink.processOne(); 00062 } 00063 }; 00064 00065 int numPushThreads = 2; 00066 int numProcessThreads = 1; 00067 00068 for (unsigned i = 0; i < 100; ++i) { 00069 // Test for PLAT-106; the expected behaviour is no deadlock. 00070 ML::Watchdog watchdog(2.0); 00071 00072 finished = false; 00073 00074 boost::thread_group pushThreads; 00075 for (unsigned i = 0; i < numPushThreads; ++i) 00076 pushThreads.create_thread(pushThread); 00077 00078 boost::thread_group processThreads; 00079 for (unsigned i = 0; i < numProcessThreads; ++i) 00080 processThreads.create_thread(processThread); 00081 00082 pushThreads.join_all(); 00083 00084 cerr << "finished push threads" << endl; 00085 00086 finished = true; 00087 00088 processThreads.join_all(); 00089 } 00090 }