RTBKit 0.9
Open-source framework to create real-time ad bidding systems.
00001 /* connection_handler.h -*- C++ -*- 00002 Jeremy Barnes, 27 February 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Connection handler logic. 00006 */ 00007 00008 #ifndef __rtb__connection_handler_h__ 00009 #define __rtb__connection_handler_h__ 00010 00011 #include "transport.h" 00012 #include <iostream> 00013 #include <boost/function.hpp> 00014 #include <list> 00015 #include "jml/arch/format.h" 00016 #include "jml/arch/demangle.h" 00017 #include "jml/arch/atomic_ops.h" 00018 00019 namespace Datacratic { 00020 00021 00022 /*****************************************************************************/ 00023 /* CONNECTION HANDLER */ 00024 /*****************************************************************************/ 00025 00026 struct ConnectionHandler { 00027 00028 static uint32_t created, destroyed; 00029 00030 ConnectionHandler() 00031 : transport_(0), magic(0x1234) 00032 { 00033 ML::atomic_add(created, 1); 00034 } 00035 00036 virtual ~ConnectionHandler() 00037 { 00038 ML::atomic_add(destroyed, 1); 00039 00040 if (magic != 0x1234) 00041 throw ML::Exception("Attempt to double free connection handler"); 00042 magic = 0; 00043 } 00044 00046 virtual void onGotTransport() 00047 { 00048 } 00049 00051 virtual void onDisassociate() 00052 { 00053 } 00054 00056 virtual void onHandlerException(const std::string & handler, 00057 const std::exception & exc) 00058 { 00059 //using namespace std; 00060 //cerr << "handler " << handler << " threw exception " 00061 // << exc.what() << endl; 00062 doError("handler " + handler + " had exception " + exc.what()); 00063 } 00064 00068 virtual void onCleanup() 00069 { 00070 } 00071 00073 virtual void handleError(const std::string & message) 00074 { 00075 doError(message); 00076 closeWhenHandlerFinished(); 00077 } 00078 00080 virtual void handleDisconnect() 00081 { 00082 closeWhenHandlerFinished(); 00083 } 00084 00085 virtual std::string status() const 00086 { 00087 return ML::format("%p of type %s", this, 
ML::type_name(*this).c_str()); 00088 } 00089 00091 virtual void doError(const std::string & error) = 0; 00092 00094 void closeConnection(); 00095 00097 ssize_t send(const char * buf, size_t len, int flags) 00098 { 00099 return transport().send(buf, len, flags); 00100 } 00101 00103 ssize_t recv(char * buf, size_t buf_size, int flags) 00104 { 00105 return transport().recv(buf, buf_size, flags); 00106 } 00107 00108 int getHandle() const 00109 { 00110 return transport().getHandle(); 00111 } 00112 00114 void startReading(); 00115 00117 void stopReading(); 00118 00120 void startWriting(); 00121 00123 void stopWriting(); 00124 00127 void scheduleTimerAbsolute(Date timeout, 00128 size_t cookie = 0, 00129 void (*freecookie) (size_t) = 0) 00130 { 00131 transport().scheduleTimerAbsolute(timeout, cookie); 00132 } 00133 00137 void scheduleTimerRelative(double secondsFromNow, 00138 size_t cookie = 0, 00139 void (*freecookie) (size_t) = 0) 00140 { 00141 transport().scheduleTimerRelative(secondsFromNow, cookie); 00142 } 00143 00145 void cancelTimer() 00146 { 00147 transport().cancelTimer(); 00148 } 00149 00150 void closeWhenHandlerFinished() 00151 { 00152 transport().closeWhenHandlerFinished(); 00153 } 00154 00155 void recycleWhenHandlerFinished() 00156 { 00157 transport().recycleWhenHandlerFinished(); 00158 } 00159 00160 /* Event callbacks. The default implementations throw an exception. 
00161 */ 00162 virtual void handleInput(); 00163 virtual void handleOutput(); 00164 virtual void handlePeerShutdown(); 00165 virtual void handleTimeout(Date time, size_t cookie); 00166 00167 bool hasTransport() const 00168 { 00169 return transport_; 00170 } 00171 00172 TransportBase & transport() 00173 { 00174 if (!transport_) 00175 throw ML::Exception("connection asked for transport with none " 00176 "set"); 00177 return *transport_; 00178 } 00179 00180 const TransportBase & transport() const 00181 { 00182 if (!transport_) 00183 throw ML::Exception("connection asked for transport with none " 00184 "set"); 00185 return *transport_; 00186 } 00187 00188 EndpointBase * get_endpoint() { return transport().get_endpoint(); } 00189 00195 //virtual int handlerReturnCode() const = 0; 00196 00198 void addActivity(const std::string & activity); 00199 00201 void addActivityS(const char * activity); 00202 00203 void addActivity(const char * fmt, ...); 00204 00205 void checkMagic() const; 00206 00210 void doAsync(const boost::function<void ()> & callback, 00211 const char * name) 00212 { 00213 transport().doAsync(callback, name); 00214 } 00215 00216 private: 00217 void setTransport(TransportBase * transport); 00218 TransportBase * transport_; 00219 friend class TransportBase; 00220 00222 int closePeer() 00223 { 00224 return transport().closePeer(); 00225 } 00226 00227 int magic; 00228 }; 00229 00230 00231 /*****************************************************************************/ 00232 /* PASSIVE CONNECTION HANDLER */ 00233 /*****************************************************************************/ 00234 00235 struct PassiveConnectionHandler: public ConnectionHandler { 00236 00237 PassiveConnectionHandler() 00238 : inSend(false) 00239 { 00240 } 00241 00242 std::string error; 00243 00244 int done; 00245 bool inSend; 00246 00248 enum NextAction { 00249 NEXT_CLOSE, 00250 NEXT_RECYCLE, 00251 NEXT_CONTINUE 00252 }; 00253 00254 typedef boost::function<void ()> OnWriteFinished; 
00255 00256 struct WriteEntry { 00257 Date date; 00258 std::string data; 00259 OnWriteFinished onWriteFinished; 00260 NextAction next; 00261 }; 00262 00263 std::list<WriteEntry> toWrite; 00264 00268 void send(const std::string & str, 00269 NextAction action = NEXT_CONTINUE, 00270 OnWriteFinished onWriteFinished = OnWriteFinished()); 00271 00273 virtual void handleData(const std::string & data) = 0; 00274 00276 virtual void handleError(const std::string & message) = 0; 00277 00281 virtual void doError(const std::string & error); 00282 00283 /* ACE_Event_Handler callbacks */ 00284 virtual void handleInput(); 00285 virtual void handleOutput(); 00286 virtual void handleTimeout(Date time, size_t cookie); 00287 00288 friend class TransportBase; 00289 }; 00290 00291 } // namespace Datacratic 00292 00293 #endif /* __rtb__connection_handler_h__ */
1.7.6.1