RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/connection_handler.h
00001 /* connection_handler.h                                            -*- C++ -*-
00002    Jeremy Barnes, 27 February 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Connection handler logic.
00006 */
00007 
00008 #ifndef __rtb__connection_handler_h__
00009 #define __rtb__connection_handler_h__
00010 
#include "transport.h"
#include <cstdlib>
#include <iostream>
#include <list>
#include <boost/function.hpp>
#include "jml/arch/format.h"
#include "jml/arch/demangle.h"
#include "jml/arch/atomic_ops.h"
00018 
00019 namespace Datacratic {
00020 
00021 
00022 /*****************************************************************************/
00023 /* CONNECTION HANDLER                                                        */
00024 /*****************************************************************************/
00025 
00026 struct ConnectionHandler {
00027 
00028     static uint32_t created, destroyed;
00029 
00030     ConnectionHandler()
00031         : transport_(0), magic(0x1234)
00032     {
00033         ML::atomic_add(created, 1);
00034     }
00035 
00036     virtual ~ConnectionHandler()
00037     {
00038         ML::atomic_add(destroyed, 1);
00039 
00040         if (magic != 0x1234)
00041             throw ML::Exception("Attempt to double free connection handler");
00042         magic = 0;
00043     }
00044 
00046     virtual void onGotTransport()
00047     {
00048     }
00049 
00051     virtual void onDisassociate()
00052     {
00053     }
00054 
00056     virtual void onHandlerException(const std::string & handler,
00057                                     const std::exception & exc)
00058     {
00059         //using namespace std;
00060         //cerr << "handler " << handler << " threw exception "
00061         //     << exc.what() << endl;
00062         doError("handler " + handler + " had exception " + exc.what());
00063     }
00064 
00068     virtual void onCleanup()
00069     {
00070     }
00071 
00073     virtual void handleError(const std::string & message)
00074     {
00075         doError(message);
00076         closeWhenHandlerFinished();
00077     }
00078 
00080     virtual void handleDisconnect()
00081     {
00082         closeWhenHandlerFinished();
00083     }
00084 
00085     virtual std::string status() const
00086     {
00087         return ML::format("%p of type %s", this, ML::type_name(*this).c_str());
00088     }
00089 
00091     virtual void doError(const std::string & error) = 0;
00092 
00094     void closeConnection();
00095 
00097     ssize_t send(const char * buf, size_t len, int flags)
00098     {
00099         return transport().send(buf, len, flags);
00100     }
00101 
00103     ssize_t recv(char * buf, size_t buf_size, int flags)
00104     {
00105         return transport().recv(buf, buf_size, flags);
00106     }
00107 
00108     int getHandle() const
00109     {
00110         return transport().getHandle();
00111     }
00112 
00114     void startReading();
00115 
00117     void stopReading();
00118 
00120     void startWriting();
00121 
00123     void stopWriting();
00124 
00127     void scheduleTimerAbsolute(Date timeout,
00128                                size_t cookie = 0,
00129                                void (*freecookie) (size_t) = 0)
00130     {
00131         transport().scheduleTimerAbsolute(timeout, cookie);
00132     }
00133 
00137     void scheduleTimerRelative(double secondsFromNow,
00138                                size_t cookie = 0,
00139                                void (*freecookie) (size_t) = 0)
00140     {
00141         transport().scheduleTimerRelative(secondsFromNow, cookie);
00142     }
00143     
00145     void cancelTimer()
00146     {
00147         transport().cancelTimer();
00148     }
00149 
00150     void closeWhenHandlerFinished()
00151     {
00152         transport().closeWhenHandlerFinished();
00153     }
00154 
00155     void recycleWhenHandlerFinished()
00156     {
00157         transport().recycleWhenHandlerFinished();
00158     }
00159 
00160     /* Event callbacks.  The default implementations throw an exception.
00161     */
00162     virtual void handleInput();
00163     virtual void handleOutput();
00164     virtual void handlePeerShutdown();
00165     virtual void handleTimeout(Date time, size_t cookie);
00166 
00167     bool hasTransport() const
00168     {
00169         return transport_;
00170     }
00171 
00172     TransportBase & transport()
00173     {
00174         if (!transport_)
00175             throw ML::Exception("connection asked for transport with none "
00176                                 "set");
00177         return *transport_;
00178     }
00179 
00180     const TransportBase & transport() const
00181     {
00182         if (!transport_)
00183             throw ML::Exception("connection asked for transport with none "
00184                                 "set");
00185         return *transport_;
00186     }
00187 
00188     EndpointBase * get_endpoint() { return transport().get_endpoint(); }
00189 
00195     //virtual int handlerReturnCode() const = 0;
00196 
00198     void addActivity(const std::string & activity);
00199 
00201     void addActivityS(const char * activity);
00202 
00203     void addActivity(const char * fmt, ...);
00204 
00205     void checkMagic() const;
00206 
00210     void doAsync(const boost::function<void ()> & callback,
00211                  const char * name)
00212     {
00213         transport().doAsync(callback, name);
00214     }
00215 
00216 private:
00217     void setTransport(TransportBase * transport);
00218     TransportBase * transport_;
00219     friend class TransportBase;
00220 
00222     int closePeer()
00223     {
00224         return transport().closePeer();
00225     }
00226 
00227     int magic;
00228 };
00229 
00230 
00231 /*****************************************************************************/
00232 /* PASSIVE CONNECTION HANDLER                                                */
00233 /*****************************************************************************/
00234 
00235 struct PassiveConnectionHandler: public ConnectionHandler {
00236 
00237     PassiveConnectionHandler()
00238         : inSend(false)
00239     {
00240     }
00241 
00242     std::string error;
00243 
00244     int done;
00245     bool inSend;
00246 
00248     enum NextAction {
00249         NEXT_CLOSE,
00250         NEXT_RECYCLE,
00251         NEXT_CONTINUE
00252     };
00253 
00254     typedef boost::function<void ()> OnWriteFinished;
00255 
00256     struct WriteEntry {
00257         Date date;
00258         std::string data;
00259         OnWriteFinished onWriteFinished;
00260         NextAction next;
00261     };
00262 
00263     std::list<WriteEntry> toWrite;
00264     
00268     void send(const std::string & str,
00269               NextAction action = NEXT_CONTINUE,
00270               OnWriteFinished onWriteFinished = OnWriteFinished());
00271     
00273     virtual void handleData(const std::string & data) = 0;
00274     
00276     virtual void handleError(const std::string & message) = 0;
00277 
00281     virtual void doError(const std::string & error);
00282 
00283     /* ACE_Event_Handler callbacks */
00284     virtual void handleInput();
00285     virtual void handleOutput();
00286     virtual void handleTimeout(Date time, size_t cookie);
00287 
00288     friend class TransportBase;
00289 };
00290 
00291 } // namespace Datacratic
00292 
00293 #endif /* __rtb__connection_handler_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator