![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* multi_output.cc -*- C++ -*- 00002 Jeremy Barnes, 21 September 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Output into multiple files based upon the fields of the message. 00006 */ 00007 00008 #include "multi_output.h" 00009 #include "log_message_splitter.h" 00010 #include "file_output.h" 00011 00012 using namespace std; 00013 00014 00015 namespace Datacratic { 00016 00017 00018 /*****************************************************************************/ 00019 /* MULTI OUTPUT */ 00020 /*****************************************************************************/ 00021 00022 MultiOutput:: 00023 MultiOutput() 00024 { 00025 } 00026 00027 MultiOutput:: 00028 ~MultiOutput() 00029 { 00030 } 00031 00032 void 00033 MultiOutput::ChannelEntry:: 00034 parse(const std::string & tmplate) 00035 { 00036 tokens.clear(); 00037 00038 const char * p = tmplate.c_str(); 00039 const char * e = p + tmplate.size(); 00040 00041 std::string currentLiteral; 00042 00043 auto pushLiteral = [&] () 00044 { 00045 if (currentLiteral.empty()) return; 00046 Token lit; 00047 lit.type = TOK_LITERAL; 00048 lit.literal = currentLiteral; 00049 tokens.push_back(lit); 00050 currentLiteral = ""; 00051 }; 00052 00053 auto pushToken = [&] (Token token) 00054 { 00055 pushLiteral(); 00056 tokens.push_back(token); 00057 }; 00058 00059 for (; p < e; ++p) { 00060 char c = *p; 00061 switch (c) { 00062 case '$': { 00063 if (p == e - 1) 00064 throw ML::Exception("channel key ended"); 00065 c = *++p; 00066 switch (c) { 00067 case '$': 00068 currentLiteral += '$'; 00069 break; 00070 case '(': { 00071 string num; 00072 for (++p; p != e && *p != ')'; ++p) 00073 num += *p; 00074 if (p == e) 00075 throw ML::Exception("didn't close field number"); 00076 int segNum = boost::lexical_cast<int>(num); 00077 Token token; 00078 if (segNum == 0) 00079 token.type = TOK_CHANNEL; 00080 else { 00081 token.type = TOK_FIELD; 00082 token.field = segNum - 1; 00083 } 00084 pushToken(token); 00085 //++p; 00086 break; 00087 } 00088 default: 00089 throw ML::Exception("unacceptable channel key after '$'"); 00090 } 00091 break; 00092 } 00093 default: 00094 currentLiteral += c; 00095 } 00096 } 00097 00098 pushLiteral(); 00099 00100 //cerr << "parsing " << tmplate << " we got " << tokens.size() << " tokens" 00101 // << endl; 00102 for (auto token: tokens) { 00103 cerr << " " << token.type << " " << token.literal << " " 00104 << token.field << endl; 00105 } 00106 } 00107 00108 std::string 00109 MultiOutput::ChannelEntry:: 00110 apply(const std::string & channel, 00111 const std::string & message) const 00112 { 00113 string result; 00114 00115 LogMessageSplitter<128> split(message); 00116 00117 for (auto token: tokens) { 00118 switch (token.type) { 00119 00120 case TOK_LITERAL: 00121 result += token.literal; 00122 break; 00123 00124 case TOK_CHANNEL: 00125 result += channel; 00126 break; 00127 00128 case TOK_FIELD: 00129 result += split[token.field]; 00130 break; 00131 00132 default: 00133 throw ML::Exception("unknown token"); 00134 } 00135 } 00136 00137 return result; 00138 } 00139 00140 void 00141 MultiOutput:: 00142 logMessage(const std::string & channel, 00143 const std::string & message) 00144 { 00145 std::unique_lock<std::mutex> guard(lock); 00146 auto it = channels.find(channel); 00147 if (it == channels.end()) 00148 it = channels.find(""); 00149 if (it == channels.end()) 00150 return; 00151 00152 00153 //cerr << "got entry " << it->first << endl; 00154 00155 auto & channelEntry = it->second; 00156 00157 // 1. Find which key the message should be logged to 00158 std::string messageKey = channelEntry.apply(channel, message); 00159 00160 //cerr << "logging " << channel << " to " << it->first 00161 // << " with messageKey " << messageKey << endl; 00162 //cerr << "messageKey = " << messageKey << endl; 00163 00164 // 2. Get the logger under the key; create if necessary 00165 std::shared_ptr<LogOutput> output; 00166 auto jt = outputs.find(messageKey); 00167 if (jt == outputs.end()) { 00168 cerr << "creating " << messageKey << endl; 00169 jt = outputs.insert(make_pair(messageKey, 00170 channelEntry.createLogger(messageKey))).first; 00171 } 00172 output = jt->second; 00173 guard.unlock(); 00174 00175 output->logMessage(channel, message); 00176 } 00177 00178 void 00179 MultiOutput:: 00180 logTo(const std::string & channel, 00181 const std::string & pattern, 00182 const CreateLogger & createLogger) 00183 { 00184 ChannelEntry entry; 00185 entry.createLogger = createLogger; 00186 entry.parse(pattern); 00187 00188 std::unique_lock<std::mutex> guard(lock); 00189 channels[channel] = entry; 00190 } 00191 00192 void 00193 MultiOutput:: 00194 close() 00195 { 00196 throw ML::Exception("MultiOutput::close() needs implementation"); 00197 } 00198 00199 Json::Value 00200 MultiOutput:: 00201 stats() const 00202 { 00203 return Json::Value(); 00204 } 00205 00206 void 00207 MultiOutput:: 00208 clearStats() 00209 { 00210 } 00211 00212 } // namespace Datacratic