RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/logger.cc
00001 /* logger.cc
00002    Jeremy Barnes, 19 May 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Various classes for logging of the RTB data.
00006 */
00007 
00008 #include "logger.h"
00009 #include "jml/utils/vector_utils.h"
00010 #include "jml/arch/atomic_ops.h"
00011 #include "jml/arch/demangle.h"
00012 #include "jml/utils/string_functions.h"
00013 #include "jml/arch/timers.h"
00014 #include "file_output.h"
00015 #include "publish_output.h"
00016 #include "callback_output.h"
00017 #include <boost/make_shared.hpp>
00018 
00019 
00020 using namespace std;
00021 using namespace ML;
00022 
00023 
00024 namespace Datacratic {
00025 
00026 
00027 /*****************************************************************************/
00028 /* LOG OUTPUT                                                                */
00029 /*****************************************************************************/
00030 
00031 Json::Value
00032 LogOutput::
00033 stats() const
00034 {
00035     return Json::Value();
00036 }
00037 
00038 void
00039 LogOutput::
00040 clearStats()
00041 {
00042 }
00043 
00044 
00045 /*****************************************************************************/
00046 /* LOGGER                                                                    */
00047 /*****************************************************************************/
00048 
00049 Logger::
00050 Logger()
00051     : context(std::make_shared<zmq::context_t>(1)),
00052       messages(65536),
00053       outputs(0),
00054       messagesSent(0), messagesDone(0)
00055 {
00056     doShutdown = false;
00057 }
00058 
00059 Logger::
00060 Logger(zmq::context_t & contextRef)
00061     : context(ML::make_unowned_std_sp(contextRef)),
00062       messages(65536),
00063       outputs(0),
00064       messagesSent(0), messagesDone(0)
00065 {
00066     doShutdown = false;
00067 }
00068 
00069 Logger::
00070 Logger(std::shared_ptr<zmq::context_t> & context)
00071     : context(context),
00072       messages(65536),
00073       outputs(0),
00074       messagesSent(0), messagesDone(0)
00075 {
00076     doShutdown = false;
00077 }
00078 
00079 Logger::
00080 ~Logger()
00081 {
00082     shutdown();
00083 }
00084 
00085 void
00086 Logger::
00087 init()
00088 {
00089     messageLoop.init();
00090 
00091     messages.onEvent = [=](std::vector<std::string> && message) {
00092         handleListenerMessage(message);
00093     };
00094 
00095     messageLoop.addSource("Logger::messages", messages);
00096 }
00097 
00098 void
00099 Logger::
00100 subscribe(const std::string & uri,
00101           const std::vector<std::string> & channels,
00102           const std::string & identity)
00103 {
00104 #if 0
00105     if (logThread)
00106         throw ML::Exception("must subscribe before log thread starts");
00107 #endif
00108 
00109     //using namespace std;
00110     //cerr << "subscribing to " << uri << " on " << channels.size()
00111     //     << " channels "
00112     //     << channels << endl;
00113 
00114     auto subscription = std::make_shared<zmq::socket_t>(*context, ZMQ_SUB);
00115 
00116     setHwm(*subscription, 100000);
00117 
00118     if (identity != "")
00119         setIdentity(*subscription, identity);
00120     subscription->connect(uri.c_str());
00121 
00122     if (channels.empty())
00123         subscribeChannel(*subscription, "");
00124     else {
00125         for (auto it = channels.begin(), end = channels.end(); it != end;
00126              ++it)
00127             subscribeChannel(*subscription, *it);
00128     }
00129 
00130     subscriptions.push_back(subscription);
00131 
00132     messageLoop.addSource("Logger::" + identity,
00133                           std::make_shared<ZmqBinaryEventSource>
00134                           (*subscription, [=] (std::vector<zmq::message_t> && message)
00135                            {
00136                                this->handleMessage(std::move(message));
00137                            }));
00138 }
00139 
00141 struct Logger::Output {
00142     Output()
00143         : logProbability(1.0)
00144     {
00145     }
00146     
00147     Output(const boost::regex & allowChannels,
00148            const boost::regex & denyChannels,
00149            std::shared_ptr<LogOutput> output,
00150            double logProbability)
00151         : allowChannels(allowChannels), denyChannels(denyChannels),
00152           output(output), logProbability(logProbability)
00153     {
00154     }
00155     
00156     boost::regex allowChannels;  // channels to match
00157     boost::regex denyChannels;  // channels to filter out
00158     std::shared_ptr<LogOutput> output;  // thing to write to
00159     double logProbability;
00160 };
00161 
00163 struct Logger::Outputs : public std::vector<Output> {
00164     Outputs()
00165         : old(0)
00166     {
00167     }
00168     
00169     Outputs(Outputs * old,
00170             const Output & toAdd)
00171         : old(old)
00172     {
00173         if (old) {
00174             reserve(old->size() + 1);
00175             insert(begin(), old->begin(), old->end());
00176         }
00177 
00178         push_back(toAdd);
00179     }
00180 
00181     ~Outputs()
00182     {
00183         if (old) delete old;
00184     }
00185     
00186     void logMessage(const std::string & channel,
00187                     const std::string & message)
00188     {
00189         for (auto it = begin(); it != end();  ++it) {
00190             try {
00191                 //cerr << "channel = " << channel << endl;
00192                 //cerr << "output = " << ML::type_name(*it->output) << endl;
00193                 //cerr << "it->allowChannels = " << it->allowChannels.str() << endl;
00194                 //cerr << "it->denyChannels = " << it->denyChannels.str() << endl;
00195                 if (it->allowChannels.empty()
00196                     || boost::regex_match(channel, it->allowChannels)) {
00197                     //cerr << "   allow" << endl;
00198                     if (it->denyChannels.empty()
00199                         || !boost::regex_match(channel, it->denyChannels)) {
00200 
00201                         if (it->logProbability == 1.0
00202                             || ((random() % 100000)
00203                                 < (it->logProbability * 100000))) {
00204                             it->output->logMessage(channel, message);
00205                         }
00206                         //cerr << "  *** log" << endl;
00207                     }
00208                 }
00209             } catch (const std::exception & exc) {
00210                 cerr << "error: writing message to channel " << channel
00211                      << " with output " << ML::type_name(*it->output)
00212                      << ": " << exc.what() << "; message = "
00213                      << message << endl;
00214             }
00215         }
00216     }
00217     
00218     Outputs * old;   // to allow cleanup
00219 };
00220 
/** If `s` begins with `prefix`, strip the prefix from `s` and return true;
    otherwise leave `s` unchanged and return false.

    Only the leading prefix.size() characters are compared.  The previous
    `s.find(prefix) == 0` test gave the same answer, but it scanned the whole
    string whenever the prefix was absent.
*/
bool startsWith(std::string & s,
                const std::string & prefix)
{
    if (s.compare(0, prefix.size(), prefix) != 0)
        return false;

    s.erase(0, prefix.size());
    return true;
}
00230 
00231 void
00232 Logger::
00233 logTo(const std::string & uri,
00234       const boost::regex & allowChannels,
00235       const boost::regex & denyChannels,
00236       double logProbability)
00237 {
00238     string rest = uri;
00239     if (startsWith(rest, "file://"))
00240         addOutput(ML::make_std_sp(new FileOutput(rest)),
00241                   allowChannels, denyChannels, logProbability);
00242     else if (startsWith(rest, "pub://")) {
00243         auto output = ML::make_std_sp(new PublishOutput(context));
00244         output->bind(rest);
00245         addOutput(output, allowChannels, denyChannels, logProbability);
00246     }
00247     else throw Exception("don't know how to interpret output " + uri);
00248 }
00249 
00250 void
00251 Logger::
00252 addOutput(std::shared_ptr<LogOutput> output,
00253           const boost::regex & allowChannels,
00254           const boost::regex & denyChannels,
00255           double logProbability)
00256 {
00257     Outputs * current = outputs;
00258 
00259     for (;;) {
00260         auto_ptr<Outputs> newOutputs
00261             (new Outputs(current, Output(allowChannels, denyChannels, output,
00262                                          logProbability)));
00263         if (ML::cmp_xchg(outputs, current, newOutputs.get())) {
00264             newOutputs.release();
00265             break;
00266         }
00267     }
00268 }
00269 
00270 void
00271 Logger::
00272 addCallback(boost::function<void (std::string, std::string)> callback,
00273             const boost::regex & allowChannels,
00274             const boost::regex & denyChannels,
00275             double logProbability)
00276 {
00277     addOutput(std::make_shared<CallbackOutput>(callback),
00278               allowChannels, denyChannels, logProbability);
00279 }
00280 
00281 void
00282 Logger::
00283 clearOutputs()
00284 {
00285     auto_ptr<Outputs> newOutputs(new Outputs());
00286 
00287     Outputs * current = outputs;
00288 
00289     for (;;) {
00290         newOutputs->old = current;
00291         
00292         if (ML::cmp_xchg(outputs, current, newOutputs.get())) break;
00293     }
00294 
00295     newOutputs.release();
00296 }
00297 
00298 void
00299 Logger::
00300 start(std::function<void ()> onStop)
00301 {
00302     messagesSent = messagesDone = 0;
00303     doShutdown = false;
00304 
00305     messageLoop.start(onStop);
00306 
00307 
00308 #if 0
00309     ACE_Semaphore sem(0);
00310         
00311     // NOTE: we can pass by reference since the log thread never touches
00312     // sem until this function has exited
00313     logThread.reset(new boost::thread([&](){ this->runLogThread(sem); }));
00314 
00315     // Wait until we're ready
00316     sem.acquire();
00317 #endif
00318 }
00319 
00320 void
00321 Logger::
00322 waitUntilFinished()
00323 {
00324     while (messagesDone < messagesSent) {
00325         //cerr << "sent " << messagesSent << " done "
00326         //     << messagesDone << endl;
00327         ML::sleep(0.01);
00328     }
00329 
00330     //cerr << "finished: sent " << messagesSent << " done "
00331     //     << messagesDone << endl;
00332 }
00333 
00334 void
00335 Logger::
00336 shutdown()
00337 {
00338     messageLoop.shutdown();
00339 
00340     doShutdown = true;
00341 
00342     delete outputs;  outputs = 0;
00343 
00344     doShutdown = false;
00345 }
00346 
00347 void
00348 Logger::
00349 replay(const std::string & filename, ssize_t maxEvents)
00350 {
00351     if (!outputs) return;
00352 
00353     filter_istream stream(filename);
00354 
00355     for (ssize_t i = 0;  stream && (maxEvents == -1 || i < maxEvents);  ++i) {
00356         string line;
00357         getline(stream, line);
00358         atomic_add(messagesSent, 1);
00359         messages.push({ line });
00360     }
00361 
00362     cerr << "replay: sent " << messagesSent << " done: "
00363          << messagesDone << endl;
00364 }
00365 
00366 void
00367 Logger::
00368 replayDirect(const std::string & filename, ssize_t maxEvents) const
00369 {
00370 #if 0
00371     if (logThread)
00372         throw ML::Exception("log thread already up for replayDirect");
00373 #endif
00374 
00375     if (!outputs) return;
00376 
00377     filter_istream stream(filename);
00378 
00379     for (ssize_t i = 0;  stream && (maxEvents == -1 || i < maxEvents);  ++i) {
00380         string line;
00381         getline(stream, line);
00382         atomic_add(messagesSent, 1);
00383 
00384         Outputs * current = outputs;
00385         
00386         if (!current) continue;
00387         
00388         string channel, content;
00389         string::size_type pos = line.find('\t');
00390 
00391         if (pos != string::npos) {
00392             channel = string(line, 0, pos);
00393             content = string(line, pos + 1);
00394         }
00395         
00396         current->logMessage(channel, content);
00397     }
00398 
00399     cerr << "replay: sent " << messagesSent << " done: "
00400          << messagesDone << endl;
00401 }
00402 
00403 void
00404 Logger::
00405 handleListenerMessage(std::vector<std::string> const & message)
00406 {
00407     Outputs * current = outputs;
00408         
00409     if (!current) return;
00410 
00411     if (current->empty()) {
00412         current = 0;  // TODO: delete it
00413     }
00414     else if (current->old) {
00415         delete current->old;
00416         current->old = 0;
00417     }
00418 
00419     if (message.size() == 1 && message[0] == "SHUTDOWN")
00420         return;
00421 
00422     atomic_add(messagesDone, 1);
00423 
00424     if (!current) return;
00425 
00426     string const & channel = message[0];
00427 
00428     string toLog;
00429     toLog.reserve(1024);
00430 
00431     for (unsigned i = 1;  i < message.size();  ++i) {
00432         string const & strMessage = message[i];
00433         if (strMessage.find_first_of("\n\t\0\r") != string::npos) {
00434             cerr << "warning: part " << i << " of message "
00435                  << channel << " has illegal char: '"
00436                  << strMessage << "'" << endl;
00437         }
00438         if (i > 1) toLog += '\t';
00439         toLog += strMessage;
00440     }
00441 
00442     current->logMessage(channel, toLog);
00443 }
00444 
00445 void
00446 Logger::
00447 handleRawListenerMessage(std::vector<std::string> const & message)
00448 {
00449     Outputs * current = outputs;
00450         
00451     if (!current) return;
00452 
00453     if (current->empty()) {
00454         current = 0;  // TODO: delete it
00455     }
00456     else if (current->old) {
00457         delete current->old;
00458         current->old = 0;
00459     }
00460     
00461     atomic_add(messagesDone, 1);
00462 
00463     if (message.size() == 1) {
00464         cerr << "ignored message with excessive elements: "
00465              << message.size()
00466              << endl;
00467         return;
00468     }
00469 
00470     string const & rawMessage = message[0];
00471 
00472     if (!current) return;
00473 
00474     string channel, content;
00475     string::size_type pos = rawMessage.find('\t');
00476 
00477     if (pos != string::npos) {
00478         channel = string(rawMessage, 0, pos);
00479         content = string(rawMessage, pos + 1);
00480     }
00481 
00482     current->logMessage(channel, content);
00483 }
00484 
00485 void
00486 Logger::
00487 handleMessage(std::vector<zmq::message_t> && message)
00488 {
00489     Outputs * current = outputs;
00490         
00491     if (!current) return;
00492 
00493     if (current->empty()) {
00494         current = 0;  // TODO: delete it
00495     }
00496     else if (current->old) {
00497         delete current->old;
00498         current->old = 0;
00499     }
00500     
00501     //cerr << "got subscription message " << message << endl;
00502                 
00503     if (!current) return;
00504 
00505     if (message.size() != 2) {
00506         vector<string> strMessages;
00507         for (auto & it: message) {
00508             strMessages.push_back(it.toString());
00509         }
00510 
00511         cerr << "ignoring invalid subscription message "
00512              << strMessages << endl;
00513         return;
00514     }
00515 
00516     //cerr << "logging subscription message " << message << endl;
00517 
00518     current->logMessage(message[0].toString(), message[1].toString());
00519 }
00520 
// Legacy implementation, disabled by the surrounding #if 0.  It ran a
// dedicated thread that bound two inproc PULL sockets (structured and raw
// messages) and polled them together with the subscription sockets,
// dispatching each message to the current outputs.  In this file the live
// equivalents are handleListenerMessage() and handleMessage(), registered
// with messageLoop in init() and subscribe().  handleRawListenerMessage()
// mirrors the raw-socket branch.
//
// NOTE(review): fix or delete this rather than re-enabling it as-is:
//   - `subscribers` is not declared in this file; presumably
//     `subscriptions` was meant.
//   - `zmq_pollitem_t items [nitems]` is a variable-length array (a
//     compiler extension, not standard C++).
//   - when the list is empty, `current = 0` is followed by
//     `current->old`, a null-pointer dereference.
//   - a subscription message with size() != 2 is reported but then still
//     indexed as message[0] / message[1].
//   - the literal "\n\t\0\r" is truncated at the embedded NUL, so only
//     '\n' and '\t' are detected.
#if 0
void
Logger::
runLogThread(ACE_Semaphore & sem)
{
    using namespace std;

    zmq::socket_t sock(*context, ZMQ_PULL);
    sock.bind(ML::format("inproc://logger@%p", this).c_str());

    zmq::socket_t raw_sock(*context, ZMQ_PULL);
    raw_sock.bind(ML::format("inproc://logger@%p-RAW", this).c_str());

    //cerr << "done bind" << endl;

    // Tell start() that the sockets are bound.
    sem.release();

    // Poll set: [0] structured socket, [1] raw socket, [2..] subscriptions.
    int nitems = subscribers.size() + 2;
    zmq_pollitem_t items [nitems];
    zmq_pollitem_t item0 = { sock, 0, ZMQ_POLLIN, 0 };
    zmq_pollitem_t item1 = { raw_sock, 0, ZMQ_POLLIN, 0 };
    items[0] = item0;
    items[1] = item1;
    for (unsigned i = 0;  i < subscribers.size();  ++i) {
        zmq_pollitem_t item = { *subscriptions[i], 0, ZMQ_POLLIN, 0 };
        items[i + 2] = item;
    }

    //bool shutdown = false;

    //struct timeval beforeSleep, afterSleep;
    //gettimeofday(&afterSleep, 0);

    //cerr << "starting logging thread" << endl;
    
    while (!doShutdown) {
        //gettimeofday(&beforeSleep, 0);

        //dutyCycleCurrent.nsProcessing += timeDiff(afterSleep, beforeSleep);
        
        int rc = zmq_poll(items, nitems, 500 /* milliseconds */);

        //cerr << "rc = " << rc << endl;
        
        //gettimeofday(&afterSleep, 0);

        //dutyCycleCurrent.nsSleeping += timeDiff(beforeSleep, afterSleep);
        //dutyCycleCurrent.nEvents += 1;

        if (rc == -1 && zmq_errno() != EINTR) {
            cerr << "zeromq log error: " << zmq_strerror(zmq_errno()) << endl;
        }

        Outputs * current = outputs;
        
        if (!current) continue;

        if (current->empty()) {
            current = 0;  // TODO: delete it
        }

        // NOTE(review): null dereference when the branch above ran.
        if (current->old) {
            delete current->old;
            current->old = 0;
        }
        
        if (items[0].revents & ZMQ_POLLIN) {
            vector<string> message = recvAll(sock);

            if (message.size() == 1 && message[0] == "SHUTDOWN")
                return;

            atomic_add(messagesDone, 1);

            if (!current) continue;

            string toLog;
            toLog.reserve(1024);

            for (unsigned i = 1;  i < message.size();  ++i) {
                if (message[i].find_first_of("\n\t\0\r") != string::npos) {
                    cerr << "warning: part " << i << " of message "
                         << message[0] << " has illegal char: '"
                         << message[i] << "'" << endl;
                }
                if (i > 1) toLog += '\t';
                toLog += message[i];
            }
            
            current->logMessage(message[0], toLog);
        }
        if (items[1].revents & ZMQ_POLLIN) {
            string message = recvMesg(raw_sock);

            atomic_add(messagesDone, 1);

            if (!current) continue;

            string channel, content;
            string::size_type pos = message.find('\t');

            if (pos != string::npos) {
                channel = string(message, 0, pos);
                content = string(message, pos + 1);
            }

            current->logMessage(channel, content);
        }
        for (unsigned i = 0;  i < subscriptions.size();  ++i) {
            if (items[i + 2].revents & ZMQ_POLLIN) {
                vector<string> message = recvAll(*subscriptions[i]);
                
                //cerr << "got subscription message " << message << endl;
                
                if (!current) continue;
                
                // NOTE(review): no `continue` here, so the indexing below
                // runs even for malformed messages.
                if (message.size() != 2) {
                    cerr << "ignoring invalid subscription message "
                         << message << endl;
                }
                
                //cerr << "logging subscription message " << message << endl;
                
                current->logMessage(message[0], message[1]);
            }
        }
    }
}
#endif
00650 
00651 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator