RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* endpoint.h -*- C++ -*- 00002 Jeremy Barnes, 21 February 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Generic endpoint; will be subclassed for a particular connection type. 00006 */ 00007 00008 #ifndef __rtb__endpoint_h__ 00009 #define __rtb__endpoint_h__ 00010 00011 #include <ace/Synch.h> 00012 #include <ace/Guard_T.h> 00013 #include <set> 00014 #include <boost/function.hpp> 00015 #include <boost/thread/thread.hpp> 00016 #include <iostream> 00017 #include "jml/arch/exception.h" 00018 #include "jml/arch/backtrace.h" 00019 #include "jml/utils/smart_ptr_utils.h" 00020 #include "jml/arch/wakeup_fd.h" 00021 #include "transport.h" 00022 #include "connection_handler.h" 00023 #include "soa/service/epoller.h" 00024 #include <map> 00025 00026 00027 namespace Datacratic { 00028 00029 00030 struct ConnectionHandler; 00031 struct EndpointBase; 00032 struct PassiveEndpoint; 00033 00034 /*****************************************************************************/ 00035 /* ENDPOINT BASE */ 00036 /*****************************************************************************/ 00037 00038 struct EndpointBase : public Epoller { 00039 00040 EndpointBase(const std::string & name); 00041 00042 virtual ~EndpointBase(); 00043 00053 void useThisThread(); 00054 00059 void shutdown(); 00060 00062 virtual std::string hostname() const = 0; 00063 00065 virtual int port() const = 0; 00066 00068 boost::function<bool ()> onCheckFinished; 00069 00071 void sleepUntilIdle() const; 00072 00073 int threadsActive() const { return threadsActive_; } 00074 00076 virtual void dumpState() const; 00077 00079 virtual int numConnections() const; 00080 00082 virtual std::map<std::string, int> numConnectionsByHost() const; 00083 00087 typedef boost::function<void (TransportBase *)> OnTransportEvent; 00088 OnTransportEvent onTransportOpen, onTransportClose; 00089 00090 const std::string & name() const { return name_; } 00091 00093 void makeRealTime(int priority = 1); 00094 00099 virtual void spinup(int num_threads, bool synchronous); 00100 00101 protected: 00102 00104 bool checkFinished() const 00105 { 00106 if (onCheckFinished) return onCheckFinished(); 00107 return false; 00108 } 00109 00111 virtual void 00112 associateHandler(const std::shared_ptr<TransportBase> & transport) 00113 { 00114 if (!transport->hasSlave()) 00115 throw ML::Exception("either makeNewTransport or associateHandler" 00116 "need to be overridden to make a handler"); 00117 } 00118 00119 virtual void closePeer() = 0; 00120 00121 struct SPLess { 00122 template<typename SP> 00123 bool operator () (const SP & sp1, const SP & sp2) const 00124 { 00125 return sp1.get() < sp2.get(); 00126 } 00127 }; 00128 00129 typedef std::set<std::shared_ptr<TransportBase>, SPLess> Connections; 00130 00135 Connections alive; 00136 00138 virtual void 00139 notifyNewTransport(const std::shared_ptr<TransportBase> & transport); 00140 00142 virtual void 00143 notifyCloseTransport(const std::shared_ptr<TransportBase> & transport); 00144 00148 virtual void 00149 notifyRecycleTransport(const std::shared_ptr<TransportBase> & transport); 00150 00154 virtual void restartPolling(TransportBase * transport); 00155 00157 virtual void startPolling(TransportBase * transport); 00158 00160 virtual void stopPolling(TransportBase * transport); 00161 00165 void doAsync(const std::shared_ptr<TransportBase> & transport, 00166 const boost::function<void ()> & callback, 00167 const char * nameOfCallback); 00168 00169 typedef ACE_Recursive_Thread_Mutex Lock; 00170 typedef ACE_Guard<Lock> Guard; 00171 00172 mutable Lock lock; 00173 00175 mutable ACE_Semaphore idle; 00176 00178 mutable bool modifyIdle; 00179 00180 private: 00181 std::string name_; 00182 std::unique_ptr<boost::thread_group> eventThreads; 00183 std::vector<boost::thread *> eventThreadList; 00184 int threadsActive_; 00185 00186 friend class TransportBase; 00187 friend class ConnectionHandler; 00188 template<typename Transport> friend class ConnectorT; 00189 00190 /* Number of active FDs in items */ 00191 int numTransports; 00192 00193 /* FD we can use to wake up the event loop */ 00194 ML::Wakeup_Fd wakeup; 00195 00196 /* Are we shutting down? */ 00197 bool shutdown_; 00198 00199 std::map<std::string, int> numTransportsByHost; 00200 00202 void runEventThread(int threadNum, int numThreads); 00203 00205 bool handleEpollEvent(epoll_event & event); 00206 }; 00207 00208 } // namespace Datacratic 00209 00210 #endif /* __rtb__endpoint_h__ */