RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* active_endpoint.h -*- C++ -*- 00002 Jeremy Barnes, 29 April 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Active endpoint class. 00006 */ 00007 00008 #pragma once 00009 00010 #include "soa/service//endpoint.h" 00011 #include "connection_handler.h" 00012 #include "ace/SOCK_Connector.h" 00013 00014 namespace Datacratic { 00015 00016 /*****************************************************************************/ 00017 /* CONNECTOR */ 00018 /*****************************************************************************/ 00019 00020 struct Connector { 00021 virtual ~Connector() 00022 { 00023 } 00024 00025 virtual std::shared_ptr<TransportBase> 00026 makeNewTransport(EndpointBase * owner) = 0; 00027 00028 virtual int doConnect(const std::shared_ptr<TransportBase> & transport, 00029 const ACE_INET_Addr & addr, 00030 double timeout, 00031 bool block) = 0; 00032 virtual void closePeer() = 0; 00033 00034 }; 00035 00036 00037 /*****************************************************************************/ 00038 /* ACTIVE ENDPOINT */ 00039 /*****************************************************************************/ 00040 00044 struct ActiveEndpoint: public EndpointBase { 00045 00046 ActiveEndpoint(const std::string & name) 00047 : EndpointBase(name) 00048 { 00049 modifyIdle = false; 00050 } 00051 00052 ~ActiveEndpoint() 00053 { 00054 shutdown(); 00055 } 00056 00060 static void throwExceptionOnConnectionError(const std::string & error); 00061 00063 static void doNothingOnConnectionError(const std::string & error); 00064 00065 typedef boost::function<void (const std::shared_ptr<TransportBase> &)> 00066 OnNewConnection; 00067 typedef boost::function<void (std::string)> OnConnectionError; 00068 00075 int init(int port, const std::string & hostname, 00076 int connections = 0, 00077 int threads = 1, bool synchronous = true, 00078 bool throwIfAnyConnectionError = true, 00079 OnConnectionError onEachConnectionError 00080 = doNothingOnConnectionError, 00081 double timeout = 1.0); 00082 00086 int createConnections(int num_connections, bool synchronous, 00087 bool throwIfAnyConnectionError, 00088 OnConnectionError onEachConnectionError, 00089 double timeout = 1.0); 00090 00092 virtual void newConnection(OnNewConnection onNewConnection, 00093 OnConnectionError onConnectionError, 00094 double timeout = 1.0) 00095 { 00096 std::shared_ptr<TransportBase> transport = makeNewTransport(); 00097 transport->associate 00098 (ML::make_std_sp(new ConnectManager(onNewConnection, 00099 onConnectionError, 00100 this))); 00101 00102 doConnect(transport, addr, timeout, false /* block */); 00103 } 00104 00108 virtual void getConnection(OnNewConnection onNewConnection, 00109 OnConnectionError onConnectionError, 00110 double timeout); 00111 00113 virtual int doConnect(const std::shared_ptr<TransportBase> & transport, 00114 const ACE_INET_Addr & addr, 00115 double timeout, 00116 bool block) 00117 { 00118 return connector->doConnect(transport, addr, timeout, block); 00119 } 00120 00121 virtual std::shared_ptr<TransportBase> makeNewTransport() 00122 { 00123 return connector->makeNewTransport(this); 00124 } 00125 00126 virtual void closePeer() 00127 { 00128 return connector->closePeer(); 00129 } 00130 00132 virtual std::string hostname() const 00133 { 00134 return hostname_; 00135 } 00136 00138 virtual int port() const 00139 { 00140 return port_; 00141 } 00142 00143 int numActiveConnections() const 00144 { 00145 Guard guard(lock); 00146 return active.size(); 00147 } 00148 00149 int numInactiveConnections() const 00150 { 00151 Guard guard(lock); 00152 return inactive.size(); 00153 } 00154 00156 virtual void dumpState() const; 00157 00158 void shutdown(); 00159 00160 protected: 00161 Connections active, inactive; 00162 int port_; 00163 std::string hostname_; 00164 ACE_INET_Addr addr; 00165 00166 std::shared_ptr<Connector> connector; 00167 00169 virtual void 00170 notifyNewTransport(const std::shared_ptr<TransportBase> & transport); 00171 00173 virtual void 00174 notifyTransportOpen(const std::shared_ptr<TransportBase> & transport); 00175 00179 virtual void 00180 notifyCloseTransport(const std::shared_ptr<TransportBase> & transport); 00181 00185 virtual void 00186 notifyRecycleTransport(const std::shared_ptr<TransportBase> & transport); 00187 00188 struct ConnectManager : public ConnectionHandler { 00189 00190 ConnectManager(OnNewConnection onNewConnection, 00191 OnConnectionError onConnectionError, 00192 ActiveEndpoint * owner) 00193 : onNewConnection(onNewConnection), 00194 onConnectionError(onConnectionError), 00195 success(false), doneError(false), 00196 owner(owner) 00197 { 00198 } 00199 00200 ActiveEndpoint::OnNewConnection onNewConnection; 00201 ActiveEndpoint::OnConnectionError onConnectionError; 00202 bool success; 00203 bool doneError; 00204 ActiveEndpoint * owner; 00205 00206 virtual void doError(const std::string & error) 00207 { 00208 onConnectionError(error); 00209 success = false; 00210 doneError = true; 00211 //transport().asyncClose(); 00212 } 00213 00214 virtual void handleError(const std::string & error) 00215 { 00216 onConnectionError(error); 00217 success = false; 00218 doneError = true; 00219 closeWhenHandlerFinished(); 00220 } 00221 00222 virtual void onCleanup() 00223 { 00224 } 00225 00226 virtual void handleOutput() 00227 { 00228 if (doneError || success) 00229 throw ML::Exception("too many events for connector"); 00230 using namespace std; 00231 //cerr << "connect on " << fd << " finished" << endl; 00232 00233 transport().cancelTimer(); 00234 stopWriting(); 00235 00236 // Connection finished or has an error; check which one 00237 int error = 0; 00238 socklen_t error_len = sizeof(int); 00239 int res = getsockopt(getHandle(), SOL_SOCKET, SO_ERROR, 00240 &error, &error_len); 00241 if (res == -1 || error_len != sizeof(int)) 00242 std::cerr << "error getting connect message: " 00243 << strerror(errno) 00244 << std::endl; 00245 00246 if (error != 0) 00247 throw ML::Exception("connect success but error"); 00248 00249 transport().hasConnection(); 00250 00251 Guard guard(owner->lock); 00252 00253 // We how have a connection 00254 00255 owner->notifyTransportOpen(transport().shared_from_this()); 00256 00257 //cerr << "doing onNewConnection" << endl; 00258 00259 onNewConnection(transport().shared_from_this()); 00260 00261 //cerr << "finished onNewConnection" << endl; 00262 } 00263 00264 virtual void handleTimeout(Date date, size_t) 00265 { 00266 onConnectionError("connect: connection timed out"); 00267 closeWhenHandlerFinished(); 00268 } 00269 }; 00270 }; 00271 00272 00273 /*****************************************************************************/ 00274 /* CONNECTOR TEMPLATE */ 00275 /*****************************************************************************/ 00276 00277 template<typename Transport> 00278 struct ConnectorT : public Connector { 00279 ACE_SOCK_Connector connector; 00280 00281 ConnectorT() 00282 { 00283 } 00284 00285 virtual std::shared_ptr<TransportBase> 00286 makeNewTransport(EndpointBase * owner) 00287 { 00288 return ML::make_std_sp(new Transport(owner)); 00289 } 00290 00291 virtual int doConnect(const std::shared_ptr<TransportBase> & transport, 00292 const ACE_INET_Addr & addr, 00293 double timeout, 00294 bool block) 00295 { 00296 using namespace std; 00297 //cerr << "doConnect: block = " << block << endl; 00298 00299 Transport * t = static_cast<Transport *>(transport.get()); 00300 00301 ACE_Time_Value to(0, 0); 00302 if (block) 00303 to = ACE_Time_Value((int)timeout, 00304 (timeout - (int)timeout) * 1000000); 00305 int res; 00306 do { 00307 res = connector.connect(t->peer(), addr, &to); 00308 #if 0 00309 using namespace std; 00310 cerr << "connect block=" << block << " on fd " 00311 << t->getHandle() << " returned " 00312 << res << " errno " 00313 << strerror(errno) << endl; 00314 #endif 00315 } while (res == -1 && errno == EINTR); 00316 00317 if (res == -1 && (block 00318 || (errno != EINPROGRESS && errno != EAGAIN))) { 00319 using namespace std; 00320 cerr << "connect returned " << res << " with errno " 00321 << strerror(errno) << endl; 00322 abort(); 00323 t->doError("connect: " + std::string(strerror(errno))); 00324 //t->close(); 00325 return -1; 00326 } 00327 00328 if (res == -1) { 00329 // Asynchronous setup 00330 00331 //cerr << "transport->getHandle() = " << transport->getHandle() 00332 // << endl; 00333 00334 transport->get_endpoint()->notifyNewTransport(transport); 00335 00336 auto finishSetup = [=] () 00337 { 00338 // Set up a timeout 00339 t->scheduleTimerRelative(timeout); 00340 00341 // Ready to write 00342 t->startWriting(); 00343 }; 00344 00345 // Call the rest in a handler context 00346 transport->doAsync(finishSetup, "connect"); 00347 } 00348 else { 00349 t->handleOutput(); 00350 return 0; 00351 } 00352 00353 return res; 00354 } 00355 00356 virtual void closePeer() 00357 { 00358 } 00359 }; 00360 00361 00362 /*****************************************************************************/ 00363 /* ACTIVE ENDPOINT TEMPLATE */ 00364 /*****************************************************************************/ 00365 00366 template<typename Transport> 00367 struct ActiveEndpointT : public ActiveEndpoint { 00368 ActiveEndpointT(const std::string & name) 00369 : ActiveEndpoint(name) 00370 { 00371 connector.reset(new ConnectorT<Transport>()); 00372 } 00373 }; 00374 00375 00376 } // namespace Datacratic