RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/endpoint.h
00001 /* endpoint.h                                                      -*- C++ -*-
00002    Jeremy Barnes, 21 February 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Generic endpoint; will be subclassed for a particular connection type.
00006 */
00007 
00008 #ifndef __rtb__endpoint_h__
00009 #define __rtb__endpoint_h__
00010 
00011 #include <ace/Synch.h>
00012 #include <ace/Guard_T.h>
00013 #include <set>
00014 #include <boost/function.hpp>
00015 #include <boost/thread/thread.hpp>
00016 #include <iostream>
00017 #include "jml/arch/exception.h"
00018 #include "jml/arch/backtrace.h"
00019 #include "jml/utils/smart_ptr_utils.h"
00020 #include "jml/arch/wakeup_fd.h"
00021 #include "transport.h"
00022 #include "connection_handler.h"
00023 #include "soa/service/epoller.h"
00024 #include <map>
00025 
00026 
00027 namespace Datacratic {
00028 
00029 
00030 struct ConnectionHandler;
00031 struct EndpointBase;
00032 struct PassiveEndpoint;
00033 
00034 /*****************************************************************************/
00035 /* ENDPOINT BASE                                                             */
00036 /*****************************************************************************/
00037 
00038 struct EndpointBase : public Epoller {
00039 
00040     EndpointBase(const std::string & name);
00041 
00042     virtual ~EndpointBase();
00043 
00053     void useThisThread();
00054 
00059     void shutdown();
00060 
00062     virtual std::string hostname() const = 0;
00063 
00065     virtual int port() const = 0;
00066 
00068     boost::function<bool ()> onCheckFinished;
00069 
00071     void sleepUntilIdle() const;
00072 
00073     int threadsActive() const { return threadsActive_; }
00074 
00076     virtual void dumpState() const;
00077     
00079     virtual int numConnections() const;
00080 
00082     virtual std::map<std::string, int> numConnectionsByHost() const;
00083 
00087     typedef boost::function<void (TransportBase *)> OnTransportEvent;
00088     OnTransportEvent onTransportOpen, onTransportClose;
00089 
00090     const std::string & name() const { return name_; }
00091 
00093     void makeRealTime(int priority = 1);
00094 
00099     virtual void spinup(int num_threads, bool synchronous);
00100 
00101 protected:
00102 
00104     bool checkFinished() const
00105     {
00106         if (onCheckFinished) return onCheckFinished();
00107         return false;
00108     }
00109 
00111     virtual void
00112     associateHandler(const std::shared_ptr<TransportBase> & transport)
00113     {
00114         if (!transport->hasSlave())
00115             throw ML::Exception("either makeNewTransport or associateHandler"
00116                                 "need to be overridden to make a handler");
00117     }
00118     
00119     virtual void closePeer() = 0;
00120 
00121     struct SPLess {
00122         template<typename SP>
00123         bool operator () (const SP & sp1, const SP & sp2) const
00124         {
00125             return sp1.get() < sp2.get();
00126         }
00127     };
00128 
00129     typedef std::set<std::shared_ptr<TransportBase>, SPLess> Connections;
00130 
00135     Connections alive;
00136 
00138     virtual void
00139     notifyNewTransport(const std::shared_ptr<TransportBase> & transport);
00140 
00142     virtual void
00143     notifyCloseTransport(const std::shared_ptr<TransportBase> & transport);
00144 
00148     virtual void
00149     notifyRecycleTransport(const std::shared_ptr<TransportBase> & transport);
00150 
00154     virtual void restartPolling(TransportBase * transport);
00155 
00157     virtual void startPolling(TransportBase * transport);
00158 
00160     virtual void stopPolling(TransportBase * transport);
00161 
00165     void doAsync(const std::shared_ptr<TransportBase> & transport,
00166                  const boost::function<void ()> & callback,
00167                  const char * nameOfCallback);
00168 
00169     typedef ACE_Recursive_Thread_Mutex Lock;
00170     typedef ACE_Guard<Lock> Guard;
00171     
00172     mutable Lock lock;
00173 
00175     mutable ACE_Semaphore idle;
00176     
00178     mutable bool modifyIdle;
00179 
00180 private:
00181     std::string name_;
00182     std::unique_ptr<boost::thread_group> eventThreads;
00183     std::vector<boost::thread *> eventThreadList;
00184     int threadsActive_;
00185 
00186     friend class TransportBase;
00187     friend class ConnectionHandler;
00188     template<typename Transport> friend class ConnectorT;
00189 
00190     /* Number of active FDs in items */
00191     int numTransports;
00192 
00193     /* FD we can use to wake up the event loop */
00194     ML::Wakeup_Fd wakeup;
00195 
00196     /* Are we shutting down? */
00197     bool shutdown_;
00198 
00199     std::map<std::string, int> numTransportsByHost;
00200 
00202     void runEventThread(int threadNum, int numThreads);
00203 
00205     bool handleEpollEvent(epoll_event & event);
00206 };
00207 
00208 } // namespace Datacratic
00209 
00210 #endif /* __rtb__endpoint_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator