![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* logger_spam_test.cc 00002 Remi Attab, 5 December 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Tests to flush out any potential deadlocks with the log output threads. 00006 Note that fo here is shorthand for FileOutput. 00007 */ 00008 00009 #define BOOST_TEST_MAIN 00010 #define BOOST_TEST_DYN_LINK 00011 00012 #include "soa/logger/logger.h" 00013 #include "soa/logger/file_output.h" 00014 #include "jml/arch/atomic_ops.h" 00015 00016 #include <boost/test/unit_test.hpp> 00017 #include <boost/filesystem.hpp> 00018 #include <boost/smart_ptr.hpp> 00019 #include <boost/thread/barrier.hpp> 00020 #include <boost/lexical_cast.hpp> 00021 #include <boost/regex.hpp> 00022 #include <vector> 00023 #include <map> 00024 #include <algorithm> 00025 #include <random> 00026 #include <tuple> 00027 #include <string> 00028 00029 00030 using namespace Datacratic; 00031 using namespace ML; 00032 using namespace std; 00033 namespace fs = boost::filesystem; 00034 00035 00036 00037 struct TestFolderFixture { 00038 00039 TestFolderFixture () : 00040 testFolder("./logtest-dl") 00041 { 00042 if (fs::exists(testFolder)) { 00043 fs::remove_all(testFolder); 00044 } 00045 BOOST_REQUIRE(fs::create_directory(testFolder)); 00046 } 00047 00048 ~TestFolderFixture () { 00049 BOOST_WARN(fs::remove_all(testFolder) > 0); 00050 } 00051 00052 fs::path testFolder; 00053 }; 00054 00055 00056 00057 struct SimpleNextMessage { 00058 map<int,string> msgCache; 00059 00060 tuple<string, string> operator () (int index) { 00061 string message; 00062 int msgSize = index % 100; 00063 00064 if (index == msgSize) { 00065 message = "asd"; 00066 for (int i = 0; i < msgSize; ++i) 00067 message += "dsa"; 00068 msgCache[msgSize] = message; 00069 } 00070 else 00071 message = msgCache[msgSize]; 00072 00073 return make_tuple("TEST", message); 00074 }; 00075 00076 } simpleNextMessage; 00077 00078 00079 00080 BOOST_FIXTURE_TEST_CASE(test_fo_logging, TestFolderFixture) { 00081 const int messagesToSend = 100000; 00082 00083 Logger logger; 00084 boost::barrier barrier(2); 00085 00086 // Setup the output 00087 std::shared_ptr<FileOutput> fo (new FileOutput((testFolder / "a").string())); 00088 fo->onFileWrite = [&](string channel, size_t bytesWritten) { 00089 if (channel == "KILL") barrier.wait(); 00090 }; 00091 logger.addOutput(fo); // Accepts everything. 00092 00093 00094 logger.start(); 00095 00096 // Start spamming 00097 for (int i = 0; i < messagesToSend; ++i) { 00098 string channel, message; 00099 tie(channel, message) = simpleNextMessage(i); 00100 logger.logMessage(channel, message); 00101 } 00102 00103 logger.logMessage("KILL", ""); 00104 barrier.wait(); 00105 00106 logger.waitUntilFinished(); 00107 logger.shutdown(); 00108 } 00109 00110 00111 00112 BOOST_FIXTURE_TEST_CASE(test_rotatingfo_logging, TestFolderFixture) { 00113 const int messagesToSend = 100; 00114 00115 Logger logger; 00116 boost::barrier barrier(2); 00117 00118 // Setup the output 00119 std::shared_ptr<RotatingFileOutput> rfo (new RotatingFileOutput()); 00120 rfo->onFileWrite = [&](string channel, size_t bytesWritten) { 00121 if (channel == "KILL") barrier.wait(); 00122 }; 00123 rfo->open(testFolder.string() + "/rfo-deadlock-%s.log", "200x"); // rotate every 200ms. 00124 logger.addOutput(rfo); // Accepts everything. 00125 00126 00127 logger.start(); 00128 00129 // Start spamming 00130 for (int i = 0; i < messagesToSend; ++i) { 00131 string channel, message; 00132 tie(channel, message) = simpleNextMessage(i); 00133 logger.logMessage(channel, message); 00134 } 00135 00136 logger.logMessage("KILL", ""); 00137 barrier.wait(); 00138 00139 logger.waitUntilFinished(); 00140 logger.shutdown(); 00141 } 00142 00143 00144 00145 BOOST_FIXTURE_TEST_CASE(test_multi_rotatingfo_logging, TestFolderFixture) { 00146 const int messagesToSend = 1000000; 00147 const int outputThreadCount = 64; 00148 00149 vector<string> channelList = { 00150 "CHAN_A", "CHAN_B", "CHAN_C", "CHAN_D" 00151 }; 00152 vector<string> compressionList = { "", "xz", "gz" }; 00153 vector<int> levelList = { -1, 2, -1 } ; 00154 00155 // Overkill for our simple rand needs but fun none-the-less 00156 mt19937 engine; 00157 uniform_int_distribution<int> regexDist(1, channelList.size()-1); 00158 uniform_int_distribution<int> compressionDist(0, compressionList.size()-1); 00159 00160 vector< tuple<string, string, int>> outputConfigList; 00161 for (int i = 0; i < outputThreadCount; ++i) { 00162 00163 int randRegex = regexDist(engine); 00164 string allowRegex = "KILL|CHAN_A|" + channelList[randRegex]; 00165 00166 int randComp = compressionDist(engine); 00167 00168 auto thConfig = make_tuple(allowRegex, compressionList[randComp], levelList[randComp]); 00169 outputConfigList.push_back(thConfig); 00170 } 00171 00172 Logger logger; 00173 boost::barrier barrier(outputConfigList.size() + 1); 00174 00175 00176 // Setup the output 00177 for (int i = 0; i < outputConfigList.size(); ++i) { 00178 std::shared_ptr<RotatingFileOutput> rfo (new RotatingFileOutput()); 00179 rfo->onFileWrite = [&](string channel, size_t bytesWritten) { 00180 if (channel == "KILL") barrier.wait(); 00181 }; 00182 00183 int level; 00184 string allowRegex, compression; 00185 tie(allowRegex, compression, level) = outputConfigList[i]; 00186 00187 string path = testFolder.string(); 00188 path += "/" + boost::lexical_cast<string>(i) + "-" + compression; 00189 path += "/rfo-deadlock-%s.log"; 00190 00191 rfo->open(path, "2x", compression, level); // rotate every 2ms 00192 logger.addOutput(rfo, boost::regex(allowRegex)); 00193 } 00194 00195 00196 logger.start(); 00197 00198 auto nextMessage = [&](int index) -> tuple<string, string> { 00199 string message, channel; 00200 tie(channel, message) = simpleNextMessage(index); 00201 00202 int messagesPerRound = messagesToSend /2; 00203 00204 // Round robin message distribution for the first half of the test. 00205 if (index < messagesPerRound) 00206 channel = channelList[index % channelList.size()]; 00207 00208 // Spam one channel at the time for the second half of the test. 00209 else { 00210 int adjIndex = index - messagesPerRound; 00211 int messagesForChannel = (messagesPerRound / channelList.size()) + 1; 00212 channel = channelList[adjIndex / messagesForChannel]; 00213 } 00214 00215 return make_tuple(channel, message); 00216 }; 00217 00218 // Start spamming 00219 for (int i = 0; i < messagesToSend; ++i) { 00220 string channel, message; 00221 tie(channel, message) = nextMessage(i); 00222 logger.logMessage(channel, message); 00223 } 00224 00225 // Cleanup 00226 logger.logMessage("KILL", ""); 00227 barrier.wait(); 00228 00229 logger.waitUntilFinished(); 00230 logger.shutdown(); 00231 } 00232