![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* passive_endpoint.h -*- C++ -*- 00002 Jeremy Barnes, 29 April 2011 00003 Copyright (C) 2011 Datacratic. All rights reserved. 00004 00005 Base class for a passive endpoint. 00006 */ 00007 00008 #pragma once 00009 00010 #include "soa/service/endpoint.h" 00011 #include "soa/service/port_range_service.h" 00012 #include "jml/arch/wakeup_fd.h" 00013 00014 namespace Datacratic { 00015 00016 enum { 00017 DEF_BACKLOG = 128 00018 }; 00019 00020 /*****************************************************************************/ 00021 /* ACCEPTOR */ 00022 /*****************************************************************************/ 00023 00024 struct Acceptor { 00025 virtual ~Acceptor() 00026 { 00027 } 00028 00029 virtual int 00030 listen(PortRange const & portRange, const std::string & hostname, 00031 PassiveEndpoint * endpoint, bool nameLookup, int backlog) = 0; 00032 00033 virtual void closePeer() = 0; 00034 00036 virtual std::string hostname() const = 0; 00037 00039 virtual int port() const = 0; 00040 }; 00041 00042 00043 /*****************************************************************************/ 00044 /* PASSIVE ENDPOINT */ 00045 /*****************************************************************************/ 00046 00051 struct PassiveEndpoint: public EndpointBase { 00052 00053 PassiveEndpoint(const std::string & name); 00054 00055 virtual ~PassiveEndpoint(); 00056 00073 int init(PortRange const & portRange = PortRange(), const std::string & hostname = "localhost", 00074 int threads = 1, bool synchronous = true, bool nameLookup=true, 00075 int backlog = DEF_BACKLOG); 00076 00080 virtual int listen(PortRange const & portRange, const std::string & host,bool nameLookup=true, 00081 int backlog = DEF_BACKLOG) 00082 { 00083 if (!acceptor) 00084 throw ML::Exception("can't listen without acceptor"); 00085 00086 return acceptor->listen(portRange, host, this, nameLookup, backlog); 00087 } 00088 00089 virtual void closePeer() 00090 { 00091 return acceptor->closePeer(); 00092 } 00093 00095 virtual std::string hostname() const 00096 { 00097 return acceptor->hostname(); 00098 } 00099 00101 virtual int port() const 00102 { 00103 return acceptor->port(); 00104 } 00105 00109 boost::function<std::shared_ptr<ConnectionHandler> ()> onMakeNewHandler; 00110 00112 boost::function<void (std::string)> onAcceptError; 00113 00114 protected: 00115 00117 std::shared_ptr<Acceptor> acceptor; 00118 00119 virtual void 00120 associateHandler(const std::shared_ptr<TransportBase> & transport) 00121 { 00122 if (!transport) 00123 throw ML::Exception("no transport"); 00124 transport->hasConnection(); 00125 notifyNewTransport(transport); 00126 00127 auto finishAccept = [=] () 00128 { 00129 std::shared_ptr<ConnectionHandler> handler 00130 = this->makeNewHandler(); 00131 transport->associate(handler); 00132 }; 00133 00134 transport->doAsync(finishAccept, "finishAccept"); 00135 } 00136 00137 virtual std::shared_ptr<ConnectionHandler> 00138 makeNewHandler() 00139 { 00140 return onMakeNewHandler(); 00141 } 00142 00143 virtual void acceptError(const std::string & error) 00144 { 00145 if (onAcceptError) onAcceptError(error); 00146 else { 00147 using namespace std; 00148 cerr << "error accepting connection: " << error << endl; 00149 } 00150 } 00151 00152 template<typename Transport> friend struct AcceptorT; 00153 // whether or not to perform a host name look up 00154 bool nameLookup_;// whether or not to perform a host name look up 00155 }; 00156 00157 00158 /*****************************************************************************/ 00159 /* ACCEPTOR TEMPLATE */ 00160 /*****************************************************************************/ 00161 00162 template<typename Transport> 00163 struct AcceptorT: public Acceptor { 00164 }; 00165 00166 /*****************************************************************************/ 00167 /* ACCEPTOR TEMPLATE FOR SOCKET TRANSPORT */ 00168 /*****************************************************************************/ 00169 00170 template<> 00171 struct AcceptorT<SocketTransport> : public Acceptor { 00172 00173 AcceptorT(); 00174 virtual ~AcceptorT(); 00175 00177 virtual int listen(PortRange const & portRange, 00178 const std::string & hostname, 00179 PassiveEndpoint * endpoint, 00180 bool nameLookup, 00181 int backlog); 00182 00184 virtual void closePeer(); 00185 00187 virtual std::string hostname() const; 00188 00190 virtual int port() const; 00191 00195 void runAcceptThread(); 00196 00197 00198 protected: 00199 std::shared_ptr<boost::thread> acceptThread; 00200 ML::Wakeup_Fd wakeup; 00201 ACE_INET_Addr addr; 00202 int fd; 00203 PassiveEndpoint * endpoint; 00204 bool nameLookup; 00205 bool shutdown; 00206 }; 00207 00208 00209 /*****************************************************************************/ 00210 /* PASSIVE ENDPOINT TEMPLATE */ 00211 /*****************************************************************************/ 00212 00213 template<typename Transport> 00214 struct PassiveEndpointT: public PassiveEndpoint { 00215 00216 PassiveEndpointT(const std::string & name) 00217 : PassiveEndpoint(name) 00218 { 00219 acceptor.reset(new AcceptorT<Transport>()); 00220 } 00221 00222 virtual ~PassiveEndpointT() 00223 { 00224 } 00225 }; 00226 00227 } // namespace Datacratic
1.7.6.1