RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/logger/multi_output.cc
00001 /* multi_output.cc                                                 -*- C++ -*-
00002    Jeremy Barnes, 21 September 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Output into multiple files based upon the fields of the message.
00006 */
00007 
00008 #include "multi_output.h"
00009 #include "log_message_splitter.h"
00010 #include "file_output.h"
00011 
00012 using namespace std;
00013 
00014 
00015 namespace Datacratic {
00016 
00017 
00018 /*****************************************************************************/
00019 /* MULTI OUTPUT                                                              */
00020 /*****************************************************************************/
00021 
00022 MultiOutput::
00023 MultiOutput()
00024 {
00025 }
00026 
00027 MultiOutput::
00028 ~MultiOutput()
00029 {
00030 }
00031 
00032 void
00033 MultiOutput::ChannelEntry::
00034 parse(const std::string & tmplate)
00035 {
00036     tokens.clear();
00037     
00038     const char * p = tmplate.c_str();
00039     const char * e = p + tmplate.size();
00040 
00041     std::string currentLiteral;
00042 
00043     auto pushLiteral = [&] ()
00044         {
00045             if (currentLiteral.empty()) return;
00046             Token lit;
00047             lit.type = TOK_LITERAL;
00048             lit.literal = currentLiteral;
00049             tokens.push_back(lit);
00050             currentLiteral = "";
00051         };
00052 
00053     auto pushToken = [&] (Token token)
00054         {
00055             pushLiteral();
00056             tokens.push_back(token);
00057         };
00058 
00059     for (; p < e;  ++p) {
00060         char c = *p;
00061         switch (c) {
00062         case '$': {
00063             if (p == e - 1)
00064                 throw ML::Exception("channel key ended");
00065             c = *++p;
00066             switch (c) {
00067             case '$':
00068                 currentLiteral += '$';
00069                 break;
00070             case '(': {
00071                 string num;
00072                 for (++p; p != e && *p != ')';  ++p)
00073                     num += *p;
00074                 if (p == e)
00075                     throw ML::Exception("didn't close field number");
00076                 int segNum = boost::lexical_cast<int>(num);
00077                 Token token;
00078                 if (segNum == 0)
00079                     token.type = TOK_CHANNEL;
00080                 else {
00081                     token.type = TOK_FIELD;
00082                     token.field = segNum - 1;
00083                 }
00084                 pushToken(token);
00085                 //++p;
00086                 break;
00087             }
00088             default:
00089                 throw ML::Exception("unacceptable channel key after '$'");
00090             }
00091             break;
00092         }
00093         default:
00094             currentLiteral += c;
00095         }
00096     }
00097 
00098     pushLiteral();
00099 
00100     //cerr << "parsing " << tmplate << " we got " << tokens.size() << " tokens"
00101     //     << endl;
00102     for (auto token: tokens) {
00103         cerr << "  " << token.type << " " << token.literal << " "
00104              << token.field << endl;
00105     }
00106 }
00107 
00108 std::string
00109 MultiOutput::ChannelEntry::
00110 apply(const std::string & channel,
00111       const std::string & message) const
00112 {
00113     string result;
00114 
00115     LogMessageSplitter<128> split(message);
00116 
00117     for (auto token: tokens) {
00118         switch (token.type) {
00119 
00120         case TOK_LITERAL:
00121             result += token.literal;
00122             break;
00123 
00124         case TOK_CHANNEL:
00125             result += channel;
00126             break;
00127 
00128         case TOK_FIELD:
00129             result += split[token.field];
00130             break;
00131 
00132         default:
00133             throw ML::Exception("unknown token");
00134         }
00135     }
00136 
00137     return result;
00138 }
00139 
00140 void
00141 MultiOutput::
00142 logMessage(const std::string & channel,
00143            const std::string & message)
00144 {
00145     std::unique_lock<std::mutex> guard(lock);
00146     auto it = channels.find(channel);
00147     if (it == channels.end())
00148         it = channels.find("");
00149     if (it == channels.end())
00150         return;
00151 
00152 
00153     //cerr << "got entry " << it->first << endl;
00154 
00155     auto & channelEntry = it->second;
00156 
00157     // 1.  Find which key the message should be logged to
00158     std::string messageKey = channelEntry.apply(channel, message);
00159     
00160     //cerr << "logging " << channel << " to " << it->first
00161     //     << " with messageKey " << messageKey << endl;
00162     //cerr << "messageKey = " << messageKey << endl;
00163 
00164     // 2.  Get the logger under the key; create if necessary
00165     std::shared_ptr<LogOutput> output;
00166     auto jt = outputs.find(messageKey);
00167     if (jt == outputs.end()) {
00168         cerr << "creating " << messageKey << endl;
00169         jt = outputs.insert(make_pair(messageKey,
00170                                       channelEntry.createLogger(messageKey))).first;
00171     }
00172     output = jt->second;
00173     guard.unlock();
00174 
00175     output->logMessage(channel, message);
00176 }
00177 
00178 void
00179 MultiOutput::
00180 logTo(const std::string & channel,
00181       const std::string & pattern,
00182       const CreateLogger & createLogger)
00183 {
00184     ChannelEntry entry;
00185     entry.createLogger = createLogger;
00186     entry.parse(pattern);
00187     
00188     std::unique_lock<std::mutex> guard(lock);
00189     channels[channel] = entry;
00190 }
00191 
00192 void
00193 MultiOutput::
00194 close()
00195 {
00196     throw ML::Exception("MultiOutput::close() needs implementation");
00197 }
00198 
00199 Json::Value
00200 MultiOutput::
00201 stats() const
00202 {
00203     return Json::Value();
00204 }
00205 
00206 void
00207 MultiOutput::
00208 clearStats()
00209 {
00210 }
00211 
00212 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator