RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* http_endpoint.cc 00002 Jeremy Barnes, 18 February 2010 00003 Copyright (c) 2010 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "soa/service//http_endpoint.h" 00008 #include "jml/arch/cmp_xchg.h" 00009 #include "jml/arch/atomic_ops.h" 00010 #include "jml/utils/parse_context.h" 00011 #include "jml/utils/string_functions.h" 00012 #include "jml/utils/exc_assert.h" 00013 #include <fstream> 00014 #include <boost/make_shared.hpp> 00015 00016 00017 using namespace std; 00018 using namespace ML; 00019 00020 00021 namespace Datacratic { 00022 00023 00024 /*****************************************************************************/ 00025 /* HTTP CONNECTION HANDLER */ 00026 /*****************************************************************************/ 00027 00028 HttpConnectionHandler:: 00029 HttpConnectionHandler() 00030 : readState(INVALID), httpEndpoint(0) 00031 { 00032 } 00033 00034 void 00035 HttpConnectionHandler:: 00036 onGotTransport() 00037 { 00038 this->httpEndpoint = dynamic_cast<HttpEndpoint *>(get_endpoint()); 00039 00040 readState = HEADER; 00041 startReading(); 00042 } 00043 00044 std::shared_ptr<ConnectionHandler> 00045 HttpConnectionHandler:: 00046 makeNewHandlerShared() 00047 { 00048 if (!this->httpEndpoint) 00049 throw Exception("HttpConnectionHandler needs to be owned by an " 00050 "HttpEndpoint for makeNewHandlerShared() to work"); 00051 return httpEndpoint->makeNewHandler(); 00052 } 00053 00054 void 00055 HttpConnectionHandler:: 00056 handleData(const std::string & data) 00057 { 00058 //cerr << "HttpConnectionHandler::handleData: got data <" << data << ">" << endl; 00059 //httpData.write(data.c_str(), data.length()); 00060 00061 if (headerText == "" && readState == HEADER) 00062 firstData = Date::now(); 00063 00064 #if 0 00065 string dataSample; 00066 dataSample.reserve(data.size() * 3 / 2); 00067 for (unsigned i = 0; i < 100 && i < data.size(); ++i) { 00068 if (data[i] == '\n') dataSample += "\\n"; 00069 else if (data[i] == '\r') dataSample += "\\r"; 00070 else if (data[i] == '\0') dataSample += "\\0"; 00071 else if (data[i] == '\t') dataSample += "\\t"; 00072 else if (data[i] < ' ' || data[i] >= 127) dataSample += '.'; 00073 else dataSample += data[i]; 00074 } 00075 00076 addActivity("got %d bytes of data: %s", (int)data.size(), 00077 dataSample.c_str()); 00078 #endif 00079 00080 if (readState == PAYLOAD 00081 || readState == CHUNK_HEADER || readState == CHUNK_BODY) { 00082 handleHttpData(data); 00083 return; 00084 } 00085 00086 if (readState != HEADER) { 00087 throw Exception("invalid read state %d handling data '%s' for %08xp", 00088 readState, data.c_str(), this); 00089 } 00090 00091 headerText += data; 00092 // The first time that we see that character sequence, it's the break 00093 // between the header and the data. 00094 string::size_type breakPos = headerText.find("\r\n\r\n"); 00095 00096 00097 if (breakPos == string::npos) 00098 { 00099 if (headerText.size() > 16384) { 00100 throw ML::Exception("HTTP header exceeds 16kb"); 00101 } 00102 //cerr << "did not find break pos " << endl; 00103 return; 00104 } 00105 // We got a header 00106 try { 00107 header.parse(headerText); 00108 } catch (...) { 00109 cerr << "problem parsing in state: " << status() << endl; 00110 throw; 00111 } 00112 00113 //cerr << "content length = " << header.contentLength << endl; 00114 00115 if (header.contentLength == -1 && !header.isChunked) 00116 header.contentLength = 0; 00117 //doError("we need a Content-Length"); 00118 00119 addActivityS("header parsing OK"); 00120 00121 //cerr << "done header" << endl; 00122 00123 handleHttpHeader(header); 00124 00125 payload = ""; 00126 00127 if (header.isChunked) 00128 readState = CHUNK_HEADER; 00129 else readState = PAYLOAD; 00130 00131 handleHttpData(header.knownData); 00132 } 00133 00134 void 00135 HttpConnectionHandler:: 00136 handleHttpHeader(const HttpHeader & header) 00137 { 00138 //cerr << "GOT HTTP HEADER[" << header << "]" << endl; 00139 } 00140 00141 void 00142 HttpConnectionHandler:: 00143 handleHttpData(const std::string & data) 00144 { 00145 //static const char *fName = "HttpConnectionHandler::handleHttpData:"; 00146 //cerr << "got HTTP data in state " << readState << " with " 00147 // << data.length() << " characters" << endl; 00148 //cerr << "data = [" << data << "]" << endl; 00149 //cerr << endl << endl << "---------------------------" << endl; 00150 00151 if (readState == PAYLOAD) { 00152 //cerr << "handleHttpData" << endl; 00153 if (readState != PAYLOAD) 00154 throw Exception("invalid state: expected payload"); 00155 00156 payload += data; 00157 #if 0 00158 cerr << "payload = " << payload << endl; 00159 cerr << "payload.length() = " << payload.length() << endl; 00160 cerr << "header.contentLength = " << header.contentLength << endl; 00161 #endif 00162 if (payload.length() > header.contentLength) { 00163 doError("extra data"); 00164 } 00165 00166 00167 if (payload.length() == header.contentLength) { 00168 addActivityS("got HTTP payload"); 00169 handleHttpPayload(header, payload); 00170 00171 //cerr << this << " switching to DONE" << endl; 00172 00173 readState = DONE; 00174 } 00175 } 00176 if (readState == CHUNK_HEADER || readState == CHUNK_BODY) { 00177 const char * current = data.c_str(); 00178 const char * end = current + data.length(); 00179 00180 //cerr << "processing " << data.length() << " characters" << endl; 00181 00182 while (current != end) { 00183 if (current >= end) 00184 throw ML::Exception("current >= end"); 00185 00186 //cerr << (end - current) << " chars left; state " 00187 // << readState << " chunkHeader = " << chunkHeader << endl; 00188 00189 if (readState == CHUNK_HEADER) { 00190 while (current != end) { 00191 char c = *current++; 00192 chunkHeader += c; 00193 if (c == '\n') break; 00194 } 00195 00196 //cerr << "chunkHeader now '" << chunkHeader << "'" << endl; 00197 00198 // Remove an extra cr/lf if there is one 00199 if (chunkHeader == "\r\n") { 00200 chunkHeader = ""; 00201 continue; 00202 } 00203 00204 //if (current == end) break; 00205 00206 //cerr << "chunkHeader now '" << chunkHeader << "'" << endl; 00207 00208 string::size_type pos = chunkHeader.find("\r\n"); 00209 if (pos == string::npos) break; 00210 current += pos + 2 - chunkHeader.length(); // skip crlf 00211 00212 chunkHeader.resize(pos); 00213 00214 // We got to the end of the chunk header... parse it 00215 string::size_type lengthPos = chunkHeader.find(';'); 00216 00217 //cerr << "chunkHeader = " << chunkHeader << endl; 00218 //cerr << "lengthPos = " << lengthPos << endl; 00219 00220 string lengthStr(chunkHeader, 0, lengthPos); 00221 //(lengthPos == string::npos 00222 // ? chunkHeader.length() : lengthPos)); 00223 //cerr << "lengthStr = " << lengthStr << endl; 00224 00225 char * endPtr = 0; 00226 chunkSize = strtol(lengthStr.c_str(), &endPtr, 16); 00227 00228 //cerr << "chunkSize = " << chunkSize << endl; 00229 00230 if (chunkSize == 0) { 00231 //readState = DONE; 00232 //return; 00233 } 00234 00235 if (*endPtr != 0) 00236 throw ML::Exception("invalid chunk length " + lengthStr); 00237 00238 readState = CHUNK_BODY; 00239 chunkBody = ""; 00240 } 00241 if (readState == CHUNK_BODY) { 00242 size_t chunkDataLeft = chunkSize - chunkBody.size(); 00243 size_t dataAvail = end - current; 00244 size_t toRead = std::min(chunkDataLeft, dataAvail); 00245 00246 chunkBody.append(current, current + toRead); 00247 current += toRead; 00248 00249 if (chunkBody.length() == chunkSize) { 00250 00251 //cerr << "got chunk " << "-------------" << endl 00252 // << chunkBody << "--------------" << endl << endl; 00253 00254 handleHttpChunk(header, chunkHeader, chunkBody); 00255 chunkBody = ""; 00256 chunkHeader = ""; 00257 readState = CHUNK_HEADER; 00258 } 00259 } 00260 } 00261 } 00262 } 00263 00264 void 00265 HttpConnectionHandler:: 00266 handleHttpChunk(const HttpHeader & header, 00267 const std::string & chunkHeader, 00268 const std::string & chunk) 00269 { 00270 //cerr << "got chunk " << chunk << endl; 00271 handleHttpPayload(header, chunk); 00272 } 00273 00274 void 00275 HttpConnectionHandler:: 00276 handleHttpPayload(const HttpHeader & header, 00277 const std::string & payload) 00278 { 00279 throw Exception("no payload handler defined"); 00280 } 00281 00282 void 00283 HttpConnectionHandler:: 00284 handleSendFinished() 00285 { 00286 } 00287 00288 void 00289 HttpConnectionHandler:: 00290 sendHttpChunk(const std::string & chunk, 00291 NextAction next, 00292 OnWriteFinished onWriteFinished) 00293 { 00294 // Add the chunk header 00295 string fullChunk = ML::format("%zx\r\n%s", chunk.length(), chunk.c_str()); 00296 send(fullChunk, next, onWriteFinished); 00297 } 00298 00299 void 00300 HttpConnectionHandler:: 00301 handleError(const std::string & message) 00302 { 00303 cerr << "error: " << message << endl; 00304 } 00305 00306 void 00307 HttpConnectionHandler:: 00308 onCleanup() 00309 { 00310 } 00311 00312 void 00313 HttpConnectionHandler:: 00314 putResponseOnWire(HttpResponse response, 00315 std::function<void ()> onSendFinished) 00316 { 00317 if (!onSendFinished) 00318 onSendFinished = [=] () 00319 { 00320 this->transport().associateWhenHandlerFinished 00321 (this->makeNewHandlerShared(), "sendFinished"); 00322 }; 00323 00324 std::string responseStr; 00325 responseStr.reserve(1024 + response.body.length()); 00326 00327 responseStr.append("HTTP/1.1 "); 00328 responseStr.append(to_string(response.responseCode)); 00329 responseStr.append(" "); 00330 responseStr.append(response.responseStatus); 00331 responseStr.append("\r\n"); 00332 00333 if (response.contentType != "") { 00334 responseStr.append("Content-Type: "); 00335 responseStr.append(response.contentType); 00336 responseStr.append("\r\n"); 00337 } 00338 responseStr.append("Content-Length: "); 00339 responseStr.append(to_string(response.body.length())); 00340 responseStr.append("\r\n"); 00341 responseStr.append("Connection: Keep-Alive\r\n"); 00342 00343 for (auto & h: response.extraHeaders) { 00344 responseStr.append(h.first); 00345 responseStr.append(": "); 00346 responseStr.append(h.second); 00347 responseStr.append("\r\n"); 00348 } 00349 00350 responseStr.append("\r\n"); 00351 responseStr.append(response.body); 00352 00353 //cerr << "sending " << responseStr << endl; 00354 00355 send(responseStr, 00356 NEXT_CONTINUE, 00357 onSendFinished); 00358 } 00359 00360 00361 /*****************************************************************************/ 00362 /* HTTP ENDPOINT */ 00363 /*****************************************************************************/ 00364 00365 HttpEndpoint:: 00366 HttpEndpoint(const std::string & name) 00367 : PassiveEndpointT<SocketTransport>(name) 00368 { 00369 handlerFactory = [] () 00370 { 00371 return std::make_shared<HttpConnectionHandler>(); 00372 }; 00373 } 00374 00375 HttpEndpoint:: 00376 ~HttpEndpoint() 00377 { 00378 } 00379 00380 template struct PassiveEndpointT<SocketTransport>; 00381 00382 } // namespace Datacratic