RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/active_endpoint.h
00001 /* active_endpoint.h                                               -*- C++ -*-
00002    Jeremy Barnes, 29 April 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004    
00005    Active endpoint class.
00006 */
00007 
00008 #pragma once
00009 
00010 #include "soa/service//endpoint.h"
00011 #include "connection_handler.h"
00012 #include "ace/SOCK_Connector.h"
00013 
00014 namespace Datacratic {
00015 
00016 /*****************************************************************************/
00017 /* CONNECTOR                                                                 */
00018 /*****************************************************************************/
00019 
00020 struct Connector {
00021     virtual ~Connector()
00022     {
00023     }
00024 
00025     virtual std::shared_ptr<TransportBase>
00026     makeNewTransport(EndpointBase * owner) = 0;
00027 
00028     virtual int doConnect(const std::shared_ptr<TransportBase> & transport,
00029                           const ACE_INET_Addr & addr,
00030                           double timeout,
00031                           bool block) = 0;
00032     virtual void closePeer() = 0;
00033     
00034 };
00035 
00036 
00037 /*****************************************************************************/
00038 /* ACTIVE ENDPOINT                                                           */
00039 /*****************************************************************************/
00040 
00044 struct ActiveEndpoint: public EndpointBase {
00045 
00046     ActiveEndpoint(const std::string & name)
00047         : EndpointBase(name)
00048     {
00049         modifyIdle = false;
00050     }
00051 
00052     ~ActiveEndpoint()
00053     {
00054         shutdown();
00055     }
00056 
00060     static void throwExceptionOnConnectionError(const std::string & error);
00061 
00063     static void doNothingOnConnectionError(const std::string & error);
00064 
00065     typedef boost::function<void (const std::shared_ptr<TransportBase> &)>
00066         OnNewConnection;
00067     typedef boost::function<void (std::string)> OnConnectionError;
00068 
00075     int init(int port, const std::string & hostname,
00076              int connections = 0,
00077              int threads = 1, bool synchronous = true,
00078              bool throwIfAnyConnectionError = true,
00079              OnConnectionError onEachConnectionError
00080                  = doNothingOnConnectionError,
00081              double timeout = 1.0);
00082 
00086     int createConnections(int num_connections, bool synchronous,
00087                           bool throwIfAnyConnectionError,
00088                           OnConnectionError onEachConnectionError,
00089                           double timeout = 1.0);
00090 
00092     virtual void newConnection(OnNewConnection onNewConnection,
00093                                OnConnectionError onConnectionError,
00094                                double timeout = 1.0)
00095     {
00096         std::shared_ptr<TransportBase> transport = makeNewTransport();
00097         transport->associate
00098             (ML::make_std_sp(new ConnectManager(onNewConnection,
00099                                             onConnectionError,
00100                                             this)));
00101 
00102         doConnect(transport, addr, timeout, false /* block */);
00103     }
00104 
00108     virtual void getConnection(OnNewConnection onNewConnection,
00109                                OnConnectionError onConnectionError,
00110                                double timeout);
00111 
00113     virtual int doConnect(const std::shared_ptr<TransportBase> & transport,
00114                            const ACE_INET_Addr & addr,
00115                            double timeout,
00116                            bool block)
00117     {
00118         return connector->doConnect(transport, addr, timeout, block);
00119     }
00120 
00121     virtual std::shared_ptr<TransportBase> makeNewTransport()
00122     {
00123         return connector->makeNewTransport(this);
00124     }
00125 
00126     virtual void closePeer()
00127     {
00128         return connector->closePeer();
00129     }
00130 
00132     virtual std::string hostname() const
00133     {
00134         return hostname_;
00135     }
00136 
00138     virtual int port() const
00139     {
00140         return port_;
00141     }
00142 
00143     int numActiveConnections() const
00144     {
00145         Guard guard(lock);
00146         return active.size();
00147     }
00148     
00149     int numInactiveConnections() const
00150     {
00151         Guard guard(lock);
00152         return inactive.size();
00153     }
00154 
00156     virtual void dumpState() const;
00157 
00158     void shutdown();
00159 
00160 protected:
00161     Connections active, inactive;
00162     int port_;
00163     std::string hostname_;
00164     ACE_INET_Addr addr;
00165 
00166     std::shared_ptr<Connector> connector;
00167 
00169     virtual void
00170     notifyNewTransport(const std::shared_ptr<TransportBase> & transport);
00171 
00173     virtual void
00174     notifyTransportOpen(const std::shared_ptr<TransportBase> & transport);
00175 
00179     virtual void
00180     notifyCloseTransport(const std::shared_ptr<TransportBase> & transport);
00181 
00185     virtual void
00186     notifyRecycleTransport(const std::shared_ptr<TransportBase> & transport);
00187 
00188     struct ConnectManager : public ConnectionHandler {
00189 
00190         ConnectManager(OnNewConnection onNewConnection,
00191                        OnConnectionError onConnectionError,
00192                        ActiveEndpoint * owner)
00193             : onNewConnection(onNewConnection),
00194               onConnectionError(onConnectionError),
00195               success(false), doneError(false),
00196               owner(owner)
00197         {
00198         }
00199 
00200         ActiveEndpoint::OnNewConnection onNewConnection;
00201         ActiveEndpoint::OnConnectionError onConnectionError;
00202         bool success;
00203         bool doneError;
00204         ActiveEndpoint * owner;
00205 
00206         virtual void doError(const std::string & error)
00207         {
00208             onConnectionError(error);
00209             success = false;
00210             doneError = true;
00211             //transport().asyncClose();
00212         }
00213 
00214         virtual void handleError(const std::string & error)
00215         {
00216             onConnectionError(error);
00217             success = false;
00218             doneError = true;
00219             closeWhenHandlerFinished();
00220         }
00221         
00222         virtual void onCleanup()
00223         {
00224         }
00225 
00226         virtual void handleOutput()
00227         {
00228             if (doneError || success)
00229                 throw ML::Exception("too many events for connector");
00230             using namespace std;
00231             //cerr << "connect on " << fd << " finished" << endl;
00232 
00233             transport().cancelTimer();
00234             stopWriting();
00235 
00236             // Connection finished or has an error; check which one
00237             int error = 0;
00238             socklen_t error_len = sizeof(int);
00239             int res = getsockopt(getHandle(), SOL_SOCKET, SO_ERROR,
00240                                  &error, &error_len);
00241             if (res == -1 || error_len != sizeof(int))
00242                 std::cerr << "error getting connect message: "
00243                           << strerror(errno)
00244                           << std::endl;
00245             
00246             if (error != 0)
00247                 throw ML::Exception("connect success but error");
00248 
00249             transport().hasConnection();
00250 
00251             Guard guard(owner->lock);
00252             
00253             // We how have a connection
00254             
00255             owner->notifyTransportOpen(transport().shared_from_this());
00256             
00257             //cerr << "doing onNewConnection" << endl;
00258             
00259             onNewConnection(transport().shared_from_this());
00260             
00261             //cerr << "finished onNewConnection" << endl;            
00262         }
00263     
00264         virtual void handleTimeout(Date date, size_t)
00265         {
00266             onConnectionError("connect: connection timed out");
00267             closeWhenHandlerFinished();
00268         }
00269     };
00270 };
00271 
00272 
00273 /*****************************************************************************/
00274 /* CONNECTOR TEMPLATE                                                        */
00275 /*****************************************************************************/
00276 
00277 template<typename Transport>
00278 struct ConnectorT : public Connector {
00279     ACE_SOCK_Connector connector;
00280 
00281     ConnectorT()
00282     {
00283     }
00284 
00285     virtual std::shared_ptr<TransportBase>
00286     makeNewTransport(EndpointBase * owner)
00287     {
00288         return ML::make_std_sp(new Transport(owner));
00289     }
00290 
00291     virtual int doConnect(const std::shared_ptr<TransportBase> & transport,
00292                           const ACE_INET_Addr & addr,
00293                           double timeout,
00294                           bool block)
00295     {
00296         using namespace std;
00297         //cerr << "doConnect: block = " << block << endl;
00298 
00299         Transport * t = static_cast<Transport *>(transport.get());
00300 
00301         ACE_Time_Value to(0, 0);
00302         if (block)
00303             to = ACE_Time_Value((int)timeout,
00304                                 (timeout - (int)timeout) * 1000000);
00305         int res;
00306         do {
00307             res = connector.connect(t->peer(), addr, &to);
00308 #if 0
00309             using namespace std;
00310             cerr << "connect block=" << block << " on fd "
00311                  << t->getHandle() << " returned "
00312                  << res << " errno "
00313                  << strerror(errno) << endl;
00314 #endif
00315         } while (res == -1 && errno == EINTR);
00316         
00317         if (res == -1 && (block
00318                           || (errno != EINPROGRESS && errno != EAGAIN))) {
00319             using namespace std;
00320             cerr << "connect returned " << res << " with errno "
00321                  << strerror(errno) << endl;
00322             abort();
00323             t->doError("connect: " + std::string(strerror(errno)));
00324             //t->close();
00325             return -1;
00326         }
00327 
00328         if (res == -1) {
00329             // Asynchronous setup
00330 
00331             //cerr << "transport->getHandle() = " << transport->getHandle()
00332             //     << endl;
00333 
00334             transport->get_endpoint()->notifyNewTransport(transport);
00335 
00336             auto finishSetup = [=] ()
00337                 {
00338                     // Set up a timeout
00339                     t->scheduleTimerRelative(timeout);
00340                     
00341                     // Ready to write
00342                     t->startWriting();
00343                 };
00344             
00345             // Call the rest in a handler context
00346             transport->doAsync(finishSetup, "connect");
00347         }
00348         else {
00349             t->handleOutput();
00350             return 0;
00351         }
00352 
00353         return res;
00354     }
00355 
00356     virtual void closePeer()
00357     {
00358     }
00359 };
00360 
00361 
00362 /*****************************************************************************/
00363 /* ACTIVE ENDPOINT TEMPLATE                                                  */
00364 /*****************************************************************************/
00365 
00366 template<typename Transport>
00367 struct ActiveEndpointT : public ActiveEndpoint {
00368     ActiveEndpointT(const std::string & name)
00369         : ActiveEndpoint(name)
00370     {
00371         connector.reset(new ConnectorT<Transport>());
00372     }
00373 };
00374 
00375 
00376 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator