RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* logger.cc 00002 Jeremy Barnes, 19 May 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Various classes for logging of the RTB data. 00006 */ 00007 00008 #include "logger.h" 00009 #include "jml/utils/vector_utils.h" 00010 #include "jml/arch/atomic_ops.h" 00011 #include "jml/arch/demangle.h" 00012 #include "jml/utils/string_functions.h" 00013 #include "jml/arch/timers.h" 00014 #include "file_output.h" 00015 #include "publish_output.h" 00016 #include "callback_output.h" 00017 #include <boost/make_shared.hpp> 00018 00019 00020 using namespace std; 00021 using namespace ML; 00022 00023 00024 namespace Datacratic { 00025 00026 00027 /*****************************************************************************/ 00028 /* LOG OUTPUT */ 00029 /*****************************************************************************/ 00030 00031 Json::Value 00032 LogOutput:: 00033 stats() const 00034 { 00035 return Json::Value(); 00036 } 00037 00038 void 00039 LogOutput:: 00040 clearStats() 00041 { 00042 } 00043 00044 00045 /*****************************************************************************/ 00046 /* LOGGER */ 00047 /*****************************************************************************/ 00048 00049 Logger:: 00050 Logger() 00051 : context(std::make_shared<zmq::context_t>(1)), 00052 messages(65536), 00053 outputs(0), 00054 messagesSent(0), messagesDone(0) 00055 { 00056 doShutdown = false; 00057 } 00058 00059 Logger:: 00060 Logger(zmq::context_t & contextRef) 00061 : context(ML::make_unowned_std_sp(contextRef)), 00062 messages(65536), 00063 outputs(0), 00064 messagesSent(0), messagesDone(0) 00065 { 00066 doShutdown = false; 00067 } 00068 00069 Logger:: 00070 Logger(std::shared_ptr<zmq::context_t> & context) 00071 : context(context), 00072 messages(65536), 00073 outputs(0), 00074 messagesSent(0), messagesDone(0) 00075 { 00076 doShutdown = false; 00077 } 00078 00079 Logger:: 00080 ~Logger() 00081 { 00082 shutdown(); 00083 } 00084 00085 void 00086 Logger:: 00087 init() 00088 { 00089 messageLoop.init(); 00090 00091 messages.onEvent = [=](std::vector<std::string> && message) { 00092 handleListenerMessage(message); 00093 }; 00094 00095 messageLoop.addSource("Logger::messages", messages); 00096 } 00097 00098 void 00099 Logger:: 00100 subscribe(const std::string & uri, 00101 const std::vector<std::string> & channels, 00102 const std::string & identity) 00103 { 00104 #if 0 00105 if (logThread) 00106 throw ML::Exception("must subscribe before log thread starts"); 00107 #endif 00108 00109 //using namespace std; 00110 //cerr << "subscribing to " << uri << " on " << channels.size() 00111 // << " channels " 00112 // << channels << endl; 00113 00114 auto subscription = std::make_shared<zmq::socket_t>(*context, ZMQ_SUB); 00115 00116 setHwm(*subscription, 100000); 00117 00118 if (identity != "") 00119 setIdentity(*subscription, identity); 00120 subscription->connect(uri.c_str()); 00121 00122 if (channels.empty()) 00123 subscribeChannel(*subscription, ""); 00124 else { 00125 for (auto it = channels.begin(), end = channels.end(); it != end; 00126 ++it) 00127 subscribeChannel(*subscription, *it); 00128 } 00129 00130 subscriptions.push_back(subscription); 00131 00132 messageLoop.addSource("Logger::" + identity, 00133 std::make_shared<ZmqBinaryEventSource> 00134 (*subscription, [=] (std::vector<zmq::message_t> && message) 00135 { 00136 this->handleMessage(std::move(message)); 00137 })); 00138 } 00139 00141 struct Logger::Output { 00142 Output() 00143 : logProbability(1.0) 00144 { 00145 } 00146 00147 Output(const boost::regex & allowChannels, 00148 const boost::regex & denyChannels, 00149 std::shared_ptr<LogOutput> output, 00150 double logProbability) 00151 : allowChannels(allowChannels), denyChannels(denyChannels), 00152 output(output), logProbability(logProbability) 00153 { 00154 } 00155 00156 boost::regex allowChannels; // channels to match 00157 boost::regex denyChannels; // channels to filter out 00158 std::shared_ptr<LogOutput> output; // thing to write to 00159 double logProbability; 00160 }; 00161 00163 struct Logger::Outputs : public std::vector<Output> { 00164 Outputs() 00165 : old(0) 00166 { 00167 } 00168 00169 Outputs(Outputs * old, 00170 const Output & toAdd) 00171 : old(old) 00172 { 00173 if (old) { 00174 reserve(old->size() + 1); 00175 insert(begin(), old->begin(), old->end()); 00176 } 00177 00178 push_back(toAdd); 00179 } 00180 00181 ~Outputs() 00182 { 00183 if (old) delete old; 00184 } 00185 00186 void logMessage(const std::string & channel, 00187 const std::string & message) 00188 { 00189 for (auto it = begin(); it != end(); ++it) { 00190 try { 00191 //cerr << "channel = " << channel << endl; 00192 //cerr << "output = " << ML::type_name(*it->output) << endl; 00193 //cerr << "it->allowChannels = " << it->allowChannels.str() << endl; 00194 //cerr << "it->denyChannels = " << it->denyChannels.str() << endl; 00195 if (it->allowChannels.empty() 00196 || boost::regex_match(channel, it->allowChannels)) { 00197 //cerr << " allow" << endl; 00198 if (it->denyChannels.empty() 00199 || !boost::regex_match(channel, it->denyChannels)) { 00200 00201 if (it->logProbability == 1.0 00202 || ((random() % 100000) 00203 < (it->logProbability * 100000))) { 00204 it->output->logMessage(channel, message); 00205 } 00206 //cerr << " *** log" << endl; 00207 } 00208 } 00209 } catch (const std::exception & exc) { 00210 cerr << "error: writing message to channel " << channel 00211 << " with output " << ML::type_name(*it->output) 00212 << ": " << exc.what() << "; message = " 00213 << message << endl; 00214 } 00215 } 00216 } 00217 00218 Outputs * old; // to allow cleanup 00219 }; 00220 00221 bool startsWith(std::string & s, 00222 const std::string & prefix) 00223 { 00224 if (s.find(prefix) == 0) { 00225 s.erase(0, prefix.size()); 00226 return true; 00227 } 00228 return false; 00229 } 00230 00231 void 00232 Logger:: 00233 logTo(const std::string & uri, 00234 const boost::regex & allowChannels, 00235 const boost::regex & denyChannels, 00236 double logProbability) 00237 { 00238 string rest = uri; 00239 if (startsWith(rest, "file://")) 00240 addOutput(ML::make_std_sp(new FileOutput(rest)), 00241 allowChannels, denyChannels, logProbability); 00242 else if (startsWith(rest, "pub://")) { 00243 auto output = ML::make_std_sp(new PublishOutput(context)); 00244 output->bind(rest); 00245 addOutput(output, allowChannels, denyChannels, logProbability); 00246 } 00247 else throw Exception("don't know how to interpret output " + uri); 00248 } 00249 00250 void 00251 Logger:: 00252 addOutput(std::shared_ptr<LogOutput> output, 00253 const boost::regex & allowChannels, 00254 const boost::regex & denyChannels, 00255 double logProbability) 00256 { 00257 Outputs * current = outputs; 00258 00259 for (;;) { 00260 auto_ptr<Outputs> newOutputs 00261 (new Outputs(current, Output(allowChannels, denyChannels, output, 00262 logProbability))); 00263 if (ML::cmp_xchg(outputs, current, newOutputs.get())) { 00264 newOutputs.release(); 00265 break; 00266 } 00267 } 00268 } 00269 00270 void 00271 Logger:: 00272 addCallback(boost::function<void (std::string, std::string)> callback, 00273 const boost::regex & allowChannels, 00274 const boost::regex & denyChannels, 00275 double logProbability) 00276 { 00277 addOutput(std::make_shared<CallbackOutput>(callback), 00278 allowChannels, denyChannels, logProbability); 00279 } 00280 00281 void 00282 Logger:: 00283 clearOutputs() 00284 { 00285 auto_ptr<Outputs> newOutputs(new Outputs()); 00286 00287 Outputs * current = outputs; 00288 00289 for (;;) { 00290 newOutputs->old = current; 00291 00292 if (ML::cmp_xchg(outputs, current, newOutputs.get())) break; 00293 } 00294 00295 newOutputs.release(); 00296 } 00297 00298 void 00299 Logger:: 00300 start(std::function<void ()> onStop) 00301 { 00302 messagesSent = messagesDone = 0; 00303 doShutdown = false; 00304 00305 messageLoop.start(onStop); 00306 00307 00308 #if 0 00309 ACE_Semaphore sem(0); 00310 00311 // NOTE: we can pass by reference since the log thread never touches 00312 // sem until this function has exited 00313 logThread.reset(new boost::thread([&](){ this->runLogThread(sem); })); 00314 00315 // Wait until we're ready 00316 sem.acquire(); 00317 #endif 00318 } 00319 00320 void 00321 Logger:: 00322 waitUntilFinished() 00323 { 00324 while (messagesDone < messagesSent) { 00325 //cerr << "sent " << messagesSent << " done " 00326 // << messagesDone << endl; 00327 ML::sleep(0.01); 00328 } 00329 00330 //cerr << "finished: sent " << messagesSent << " done " 00331 // << messagesDone << endl; 00332 } 00333 00334 void 00335 Logger:: 00336 shutdown() 00337 { 00338 messageLoop.shutdown(); 00339 00340 doShutdown = true; 00341 00342 delete outputs; outputs = 0; 00343 00344 doShutdown = false; 00345 } 00346 00347 void 00348 Logger:: 00349 replay(const std::string & filename, ssize_t maxEvents) 00350 { 00351 if (!outputs) return; 00352 00353 filter_istream stream(filename); 00354 00355 for (ssize_t i = 0; stream && (maxEvents == -1 || i < maxEvents); ++i) { 00356 string line; 00357 getline(stream, line); 00358 atomic_add(messagesSent, 1); 00359 messages.push({ line }); 00360 } 00361 00362 cerr << "replay: sent " << messagesSent << " done: " 00363 << messagesDone << endl; 00364 } 00365 00366 void 00367 Logger:: 00368 replayDirect(const std::string & filename, ssize_t maxEvents) const 00369 { 00370 #if 0 00371 if (logThread) 00372 throw ML::Exception("log thread already up for replayDirect"); 00373 #endif 00374 00375 if (!outputs) return; 00376 00377 filter_istream stream(filename); 00378 00379 for (ssize_t i = 0; stream && (maxEvents == -1 || i < maxEvents); ++i) { 00380 string line; 00381 getline(stream, line); 00382 atomic_add(messagesSent, 1); 00383 00384 Outputs * current = outputs; 00385 00386 if (!current) continue; 00387 00388 string channel, content; 00389 string::size_type pos = line.find('\t'); 00390 00391 if (pos != string::npos) { 00392 channel = string(line, 0, pos); 00393 content = string(line, pos + 1); 00394 } 00395 00396 current->logMessage(channel, content); 00397 } 00398 00399 cerr << "replay: sent " << messagesSent << " done: " 00400 << messagesDone << endl; 00401 } 00402 00403 void 00404 Logger:: 00405 handleListenerMessage(std::vector<std::string> const & message) 00406 { 00407 Outputs * current = outputs; 00408 00409 if (!current) return; 00410 00411 if (current->empty()) { 00412 current = 0; // TODO: delete it 00413 } 00414 else if (current->old) { 00415 delete current->old; 00416 current->old = 0; 00417 } 00418 00419 if (message.size() == 1 && message[0] == "SHUTDOWN") 00420 return; 00421 00422 atomic_add(messagesDone, 1); 00423 00424 if (!current) return; 00425 00426 string const & channel = message[0]; 00427 00428 string toLog; 00429 toLog.reserve(1024); 00430 00431 for (unsigned i = 1; i < message.size(); ++i) { 00432 string const & strMessage = message[i]; 00433 if (strMessage.find_first_of("\n\t\0\r") != string::npos) { 00434 cerr << "warning: part " << i << " of message " 00435 << channel << " has illegal char: '" 00436 << strMessage << "'" << endl; 00437 } 00438 if (i > 1) toLog += '\t'; 00439 toLog += strMessage; 00440 } 00441 00442 current->logMessage(channel, toLog); 00443 } 00444 00445 void 00446 Logger:: 00447 handleRawListenerMessage(std::vector<std::string> const & message) 00448 { 00449 Outputs * current = outputs; 00450 00451 if (!current) return; 00452 00453 if (current->empty()) { 00454 current = 0; // TODO: delete it 00455 } 00456 else if (current->old) { 00457 delete current->old; 00458 current->old = 0; 00459 } 00460 00461 atomic_add(messagesDone, 1); 00462 00463 if (message.size() == 1) { 00464 cerr << "ignored message with excessive elements: " 00465 << message.size() 00466 << endl; 00467 return; 00468 } 00469 00470 string const & rawMessage = message[0]; 00471 00472 if (!current) return; 00473 00474 string channel, content; 00475 string::size_type pos = rawMessage.find('\t'); 00476 00477 if (pos != string::npos) { 00478 channel = string(rawMessage, 0, pos); 00479 content = string(rawMessage, pos + 1); 00480 } 00481 00482 current->logMessage(channel, content); 00483 } 00484 00485 void 00486 Logger:: 00487 handleMessage(std::vector<zmq::message_t> && message) 00488 { 00489 Outputs * current = outputs; 00490 00491 if (!current) return; 00492 00493 if (current->empty()) { 00494 current = 0; // TODO: delete it 00495 } 00496 else if (current->old) { 00497 delete current->old; 00498 current->old = 0; 00499 } 00500 00501 //cerr << "got subscription message " << message << endl; 00502 00503 if (!current) return; 00504 00505 if (message.size() != 2) { 00506 vector<string> strMessages; 00507 for (auto & it: message) { 00508 strMessages.push_back(it.toString()); 00509 } 00510 00511 cerr << "ignoring invalid subscription message " 00512 << strMessages << endl; 00513 return; 00514 } 00515 00516 //cerr << "logging subscription message " << message << endl; 00517 00518 current->logMessage(message[0].toString(), message[1].toString()); 00519 } 00520 00521 #if 0 00522 void 00523 Logger:: 00524 runLogThread(ACE_Semaphore & sem) 00525 { 00526 using namespace std; 00527 00528 zmq::socket_t sock(*context, ZMQ_PULL); 00529 sock.bind(ML::format("inproc://logger@%p", this).c_str()); 00530 00531 zmq::socket_t raw_sock(*context, ZMQ_PULL); 00532 raw_sock.bind(ML::format("inproc://logger@%p-RAW", this).c_str()); 00533 00534 //cerr << "done bind" << endl; 00535 00536 sem.release(); 00537 00538 int nitems = subscribers.size() + 2; 00539 zmq_pollitem_t items [nitems]; 00540 zmq_pollitem_t item0 = { sock, 0, ZMQ_POLLIN, 0 }; 00541 zmq_pollitem_t item1 = { raw_sock, 0, ZMQ_POLLIN, 0 }; 00542 items[0] = item0; 00543 items[1] = item1; 00544 for (unsigned i = 0; i < subscribers.size(); ++i) { 00545 zmq_pollitem_t item = { *subscriptions[i], 0, ZMQ_POLLIN, 0 }; 00546 items[i + 2] = item; 00547 } 00548 00549 //bool shutdown = false; 00550 00551 //struct timeval beforeSleep, afterSleep; 00552 //gettimeofday(&afterSleep, 0); 00553 00554 //cerr << "starting logging thread" << endl; 00555 00556 while (!doShutdown) { 00557 //gettimeofday(&beforeSleep, 0); 00558 00559 //dutyCycleCurrent.nsProcessing += timeDiff(afterSleep, beforeSleep); 00560 00561 int rc = zmq_poll(items, nitems, 500 /* milliseconds */); 00562 00563 //cerr << "rc = " << rc << endl; 00564 00565 //gettimeofday(&afterSleep, 0); 00566 00567 //dutyCycleCurrent.nsSleeping += timeDiff(beforeSleep, afterSleep); 00568 //dutyCycleCurrent.nEvents += 1; 00569 00570 if (rc == -1 && zmq_errno() != EINTR) { 00571 cerr << "zeromq log error: " << zmq_strerror(zmq_errno()) << endl; 00572 } 00573 00574 Outputs * current = outputs; 00575 00576 if (!current) continue; 00577 00578 if (current->empty()) { 00579 current = 0; // TODO: delete it 00580 } 00581 00582 if (current->old) { 00583 delete current->old; 00584 current->old = 0; 00585 } 00586 00587 if (items[0].revents & ZMQ_POLLIN) { 00588 vector<string> message = recvAll(sock); 00589 00590 if (message.size() == 1 && message[0] == "SHUTDOWN") 00591 return; 00592 00593 atomic_add(messagesDone, 1); 00594 00595 if (!current) continue; 00596 00597 string toLog; 00598 toLog.reserve(1024); 00599 00600 for (unsigned i = 1; i < message.size(); ++i) { 00601 if (message[i].find_first_of("\n\t\0\r") != string::npos) { 00602 cerr << "warning: part " << i << " of message " 00603 << message[0] << " has illegal char: '" 00604 << message[i] << "'" << endl; 00605 } 00606 if (i > 1) toLog += '\t'; 00607 toLog += message[i]; 00608 } 00609 00610 current->logMessage(message[0], toLog); 00611 } 00612 if (items[1].revents & ZMQ_POLLIN) { 00613 string message = recvMesg(raw_sock); 00614 00615 atomic_add(messagesDone, 1); 00616 00617 if (!current) continue; 00618 00619 string channel, content; 00620 string::size_type pos = message.find('\t'); 00621 00622 if (pos != string::npos) { 00623 channel = string(message, 0, pos); 00624 content = string(message, pos + 1); 00625 } 00626 00627 current->logMessage(channel, content); 00628 } 00629 for (unsigned i = 0; i < subscriptions.size(); ++i) { 00630 if (items[i + 2].revents & ZMQ_POLLIN) { 00631 vector<string> message = recvAll(*subscriptions[i]); 00632 00633 //cerr << "got subscription message " << message << endl; 00634 00635 if (!current) continue; 00636 00637 if (message.size() != 2) { 00638 cerr << "ignoring invalid subscription message " 00639 << message << endl; 00640 } 00641 00642 //cerr << "logging subscription message " << message << endl; 00643 00644 current->logMessage(message[0], message[1]); 00645 } 00646 } 00647 } 00648 } 00649 #endif 00650 00651 } // namespace Datacratic