RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/connection_handler.cc
00001 /* connection_handler.cc
00002    Jeremy Barnes, 27 February 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Implementation of connection handler.
00006 */
00007 
00008 #include "connection_handler.h"
00009 #include "jml/arch/demangle.h"
00010 #include "jml/arch/backtrace.h"
00011 #include "jml/utils/guard.h"
00012 #include "soa/service//endpoint.h"
00013 
00014 
00015 using namespace std;
00016 using namespace ML;
00017 
00018 
00019 namespace Datacratic {
00020 
00021 
00022 /*****************************************************************************/
00023 /* CONNECTION HANDLER                                                        */
00024 /*****************************************************************************/
00025 
00026 uint32_t ConnectionHandler::created = 0;
00027 uint32_t ConnectionHandler::destroyed = 0;
00028 
00029 void
00030 ConnectionHandler::
00031 handleInput()
00032 {
00033     throw Exception("ConnectionHandler of type %s needs to override "
00034                     "handle_input()", type_name(*this).c_str());
00035 }
00036 
00037 void
00038 ConnectionHandler::
00039 handleOutput()
00040 {
00041     throw Exception("ConnectionHandler of type %s needs to override "
00042                     "handle_output()", type_name(*this).c_str());
00043 }
00044 
00045 void
00046 ConnectionHandler::
00047 handlePeerShutdown()
00048 {
00049     // By default we close...
00050     closeWhenHandlerFinished();
00051 }
00052 
00053 void
00054 ConnectionHandler::
00055 handleTimeout(Date time, size_t)
00056 {
00057     throw Exception("ConnectionHandler of type %s needs to override "
00058                     "handle_timeout()", type_name(*this).c_str());
00059 }
00060 
00061 void
00062 ConnectionHandler::
00063 closeConnection()
00064 {
00065     int code = closePeer();
00066     if (code == -1)
00067         doError("close: " + string(strerror(errno)));
00068 }
00069 
00070 void
00071 ConnectionHandler::
00072 startReading()
00073 {
00074     addActivityS("startReading");
00075     transport().startReading();
00076 }
00077 
00078 void
00079 ConnectionHandler::
00080 stopReading()
00081 {
00082     addActivityS("stopReading");
00083     transport().stopReading();
00084 }
00085 
00086 void
00087 ConnectionHandler::
00088 startWriting()
00089 {
00090     addActivityS("startWriting");
00091     transport().startWriting();
00092 }
00093 
00094 void
00095 ConnectionHandler::
00096 stopWriting()
00097 {
00098     addActivityS("stopWriting");
00099     transport().stopWriting();
00100 }
00101 
00102 void
00103 ConnectionHandler::
00104 setTransport(TransportBase * transport)
00105 {
00106     //cerr << "setTransport : transport = " << transport << " transport_ = "
00107     //     << transport_ << endl;
00108 
00109     if (transport_)
00110         throw Exception("can't switch transports from %08p to %08p",
00111                         transport_, transport);
00112     transport_ = transport;
00113 }
00114 
00115 void
00116 ConnectionHandler::
00117 addActivity(const std::string & activity)
00118 {
00119     if (!transport_ || !transport().debug) return;
00120     transport().addActivity(activity);
00121 }
00122 
00123 void
00124 ConnectionHandler::
00125 addActivityS(const char * act)
00126 {
00127     if (!transport_ || !transport().debug) return;
00128     transport().addActivity(act);
00129 }
00130 
00131 void
00132 ConnectionHandler::
00133 addActivity(const char * fmt, ...)
00134 {
00135     if (!transport_ || !transport().debug) return;
00136     va_list ap;
00137     va_start(ap, fmt);
00138     ML::Call_Guard cleanupAp([&] () { va_end(ap); });
00139     transport().addActivity(ML::vformat(fmt, ap));
00140 }
00141 
00142 void
00143 ConnectionHandler::
00144 checkMagic() const
00145 {
00146     if (magic != 0x1234)
00147         throw Exception("attempt to use deleted ConnectionHandler");
00148 }
00149 
00150 
00151 /*****************************************************************************/
00152 /* PASSIVE CONNECTION HANDLER                                                */
00153 /*****************************************************************************/
00154 
00155 void
00156 PassiveConnectionHandler::
00157 doError(const std::string & error)
00158 {
00159     this->error = error;
00160     handleError(error);
00161     closeWhenHandlerFinished();
00162 }
00163 
00164 void
00165 PassiveConnectionHandler::
00166 handleInput()
00167 {
00168     //cerr << "handle_input on " << fd << " for handler " << ML::type_name(*this)
00169     //<< endl;
00170     transport().assertLockedByThisThread();
00171 
00172     size_t chunk_size = 8192;
00173 
00174     string buf;
00175     size_t done = 0;
00176 
00177     ssize_t bytes_read = 0;
00178 
00179     bool disconnected = false;
00180 
00181     do {
00182         if (buf.length() - done < chunk_size)
00183             buf.resize(done + chunk_size);
00184 
00185         errno = 0;
00186 
00187         bytes_read = recv(&buf[done], chunk_size, MSG_DONTWAIT);
00188 
00189         //int err = errno;
00190 
00191         //cerr << "bytes_read = " << bytes_read << " on " << get_handle()
00192         //     << " errno " << strerror(errno) << endl;
00193 
00194         if (bytes_read == 0) {
00195             // Disconnect
00196             addActivity("readDisconnect");
00197             //cerr << "**** DISCONNECT ON TRANSPORT " << &transport() << endl;
00198             disconnected = true;
00199             break;
00200         }
00201         if (bytes_read == -1) {
00202             // Error, interrupted or no data
00203             if (errno == EINTR) continue; // interrupted
00204             if (errno == EAGAIN || errno == EWOULDBLOCK) break; // no data
00205 
00206             if (done) handleData(buf);
00207             doError("read on " + get_endpoint()->name() + ": " + string(strerror(errno)));
00208             return;
00209         }
00210         if (bytes_read > chunk_size)
00211             throw Exception("too many bytes read");
00212         done += bytes_read;
00213     } while (bytes_read > 0);
00214 
00215     try {
00216         if (done > buf.length())
00217             throw Exception("buffer is too long");
00218 
00219         buf.resize(done);
00220 
00221         if (done) handleData(buf);
00222     } catch (...) {
00223         if (disconnected) {
00224             handleDisconnect();
00225         }
00226         throw;
00227     }
00228 
00229     if (disconnected) {
00230         handleDisconnect();
00231     }
00232 }
00233 
00234 void
00235 PassiveConnectionHandler::
00236 handleOutput()
00237 {
00238     transport().assertLockedByThisThread();
00239         
00240     if (toWrite.empty()) {
00241 #if 0
00242         // For some reason, we sometimes get a bogus call here.  Deal with
00243         // it.
00244         //cerr << "BOGUS handle_output" << endl;
00245         //transport().activities.dump();
00246         reactor()->remove_handler(get_handler(),
00247                                   ACE_Event_Handler::WRITE_MASK
00248                                   | ACE_Event_Handler::DONT_CALL);
00249         //stopWriting();
00250         return;
00251 #endif
00252         transport().activities.dump();
00253         
00254         throw Exception("handle_output with empty buffer");
00255     }
00256 
00257     //double elapsed = Date::now().secondsSince(toWrite.front().date);
00258     //cerr << "output: elapsed = " << format("%.1fms", elapsed * 1000)
00259     //     << endl;
00260 
00261     const string & str = toWrite.front().data;
00262 
00263     //cerr << "writing " << str << endl;
00264 
00265     int len = str.length();
00266 
00267     if (done < 0 || (done >= len && len != 0))
00268         throw Exception("invalid done");
00269 
00270     /* Send data */
00271     ssize_t written
00272         = ConnectionHandler::
00273         send(str.c_str() + done, len - done, MSG_NOSIGNAL | MSG_DONTWAIT);
00274 
00275     if (written == -1 && errno == EWOULDBLOCK) {
00276         cerr << "write would block" << endl;
00277         return;
00278     }    
00279 
00280     if (written == -1) {
00281         doError("writing: " + string(strerror(errno)));
00282         return;
00283     }
00284             
00285     done += written;
00286         
00287     if (done == len) {
00288         //cerr << "SEND FINISHED " << str << endl;
00289 
00290         WriteEntry entry = toWrite.front();
00291         if (entry.onWriteFinished)
00292             entry.onWriteFinished();
00293 
00294         toWrite.pop_front();
00295         done = 0;
00296         
00297         if (toWrite.empty())
00298             stopWriting();
00299 
00300         if (entry.next == NEXT_CONTINUE)
00301             return;
00302 
00303         if (!toWrite.empty())
00304             throw Exception("CLOSE or RECYCLE with data to write");
00305 
00306         if (entry.next == NEXT_CLOSE) {
00307             closeWhenHandlerFinished();
00308         }
00309         else if (entry.next == NEXT_RECYCLE) {
00310             recycleWhenHandlerFinished();
00311         }
00312         else throw Exception("invalid next action");
00313     }
00314 }
00315 
00316 void
00317 PassiveConnectionHandler::
00318 send(const std::string & str,
00319      NextAction next,
00320      OnWriteFinished onWriteFinished)
00321 {
00322     //cerr << "message being sent<" << str << "> on handle" << transport().getHandle() <<  endl;
00323     transport().assertLockedByThisThread();
00324 
00325     WriteEntry entry;
00326     entry.date = Date::now();
00327     entry.data = str;
00328     entry.next = next;
00329     entry.onWriteFinished = onWriteFinished;
00330 
00331     //if (str.find("POST") != 0)
00332     //    cerr << "SEND " << str << endl;
00333 
00334     toWrite.push_back(entry);
00335 
00336     if (toWrite.size() == 1) {
00337         done = 0;
00338 
00339         if (transport().getHandle() == -1) {
00340             doError("send: connection closed by peer");
00341             return;
00342         }
00343         
00344         startWriting();
00345     }
00346 
00347     // Don't allow nested invocations of handle_output
00348     if (inSend) return;
00349 
00350     inSend = true;
00351     Call_Guard clearInSend([&] () { inSend = false; });
00352 
00353     // Try a preemptive (non-blocking) send to avoid going through the
00354     // reactor
00355     handleOutput();
00356 }
00357 
00358 void
00359 PassiveConnectionHandler::
00360 handleTimeout(Date time, size_t)
00361 {
00362     cerr << status() << endl;
00363     throw Exception("timeout with no handler set");
00364 }
00365 
00366 } // namespace Datacratic
00367 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator