RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/logger.h
00001 /* logger.h                                                        -*- C++ -*-
00002    Jeremy Barnes, 20 May 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #pragma once
00008 
00009 #include "soa/service/zmq_named_pub_sub.h"
00010 #include "soa/service/zmq_utils.h"
00011 #include "soa/service/socket_per_thread.h"
00012 #include <sstream>
00013 #include "jml/utils/filter_streams.h"
00014 #include <boost/thread/thread.hpp>
00015 #include "jml/utils/smart_ptr_utils.h"
00016 #include "jml/utils/vector_utils.h"
00017 #include "jml/arch/atomic_ops.h"
00018 #include "ace/Synch.h"
00019 #include <boost/function.hpp>
00020 #include <boost/regex.hpp>
00021 #include <boost/shared_ptr.hpp>
00022 #include "soa/jsoncpp/json.h"
00023 
00024 
00025 namespace Datacratic {
00026 
00027 
00028 /*****************************************************************************/
00029 /* LOG OUTPUT                                                                */
00030 /*****************************************************************************/
00031 
00034 struct LogOutput {
00035     LogOutput()
00036     {
00037     }
00038 
00039     virtual ~LogOutput()
00040     {
00041     }
00042 
00046     virtual void logMessage(const std::string & channel,
00047                             const std::string & message) = 0;
00048 
00052     virtual void close() = 0;
00053 
00059     virtual Json::Value stats() const;
00060 
00064     virtual void clearStats();
00065 };
00066 
00067 
00068 /*****************************************************************************/
00069 /* LOGGER                                                                    */
00070 /*****************************************************************************/
00071 
00081 struct Logger {
00082 
00084     Logger();
00085 
00086     Logger(zmq::context_t & contextRef);
00087 
00088     Logger(std::shared_ptr<zmq::context_t> & context);
00089 
00090     ~Logger();
00091 
00092     void init();
00093 
00103     void subscribe(const std::string & uri,
00104                    const std::vector<std::string> & channels,
00105                    const std::string & identity = "");
00106 
00113     void logTo(const std::string & uri,
00114                const boost::regex & allowChannels = boost::regex(),
00115                const boost::regex & denyChannels = boost::regex(),
00116                double logProbability = 1.0);
00117 
00126     void addOutput(std::shared_ptr<LogOutput> output,
00127                    const boost::regex & allowChannels = boost::regex(),
00128                    const boost::regex & denyChannels = boost::regex(),
00129                    double logProbability = 1.0);
00130 
00134     void addCallback(boost::function<void (std::string, std::string)> callback,
00135                      const boost::regex & allowChannels = boost::regex(),
00136                      const boost::regex & denyChannels = boost::regex(),
00137                      double logProbability = 1.0);
00138 
00140     void clearOutputs();
00141 
00145     template<typename... Args>
00146     void operator () (const std::string & channel, Args... args)
00147     {
00148         logMessage(channel, args...);
00149     }
00150 
00151     template<typename... Args>
00152     void logMessage(const std::string & channel, Args... args)
00153     {
00154         if (!outputs) return;
00155         ML::atomic_add(messagesSent, 1);
00156         messages.push({ channel, Date::now().print(5), args... });
00157     }
00158 
00159     template<typename... Args>
00160     void logMessageNoTimestamp(const std::string & channel, Args... args)
00161     {
00162         if (!outputs) return;
00163         ML::atomic_add(messagesSent, 1);
00164         messages.push({ channel, args... });
00165     }
00166 
00167     void logMessageNoTimestamp(const std::vector<std::string> & message)
00168     {
00169         if (!outputs) return;
00170 
00171         if (message.empty())
00172             throw ML::Exception("can't log empty message");
00173 
00174         ML::atomic_add(messagesSent, 1);
00175         messages.push(message);
00176     }
00177 
00178     template<typename GetEl>
00179     void logMessage(const std::string & channel,
00180                     int numElements,
00181                     GetEl getElement)
00182     {
00183         if (!outputs) return;
00184 
00185         std::vector<std::string> message;
00186         message.push_back(channel);
00187         message.push_back(Date::now().print(5));
00188 
00189         for (unsigned i = 0;  i < numElements;  ++i) {
00190             message.push_back(getElement(i));
00191         }
00192 
00193         messages.push(message);
00194     }
00195 
00196     void start(std::function<void ()> onStop = 0);
00197 
00198     void waitUntilFinished();
00199 
00200     void shutdown();
00201     
00202     std::map<std::string, size_t> getStats();
00203     void resetStats();
00204 
00206     void replay(const std::string & filename,
00207                 ssize_t maxEvents = -1);
00208 
00211     void replayDirect(const std::string & filename,
00212                       ssize_t maxEvents = -1) const;
00213     
00214     uint64_t numMessagesSent() const { return messagesSent; }
00215     uint64_t numMessagesDone() const { return messagesDone; }
00216 
00217     void handleListenerMessage(std::vector<std::string> const & message);
00218     void handleRawListenerMessage(std::vector<std::string> const & message);
00219     void handleMessage(std::vector<zmq::message_t> && message);
00220 
00221     MessageLoop messageLoop;
00222 
00223 private:
00224     std::shared_ptr<zmq::context_t> context;
00225 
00226     std::map<std::string, size_t> stats;
00227 
00229     TypedMessageSink<std::vector<std::string>> messages;
00230 
00231 #if 0
00232 
00233     boost::scoped_ptr<boost::thread> logThread;
00234 
00236     void runLogThread(ACE_Semaphore & sem);
00237 #endif
00238 
00239     struct Output;
00240     struct Outputs;
00241 
00243     Outputs * outputs;
00244 
00246     std::vector<std::shared_ptr<zmq::socket_t> > subscriptions;
00247 
00249     bool doShutdown;
00250 
00252     mutable uint64_t messagesSent;
00253 
00255     uint64_t messagesDone;
00256 
00257 };
00258 
00259 
00260 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator