![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* connection_handler.cc 00002 Jeremy Barnes, 27 February 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Implementation of connection handler. 00006 */ 00007 00008 #include "connection_handler.h" 00009 #include "jml/arch/demangle.h" 00010 #include "jml/arch/backtrace.h" 00011 #include "jml/utils/guard.h" 00012 #include "soa/service//endpoint.h" 00013 00014 00015 using namespace std; 00016 using namespace ML; 00017 00018 00019 namespace Datacratic { 00020 00021 00022 /*****************************************************************************/ 00023 /* CONNECTION HANDLER */ 00024 /*****************************************************************************/ 00025 00026 uint32_t ConnectionHandler::created = 0; 00027 uint32_t ConnectionHandler::destroyed = 0; 00028 00029 void 00030 ConnectionHandler:: 00031 handleInput() 00032 { 00033 throw Exception("ConnectionHandler of type %s needs to override " 00034 "handle_input()", type_name(*this).c_str()); 00035 } 00036 00037 void 00038 ConnectionHandler:: 00039 handleOutput() 00040 { 00041 throw Exception("ConnectionHandler of type %s needs to override " 00042 "handle_output()", type_name(*this).c_str()); 00043 } 00044 00045 void 00046 ConnectionHandler:: 00047 handlePeerShutdown() 00048 { 00049 // By default we close... 00050 closeWhenHandlerFinished(); 00051 } 00052 00053 void 00054 ConnectionHandler:: 00055 handleTimeout(Date time, size_t) 00056 { 00057 throw Exception("ConnectionHandler of type %s needs to override " 00058 "handle_timeout()", type_name(*this).c_str()); 00059 } 00060 00061 void 00062 ConnectionHandler:: 00063 closeConnection() 00064 { 00065 int code = closePeer(); 00066 if (code == -1) 00067 doError("close: " + string(strerror(errno))); 00068 } 00069 00070 void 00071 ConnectionHandler:: 00072 startReading() 00073 { 00074 addActivityS("startReading"); 00075 transport().startReading(); 00076 } 00077 00078 void 00079 ConnectionHandler:: 00080 stopReading() 00081 { 00082 addActivityS("stopReading"); 00083 transport().stopReading(); 00084 } 00085 00086 void 00087 ConnectionHandler:: 00088 startWriting() 00089 { 00090 addActivityS("startWriting"); 00091 transport().startWriting(); 00092 } 00093 00094 void 00095 ConnectionHandler:: 00096 stopWriting() 00097 { 00098 addActivityS("stopWriting"); 00099 transport().stopWriting(); 00100 } 00101 00102 void 00103 ConnectionHandler:: 00104 setTransport(TransportBase * transport) 00105 { 00106 //cerr << "setTransport : transport = " << transport << " transport_ = " 00107 // << transport_ << endl; 00108 00109 if (transport_) 00110 throw Exception("can't switch transports from %08p to %08p", 00111 transport_, transport); 00112 transport_ = transport; 00113 } 00114 00115 void 00116 ConnectionHandler:: 00117 addActivity(const std::string & activity) 00118 { 00119 if (!transport_ || !transport().debug) return; 00120 transport().addActivity(activity); 00121 } 00122 00123 void 00124 ConnectionHandler:: 00125 addActivityS(const char * act) 00126 { 00127 if (!transport_ || !transport().debug) return; 00128 transport().addActivity(act); 00129 } 00130 00131 void 00132 ConnectionHandler:: 00133 addActivity(const char * fmt, ...) 00134 { 00135 if (!transport_ || !transport().debug) return; 00136 va_list ap; 00137 va_start(ap, fmt); 00138 ML::Call_Guard cleanupAp([&] () { va_end(ap); }); 00139 transport().addActivity(ML::vformat(fmt, ap)); 00140 } 00141 00142 void 00143 ConnectionHandler:: 00144 checkMagic() const 00145 { 00146 if (magic != 0x1234) 00147 throw Exception("attempt to use deleted ConnectionHandler"); 00148 } 00149 00150 00151 /*****************************************************************************/ 00152 /* PASSIVE CONNECTION HANDLER */ 00153 /*****************************************************************************/ 00154 00155 void 00156 PassiveConnectionHandler:: 00157 doError(const std::string & error) 00158 { 00159 this->error = error; 00160 handleError(error); 00161 closeWhenHandlerFinished(); 00162 } 00163 00164 void 00165 PassiveConnectionHandler:: 00166 handleInput() 00167 { 00168 //cerr << "handle_input on " << fd << " for handler " << ML::type_name(*this) 00169 //<< endl; 00170 transport().assertLockedByThisThread(); 00171 00172 size_t chunk_size = 8192; 00173 00174 string buf; 00175 size_t done = 0; 00176 00177 ssize_t bytes_read = 0; 00178 00179 bool disconnected = false; 00180 00181 do { 00182 if (buf.length() - done < chunk_size) 00183 buf.resize(done + chunk_size); 00184 00185 errno = 0; 00186 00187 bytes_read = recv(&buf[done], chunk_size, MSG_DONTWAIT); 00188 00189 //int err = errno; 00190 00191 //cerr << "bytes_read = " << bytes_read << " on " << get_handle() 00192 // << " errno " << strerror(errno) << endl; 00193 00194 if (bytes_read == 0) { 00195 // Disconnect 00196 addActivity("readDisconnect"); 00197 //cerr << "**** DISCONNECT ON TRANSPORT " << &transport() << endl; 00198 disconnected = true; 00199 break; 00200 } 00201 if (bytes_read == -1) { 00202 // Error, interrupted or no data 00203 if (errno == EINTR) continue; // interrupted 00204 if (errno == EAGAIN || errno == EWOULDBLOCK) break; // no data 00205 00206 if (done) handleData(buf); 00207 doError("read on " + get_endpoint()->name() + ": " + string(strerror(errno))); 00208 return; 00209 } 00210 if (bytes_read > chunk_size) 00211 throw Exception("too many bytes read"); 00212 done += bytes_read; 00213 } while (bytes_read > 0); 00214 00215 try { 00216 if (done > buf.length()) 00217 throw Exception("buffer is too long"); 00218 00219 buf.resize(done); 00220 00221 if (done) handleData(buf); 00222 } catch (...) { 00223 if (disconnected) { 00224 handleDisconnect(); 00225 } 00226 throw; 00227 } 00228 00229 if (disconnected) { 00230 handleDisconnect(); 00231 } 00232 } 00233 00234 void 00235 PassiveConnectionHandler:: 00236 handleOutput() 00237 { 00238 transport().assertLockedByThisThread(); 00239 00240 if (toWrite.empty()) { 00241 #if 0 00242 // For some reason, we sometimes get a bogus call here. Deal with 00243 // it. 00244 //cerr << "BOGUS handle_output" << endl; 00245 //transport().activities.dump(); 00246 reactor()->remove_handler(get_handler(), 00247 ACE_Event_Handler::WRITE_MASK 00248 | ACE_Event_Handler::DONT_CALL); 00249 //stopWriting(); 00250 return; 00251 #endif 00252 transport().activities.dump(); 00253 00254 throw Exception("handle_output with empty buffer"); 00255 } 00256 00257 //double elapsed = Date::now().secondsSince(toWrite.front().date); 00258 //cerr << "output: elapsed = " << format("%.1fms", elapsed * 1000) 00259 // << endl; 00260 00261 const string & str = toWrite.front().data; 00262 00263 //cerr << "writing " << str << endl; 00264 00265 int len = str.length(); 00266 00267 if (done < 0 || (done >= len && len != 0)) 00268 throw Exception("invalid done"); 00269 00270 /* Send data */ 00271 ssize_t written 00272 = ConnectionHandler:: 00273 send(str.c_str() + done, len - done, MSG_NOSIGNAL | MSG_DONTWAIT); 00274 00275 if (written == -1 && errno == EWOULDBLOCK) { 00276 cerr << "write would block" << endl; 00277 return; 00278 } 00279 00280 if (written == -1) { 00281 doError("writing: " + string(strerror(errno))); 00282 return; 00283 } 00284 00285 done += written; 00286 00287 if (done == len) { 00288 //cerr << "SEND FINISHED " << str << endl; 00289 00290 WriteEntry entry = toWrite.front(); 00291 if (entry.onWriteFinished) 00292 entry.onWriteFinished(); 00293 00294 toWrite.pop_front(); 00295 done = 0; 00296 00297 if (toWrite.empty()) 00298 stopWriting(); 00299 00300 if (entry.next == NEXT_CONTINUE) 00301 return; 00302 00303 if (!toWrite.empty()) 00304 throw Exception("CLOSE or RECYCLE with data to write"); 00305 00306 if (entry.next == NEXT_CLOSE) { 00307 closeWhenHandlerFinished(); 00308 } 00309 else if (entry.next == NEXT_RECYCLE) { 00310 recycleWhenHandlerFinished(); 00311 } 00312 else throw Exception("invalid next action"); 00313 } 00314 } 00315 00316 void 00317 PassiveConnectionHandler:: 00318 send(const std::string & str, 00319 NextAction next, 00320 OnWriteFinished onWriteFinished) 00321 { 00322 //cerr << "message being sent<" << str << "> on handle" << transport().getHandle() << endl; 00323 transport().assertLockedByThisThread(); 00324 00325 WriteEntry entry; 00326 entry.date = Date::now(); 00327 entry.data = str; 00328 entry.next = next; 00329 entry.onWriteFinished = onWriteFinished; 00330 00331 //if (str.find("POST") != 0) 00332 // cerr << "SEND " << str << endl; 00333 00334 toWrite.push_back(entry); 00335 00336 if (toWrite.size() == 1) { 00337 done = 0; 00338 00339 if (transport().getHandle() == -1) { 00340 doError("send: connection closed by peer"); 00341 return; 00342 } 00343 00344 startWriting(); 00345 } 00346 00347 // Don't allow nested invocations of handle_output 00348 if (inSend) return; 00349 00350 inSend = true; 00351 Call_Guard clearInSend([&] () { inSend = false; }); 00352 00353 // Try a preemptive (non-blocking) send to avoid going through the 00354 // reactor 00355 handleOutput(); 00356 } 00357 00358 void 00359 PassiveConnectionHandler:: 00360 handleTimeout(Date time, size_t) 00361 { 00362 cerr << status() << endl; 00363 throw Exception("timeout with no handler set"); 00364 } 00365 00366 } // namespace Datacratic 00367