RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/testing/logger_deadlock_test.cc
00001 /* logger_deadlock_test.cc
00002    Remi Attab, 5 December 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Tests to flush out any potential deadlocks with the log output threads.
00006    Note that fo here is shorthand for FileOutput.
00007 */
00008 
00009 #define BOOST_TEST_MAIN
00010 #define BOOST_TEST_DYN_LINK
00011 
00012 #include "soa/logger/logger.h"
00013 #include "soa/logger/file_output.h"
00014 #include "jml/arch/atomic_ops.h"
00015 
00016 #include <boost/test/unit_test.hpp>
00017 #include <boost/filesystem.hpp>
00018 #include <boost/smart_ptr.hpp>
00019 #include <boost/thread/barrier.hpp>
00020 #include <boost/lexical_cast.hpp>
00021 #include <boost/regex.hpp>
00022 #include <vector>
00023 #include <map>
00024 #include <algorithm>
00025 #include <random>
00026 #include <tuple>
00027 #include <string>
00028 
00029 
00030 using namespace Datacratic;
00031 using namespace ML;
00032 using namespace std;
00033 namespace fs = boost::filesystem;
00034 
00035 
00036 
00037 struct TestFolderFixture {
00038 
00039     TestFolderFixture () : 
00040     testFolder("./logtest-dl")
00041     {
00042     if (fs::exists(testFolder)) {
00043         fs::remove_all(testFolder);
00044     }
00045     BOOST_REQUIRE(fs::create_directory(testFolder));
00046     }
00047 
00048     ~TestFolderFixture () {
00049     BOOST_WARN(fs::remove_all(testFolder) > 0);
00050     }
00051     
00052     fs::path testFolder;
00053 };
00054 
00055 
00056 
/** Deterministic generator of (channel, message) pairs used to spam the logger.

    The channel is always "TEST". The message is "asd" followed by
    (index % 100) repetitions of "dsa", so message sizes cycle as the index
    grows. Generated messages are memoised in msgCache, keyed by index % 100.
*/
struct SimpleNextMessage {
    std::map<int, std::string> msgCache;

    /** Returns the (channel, message) pair associated with index.

        The message is built on a cache miss, so the result does not depend on
        indexes 0..99 having been requested beforehand.
    */
    std::tuple<std::string, std::string> operator () (int index) {
        const int msgSize = index % 100;

        auto cached = msgCache.find(msgSize);
        if (cached == msgCache.end()) {
            std::string message = "asd";
            // A non-positive msgSize (negative index) leaves just the prefix.
            for (int i = 0; i < msgSize; ++i)
                message += "dsa";
            cached = msgCache.emplace(msgSize, message).first;
        }

        return std::make_tuple("TEST", cached->second);
    }

} simpleNextMessage;
00077 
00078 
00079 
00080 BOOST_FIXTURE_TEST_CASE(test_fo_logging, TestFolderFixture) {
00081     const int messagesToSend = 100000;
00082 
00083     Logger logger;
00084     boost::barrier barrier(2);
00085 
00086     // Setup the output
00087     std::shared_ptr<FileOutput> fo (new FileOutput((testFolder / "a").string()));
00088     fo->onFileWrite = [&](string channel, size_t bytesWritten) {
00089         if (channel == "KILL") barrier.wait();
00090     };
00091     logger.addOutput(fo); // Accepts everything.
00092 
00093 
00094     logger.start();
00095 
00096     // Start spamming
00097     for (int i = 0; i < messagesToSend; ++i) {
00098     string channel, message;
00099     tie(channel, message) = simpleNextMessage(i);
00100     logger.logMessage(channel, message);
00101     }
00102 
00103     logger.logMessage("KILL", "");
00104     barrier.wait();
00105 
00106     logger.waitUntilFinished();
00107     logger.shutdown();
00108 }
00109 
00110 
00111 
00112 BOOST_FIXTURE_TEST_CASE(test_rotatingfo_logging, TestFolderFixture) {
00113     const int messagesToSend = 100;
00114 
00115     Logger logger;
00116     boost::barrier barrier(2);
00117 
00118     // Setup the output
00119     std::shared_ptr<RotatingFileOutput> rfo (new RotatingFileOutput());
00120     rfo->onFileWrite = [&](string channel, size_t bytesWritten) {
00121         if (channel == "KILL") barrier.wait();
00122     };
00123     rfo->open(testFolder.string() + "/rfo-deadlock-%s.log", "200x"); // rotate every 200ms.
00124     logger.addOutput(rfo); // Accepts everything.
00125 
00126 
00127     logger.start();
00128 
00129     // Start spamming
00130     for (int i = 0; i < messagesToSend; ++i) {
00131     string channel, message;
00132     tie(channel, message) = simpleNextMessage(i);
00133     logger.logMessage(channel, message);
00134     }
00135 
00136     logger.logMessage("KILL", "");
00137     barrier.wait();
00138 
00139     logger.waitUntilFinished();
00140     logger.shutdown();
00141 }
00142 
00143 
00144 
00145 BOOST_FIXTURE_TEST_CASE(test_multi_rotatingfo_logging, TestFolderFixture) {
00146     const int messagesToSend = 1000000;
00147     const int outputThreadCount = 64;
00148 
00149     vector<string> channelList = {
00150         "CHAN_A", "CHAN_B", "CHAN_C", "CHAN_D"
00151     };
00152     vector<string> compressionList = { "", "xz", "gz" };
00153     vector<int> levelList = { -1, 2, -1 } ;
00154 
00155     // Overkill for our simple rand needs but fun none-the-less
00156     mt19937 engine;
00157     uniform_int_distribution<int> regexDist(1, channelList.size()-1);
00158     uniform_int_distribution<int> compressionDist(0, compressionList.size()-1);
00159 
00160     vector< tuple<string, string, int>> outputConfigList;
00161     for (int i = 0; i < outputThreadCount; ++i) {
00162 
00163         int randRegex = regexDist(engine);
00164         string allowRegex = "KILL|CHAN_A|" + channelList[randRegex];
00165 
00166         int randComp = compressionDist(engine);
00167 
00168         auto thConfig = make_tuple(allowRegex, compressionList[randComp], levelList[randComp]);
00169         outputConfigList.push_back(thConfig);
00170     }
00171 
00172     Logger logger;
00173     boost::barrier barrier(outputConfigList.size() + 1);
00174 
00175 
00176     // Setup the output
00177     for (int i = 0; i < outputConfigList.size(); ++i) {
00178         std::shared_ptr<RotatingFileOutput> rfo (new RotatingFileOutput());
00179         rfo->onFileWrite = [&](string channel, size_t bytesWritten) {
00180             if (channel == "KILL") barrier.wait();
00181         };
00182 
00183         int level;
00184         string allowRegex, compression;
00185         tie(allowRegex, compression, level) = outputConfigList[i];
00186 
00187         string path = testFolder.string();
00188         path += "/" + boost::lexical_cast<string>(i) + "-" + compression;
00189         path += "/rfo-deadlock-%s.log";
00190 
00191         rfo->open(path, "2x", compression, level); // rotate every 2ms
00192         logger.addOutput(rfo, boost::regex(allowRegex));
00193     }
00194     
00195 
00196     logger.start();
00197 
00198     auto nextMessage = [&](int index) -> tuple<string, string> {
00199         string message, channel;
00200         tie(channel, message) = simpleNextMessage(index);
00201 
00202         int messagesPerRound = messagesToSend /2;
00203 
00204         // Round robin message distribution for the first half of the test.
00205         if (index < messagesPerRound)
00206             channel = channelList[index % channelList.size()];
00207 
00208         // Spam one channel at the time for the second half of the test.
00209         else {
00210             int adjIndex = index - messagesPerRound;
00211             int messagesForChannel = (messagesPerRound / channelList.size()) + 1;
00212             channel = channelList[adjIndex / messagesForChannel];
00213         }
00214 
00215         return make_tuple(channel, message);
00216     };
00217 
00218     // Start spamming
00219     for (int i = 0; i < messagesToSend; ++i) {
00220     string channel, message;
00221     tie(channel, message) = nextMessage(i);
00222     logger.logMessage(channel, message);
00223     }
00224 
00225     // Cleanup
00226     logger.logMessage("KILL", "");
00227     barrier.wait();
00228 
00229     logger.waitUntilFinished();
00230     logger.shutdown();
00231 }
00232 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator