RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/http_endpoint.cc
00001 /* http_endpoint.cc
00002    Jeremy Barnes, 18 February 2010
00003    Copyright (c) 2010 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #include "soa/service//http_endpoint.h"
00008 #include "jml/arch/cmp_xchg.h"
00009 #include "jml/arch/atomic_ops.h"
00010 #include "jml/utils/parse_context.h"
00011 #include "jml/utils/string_functions.h"
00012 #include "jml/utils/exc_assert.h"
00013 #include <fstream>
00014 #include <boost/make_shared.hpp>
00015 
00016 
00017 using namespace std;
00018 using namespace ML;
00019 
00020 
00021 namespace Datacratic {
00022 
00023 
00024 /*****************************************************************************/
00025 /* HTTP CONNECTION HANDLER                                                   */
00026 /*****************************************************************************/
00027 
00028 HttpConnectionHandler::
00029 HttpConnectionHandler()
00030     : readState(INVALID), httpEndpoint(0)
00031 {
00032 }
00033 
00034 void
00035 HttpConnectionHandler::
00036 onGotTransport()
00037 {
00038     this->httpEndpoint = dynamic_cast<HttpEndpoint *>(get_endpoint());
00039     
00040     readState = HEADER;
00041     startReading();
00042 }
00043 
00044 std::shared_ptr<ConnectionHandler>
00045 HttpConnectionHandler::
00046 makeNewHandlerShared()
00047 {
00048     if (!this->httpEndpoint)
00049         throw Exception("HttpConnectionHandler needs to be owned by an "
00050                         "HttpEndpoint for makeNewHandlerShared() to work");
00051     return httpEndpoint->makeNewHandler();
00052 }
00053 
00054 void
00055 HttpConnectionHandler::
00056 handleData(const std::string & data)
00057 {
00058    //cerr << "HttpConnectionHandler::handleData: got data <" << data << ">" << endl;
00059     //httpData.write(data.c_str(), data.length());
00060 
00061     if (headerText == "" && readState == HEADER)
00062         firstData = Date::now();
00063 
00064 #if 0
00065     string dataSample;
00066     dataSample.reserve(data.size() * 3 / 2);
00067     for (unsigned i = 0;  i < 100 && i < data.size();  ++i) {
00068         if (data[i] == '\n') dataSample += "\\n";
00069         else if (data[i] == '\r') dataSample += "\\r";
00070         else if (data[i] == '\0') dataSample += "\\0";
00071         else if (data[i] == '\t') dataSample += "\\t";
00072         else if (data[i] < ' ' || data[i] >= 127) dataSample += '.';
00073         else dataSample += data[i];
00074     }
00075 
00076     addActivity("got %d bytes of data: %s", (int)data.size(),
00077                 dataSample.c_str());
00078 #endif
00079 
00080     if (readState == PAYLOAD
00081         || readState == CHUNK_HEADER || readState == CHUNK_BODY) {
00082         handleHttpData(data);
00083         return;
00084     }
00085     
00086     if (readState != HEADER) {
00087         throw Exception("invalid read state %d handling data '%s' for %08xp",
00088                         readState, data.c_str(), this);
00089     }
00090     
00091     headerText += data;
00092     // The first time that we see that character sequence, it's the break
00093     // between the header and the data.
00094     string::size_type breakPos = headerText.find("\r\n\r\n");
00095     
00096 
00097     if (breakPos == string::npos)
00098     {
00099         if (headerText.size() > 16384) {
00100             throw ML::Exception("HTTP header exceeds 16kb");
00101         }
00102         //cerr << "did not find break pos " << endl;
00103         return;
00104     }
00105     // We got a header
00106     try {
00107         header.parse(headerText);
00108     } catch (...) {
00109         cerr << "problem parsing in state: " << status() << endl;
00110         throw;
00111     }
00112 
00113     //cerr << "content length = " << header.contentLength << endl;
00114 
00115     if (header.contentLength == -1 && !header.isChunked)
00116         header.contentLength = 0;
00117     //doError("we need a Content-Length");
00118     
00119     addActivityS("header parsing OK");
00120 
00121     //cerr << "done header" << endl;
00122 
00123     handleHttpHeader(header);
00124 
00125     payload = "";
00126 
00127     if (header.isChunked)
00128         readState = CHUNK_HEADER;
00129     else readState = PAYLOAD;
00130 
00131     handleHttpData(header.knownData);
00132 }
00133 
00134 void
00135 HttpConnectionHandler::
00136 handleHttpHeader(const HttpHeader & header)
00137 {
00138     //cerr << "GOT HTTP HEADER[" << header << "]" << endl;
00139 }
00140 
00141 void
00142 HttpConnectionHandler::
00143 handleHttpData(const std::string & data)
00144 {
00145     //static const char *fName = "HttpConnectionHandler::handleHttpData:";
00146     //cerr << "got HTTP data in state " << readState << " with "
00147     //     << data.length() << " characters" << endl;
00148     //cerr << "data = [" << data << "]" << endl;
00149     //cerr << endl << endl << "---------------------------" << endl;
00150     
00151     if (readState == PAYLOAD) {
00152         //cerr << "handleHttpData" << endl;
00153         if (readState != PAYLOAD)
00154             throw Exception("invalid state: expected payload");
00155 
00156         payload += data;
00157 #if 0
00158         cerr << "payload = " << payload << endl;
00159         cerr << "payload.length() = " << payload.length() << endl;
00160         cerr << "header.contentLength = " << header.contentLength << endl;
00161 #endif
00162         if (payload.length() > header.contentLength) {
00163             doError("extra data");
00164         }
00165     
00166 
00167         if (payload.length() == header.contentLength) {
00168             addActivityS("got HTTP payload");
00169             handleHttpPayload(header, payload);
00170 
00171             //cerr << this << " switching to DONE" << endl;
00172 
00173             readState = DONE;
00174         }
00175     }
00176     if (readState == CHUNK_HEADER || readState == CHUNK_BODY) {
00177         const char * current = data.c_str();
00178         const char * end = current + data.length();
00179 
00180         //cerr << "processing " << data.length() << " characters" << endl;
00181 
00182         while (current != end) {
00183             if (current >= end)
00184                 throw ML::Exception("current >= end");
00185 
00186             //cerr << (end - current) << " chars left; state "
00187             //     << readState << " chunkHeader = " << chunkHeader << endl;
00188 
00189             if (readState == CHUNK_HEADER) {
00190                 while (current != end) {
00191                     char c = *current++;
00192                     chunkHeader += c;
00193                     if (c == '\n') break;
00194                 }
00195 
00196                 //cerr << "chunkHeader now '" << chunkHeader << "'" << endl;
00197 
00198                 // Remove an extra cr/lf if there is one
00199                 if (chunkHeader == "\r\n") {
00200                     chunkHeader = "";
00201                     continue;
00202                 }
00203 
00204                 //if (current == end) break;
00205 
00206                 //cerr << "chunkHeader now '" << chunkHeader << "'" << endl;
00207 
00208                 string::size_type pos = chunkHeader.find("\r\n");
00209                 if (pos == string::npos) break;
00210                 current += pos + 2 - chunkHeader.length();  // skip crlf
00211                 
00212                 chunkHeader.resize(pos);
00213 
00214                 // We got to the end of the chunk header... parse it
00215                 string::size_type lengthPos = chunkHeader.find(';');
00216 
00217                 //cerr << "chunkHeader = " << chunkHeader << endl;
00218                 //cerr << "lengthPos = " << lengthPos << endl;
00219 
00220                 string lengthStr(chunkHeader, 0, lengthPos);
00221                 //(lengthPos == string::npos
00222                 //                  ? chunkHeader.length() : lengthPos));
00223                 //cerr << "lengthStr = " << lengthStr << endl;
00224 
00225                 char * endPtr = 0;
00226                 chunkSize = strtol(lengthStr.c_str(), &endPtr, 16);
00227 
00228                 //cerr << "chunkSize = " << chunkSize << endl;
00229 
00230                 if (chunkSize == 0) {
00231                     //readState = DONE;
00232                     //return;
00233                 }
00234 
00235                 if (*endPtr != 0)
00236                     throw ML::Exception("invalid chunk length " + lengthStr);
00237 
00238                 readState = CHUNK_BODY;
00239                 chunkBody = "";
00240             }
00241             if (readState == CHUNK_BODY) {
00242                 size_t chunkDataLeft = chunkSize - chunkBody.size();
00243                 size_t dataAvail = end - current;
00244                 size_t toRead = std::min(chunkDataLeft, dataAvail);
00245 
00246                 chunkBody.append(current, current + toRead);
00247                 current += toRead;
00248 
00249                 if (chunkBody.length() == chunkSize) {
00250 
00251                     //cerr << "got chunk " << "-------------" << endl
00252                     //     << chunkBody << "--------------" << endl << endl;
00253 
00254                     handleHttpChunk(header, chunkHeader, chunkBody);
00255                     chunkBody = "";
00256                     chunkHeader = "";
00257                     readState = CHUNK_HEADER;
00258                 }
00259             }
00260         }
00261     }
00262 }
00263 
00264 void
00265 HttpConnectionHandler::
00266 handleHttpChunk(const HttpHeader & header,
00267                 const std::string & chunkHeader,
00268                 const std::string & chunk)
00269 {
00270     //cerr << "got chunk " << chunk << endl;
00271     handleHttpPayload(header, chunk);
00272 }
00273 
00274 void
00275 HttpConnectionHandler::
00276 handleHttpPayload(const HttpHeader & header,
00277                   const std::string & payload)
00278 {
00279     throw Exception("no payload handler defined");
00280 }
00281 
00282 void
00283 HttpConnectionHandler::
00284 handleSendFinished()
00285 {
00286 }
00287 
00288 void
00289 HttpConnectionHandler::
00290 sendHttpChunk(const std::string & chunk,
00291               NextAction next,
00292               OnWriteFinished onWriteFinished)
00293 {
00294     // Add the chunk header
00295     string fullChunk = ML::format("%zx\r\n%s", chunk.length(), chunk.c_str());
00296     send(fullChunk, next, onWriteFinished);
00297 }
00298 
00299 void
00300 HttpConnectionHandler::
00301 handleError(const std::string & message)
00302 {
00303     cerr << "error: " << message << endl;
00304 }
00305 
00306 void
00307 HttpConnectionHandler::
00308 onCleanup()
00309 {
00310 }
00311 
00312 void
00313 HttpConnectionHandler::
00314 putResponseOnWire(HttpResponse response,
00315                   std::function<void ()> onSendFinished)
00316 {
00317     if (!onSendFinished)
00318         onSendFinished = [=] ()
00319             {
00320                 this->transport().associateWhenHandlerFinished
00321                     (this->makeNewHandlerShared(), "sendFinished");
00322             };
00323 
00324     std::string responseStr;
00325     responseStr.reserve(1024 + response.body.length());
00326 
00327     responseStr.append("HTTP/1.1 ");
00328     responseStr.append(to_string(response.responseCode));
00329     responseStr.append(" ");
00330     responseStr.append(response.responseStatus);
00331     responseStr.append("\r\n");
00332 
00333     if (response.contentType != "") {
00334         responseStr.append("Content-Type: ");
00335         responseStr.append(response.contentType);
00336         responseStr.append("\r\n");
00337     }
00338     responseStr.append("Content-Length: ");
00339     responseStr.append(to_string(response.body.length()));
00340     responseStr.append("\r\n");
00341     responseStr.append("Connection: Keep-Alive\r\n");
00342 
00343     for (auto & h: response.extraHeaders) {
00344         responseStr.append(h.first);
00345         responseStr.append(": ");
00346         responseStr.append(h.second);
00347         responseStr.append("\r\n");
00348     }
00349 
00350     responseStr.append("\r\n");
00351     responseStr.append(response.body);
00352 
00353     //cerr << "sending " << responseStr << endl;
00354     
00355     send(responseStr,
00356          NEXT_CONTINUE,
00357          onSendFinished);
00358 }
00359 
00360 
00361 /*****************************************************************************/
00362 /* HTTP ENDPOINT                                                             */
00363 /*****************************************************************************/
00364 
00365 HttpEndpoint::
00366 HttpEndpoint(const std::string & name)
00367     : PassiveEndpointT<SocketTransport>(name)
00368 {
00369     handlerFactory = [] ()
00370         {
00371             return std::make_shared<HttpConnectionHandler>();
00372         };
00373 }
00374 
00375 HttpEndpoint::
00376 ~HttpEndpoint()
00377 {
00378 }
00379 
00380 template struct PassiveEndpointT<SocketTransport>;
00381 
00382 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator