RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* logger.h -*- C++ -*- 00002 Jeremy Barnes, 20 May 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #pragma once 00008 00009 #include "soa/service/zmq_named_pub_sub.h" 00010 #include "soa/service/zmq_utils.h" 00011 #include "soa/service/socket_per_thread.h" 00012 #include <sstream> 00013 #include "jml/utils/filter_streams.h" 00014 #include <boost/thread/thread.hpp> 00015 #include "jml/utils/smart_ptr_utils.h" 00016 #include "jml/utils/vector_utils.h" 00017 #include "jml/arch/atomic_ops.h" 00018 #include "ace/Synch.h" 00019 #include <boost/function.hpp> 00020 #include <boost/regex.hpp> 00021 #include <boost/shared_ptr.hpp> 00022 #include "soa/jsoncpp/json.h" 00023 00024 00025 namespace Datacratic { 00026 00027 00028 /*****************************************************************************/ 00029 /* LOG OUTPUT */ 00030 /*****************************************************************************/ 00031 00034 struct LogOutput { 00035 LogOutput() 00036 { 00037 } 00038 00039 virtual ~LogOutput() 00040 { 00041 } 00042 00046 virtual void logMessage(const std::string & channel, 00047 const std::string & message) = 0; 00048 00052 virtual void close() = 0; 00053 00059 virtual Json::Value stats() const; 00060 00064 virtual void clearStats(); 00065 }; 00066 00067 00068 /*****************************************************************************/ 00069 /* LOGGER */ 00070 /*****************************************************************************/ 00071 00081 struct Logger { 00082 00084 Logger(); 00085 00086 Logger(zmq::context_t & contextRef); 00087 00088 Logger(std::shared_ptr<zmq::context_t> & context); 00089 00090 ~Logger(); 00091 00092 void init(); 00093 00103 void subscribe(const std::string & uri, 00104 const std::vector<std::string> & channels, 00105 const std::string & identity = ""); 00106 00113 void logTo(const std::string & uri, 00114 const boost::regex & allowChannels = boost::regex(), 00115 const boost::regex & denyChannels = boost::regex(), 00116 double logProbability = 1.0); 00117 00126 void addOutput(std::shared_ptr<LogOutput> output, 00127 const boost::regex & allowChannels = boost::regex(), 00128 const boost::regex & denyChannels = boost::regex(), 00129 double logProbability = 1.0); 00130 00134 void addCallback(boost::function<void (std::string, std::string)> callback, 00135 const boost::regex & allowChannels = boost::regex(), 00136 const boost::regex & denyChannels = boost::regex(), 00137 double logProbability = 1.0); 00138 00140 void clearOutputs(); 00141 00145 template<typename... Args> 00146 void operator () (const std::string & channel, Args... args) 00147 { 00148 logMessage(channel, args...); 00149 } 00150 00151 template<typename... Args> 00152 void logMessage(const std::string & channel, Args... args) 00153 { 00154 if (!outputs) return; 00155 ML::atomic_add(messagesSent, 1); 00156 messages.push({ channel, Date::now().print(5), args... }); 00157 } 00158 00159 template<typename... Args> 00160 void logMessageNoTimestamp(const std::string & channel, Args... args) 00161 { 00162 if (!outputs) return; 00163 ML::atomic_add(messagesSent, 1); 00164 messages.push({ channel, args... }); 00165 } 00166 00167 void logMessageNoTimestamp(const std::vector<std::string> & message) 00168 { 00169 if (!outputs) return; 00170 00171 if (message.empty()) 00172 throw ML::Exception("can't log empty message"); 00173 00174 ML::atomic_add(messagesSent, 1); 00175 messages.push(message); 00176 } 00177 00178 template<typename GetEl> 00179 void logMessage(const std::string & channel, 00180 int numElements, 00181 GetEl getElement) 00182 { 00183 if (!outputs) return; 00184 00185 std::vector<std::string> message; 00186 message.push_back(channel); 00187 message.push_back(Date::now().print(5)); 00188 00189 for (unsigned i = 0; i < numElements; ++i) { 00190 message.push_back(getElement(i)); 00191 } 00192 00193 messages.push(message); 00194 } 00195 00196 void start(std::function<void ()> onStop = 0); 00197 00198 void waitUntilFinished(); 00199 00200 void shutdown(); 00201 00202 std::map<std::string, size_t> getStats(); 00203 void resetStats(); 00204 00206 void replay(const std::string & filename, 00207 ssize_t maxEvents = -1); 00208 00211 void replayDirect(const std::string & filename, 00212 ssize_t maxEvents = -1) const; 00213 00214 uint64_t numMessagesSent() const { return messagesSent; } 00215 uint64_t numMessagesDone() const { return messagesDone; } 00216 00217 void handleListenerMessage(std::vector<std::string> const & message); 00218 void handleRawListenerMessage(std::vector<std::string> const & message); 00219 void handleMessage(std::vector<zmq::message_t> && message); 00220 00221 MessageLoop messageLoop; 00222 00223 private: 00224 std::shared_ptr<zmq::context_t> context; 00225 00226 std::map<std::string, size_t> stats; 00227 00229 TypedMessageSink<std::vector<std::string>> messages; 00230 00231 #if 0 00232 00233 boost::scoped_ptr<boost::thread> logThread; 00234 00236 void runLogThread(ACE_Semaphore & sem); 00237 #endif 00238 00239 struct Output; 00240 struct Outputs; 00241 00243 Outputs * outputs; 00244 00246 std::vector<std::shared_ptr<zmq::socket_t> > subscriptions; 00247 00249 bool doShutdown; 00250 00252 mutable uint64_t messagesSent; 00253 00255 uint64_t messagesDone; 00256 00257 }; 00258 00259 00260 } // namespace Datacratic