RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/passive_endpoint.h
00001 /* passive_endpoint.h                                              -*- C++ -*-
00002    Jeremy Barnes, 29 April 2011
00003    Copyright (C) 2011 Datacratic.  All rights reserved.
00004 
00005    Base class for a passive endpoint.
00006 */
00007 
00008 #pragma once
00009 
00010 #include "soa/service/endpoint.h"
00011 #include "soa/service/port_range_service.h"
00012 #include "jml/arch/wakeup_fd.h"
00013 
00014 namespace Datacratic {
00015 
/** Backlog value used as the default `backlog` argument of
    PassiveEndpoint::init() and PassiveEndpoint::listen().
*/
enum { DEF_BACKLOG = 128 };
00019 
00020 /*****************************************************************************/
00021 /* ACCEPTOR                                                                  */
00022 /*****************************************************************************/
00023 
00024 struct Acceptor {
00025     virtual ~Acceptor()
00026     {
00027     }
00028 
00029     virtual int
00030     listen(PortRange const & portRange, const std::string & hostname,
00031            PassiveEndpoint * endpoint, bool nameLookup, int backlog) = 0;
00032 
00033     virtual void closePeer() = 0;
00034 
00036     virtual std::string hostname() const = 0;
00037 
00039     virtual int port() const = 0;
00040 };
00041 
00042 
00043 /*****************************************************************************/
00044 /* PASSIVE ENDPOINT                                                          */
00045 /*****************************************************************************/
00046 
00051 struct PassiveEndpoint: public EndpointBase {
00052 
00053     PassiveEndpoint(const std::string & name);
00054 
00055     virtual ~PassiveEndpoint();
00056 
00073     int init(PortRange const & portRange = PortRange(), const std::string & hostname = "localhost",
00074              int threads = 1, bool synchronous = true, bool nameLookup=true,
00075              int backlog = DEF_BACKLOG);
00076 
00080     virtual int listen(PortRange const & portRange, const std::string & host,bool nameLookup=true,
00081                        int backlog = DEF_BACKLOG)
00082     {
00083         if (!acceptor)
00084             throw ML::Exception("can't listen without acceptor");
00085 
00086         return acceptor->listen(portRange, host, this, nameLookup, backlog);
00087     }
00088 
00089     virtual void closePeer()
00090     {
00091         return acceptor->closePeer();
00092     }
00093 
00095     virtual std::string hostname() const
00096     {
00097         return acceptor->hostname();
00098     }
00099 
00101     virtual int port() const
00102     {
00103         return acceptor->port();
00104     }
00105 
00109     boost::function<std::shared_ptr<ConnectionHandler> ()> onMakeNewHandler;
00110 
00112     boost::function<void (std::string)> onAcceptError;
00113     
00114 protected:
00115 
00117     std::shared_ptr<Acceptor> acceptor;
00118 
00119     virtual void
00120     associateHandler(const std::shared_ptr<TransportBase> & transport)
00121     {
00122         if (!transport)
00123             throw ML::Exception("no transport");
00124         transport->hasConnection();
00125         notifyNewTransport(transport);
00126 
00127         auto finishAccept = [=] ()
00128             {
00129                 std::shared_ptr<ConnectionHandler> handler
00130                     = this->makeNewHandler();
00131                 transport->associate(handler);
00132             };
00133 
00134         transport->doAsync(finishAccept, "finishAccept");
00135     }
00136 
00137     virtual std::shared_ptr<ConnectionHandler>
00138     makeNewHandler()
00139     {
00140         return onMakeNewHandler();
00141     }
00142 
00143     virtual void acceptError(const std::string & error)
00144     {
00145         if (onAcceptError) onAcceptError(error);
00146         else {
00147             using namespace std;
00148             cerr << "error accepting connection: " << error << endl;
00149         }
00150     }
00151 
00152     template<typename Transport> friend struct AcceptorT;
00153     // whether or not to perform a host name look up
00154     bool nameLookup_;// whether or not to perform a host name look up
00155 };
00156 
00157 
00158 /*****************************************************************************/
00159 /* ACCEPTOR TEMPLATE                                                         */
00160 /*****************************************************************************/
00161 
00162 template<typename Transport>
00163 struct AcceptorT: public Acceptor {
00164 };
00165 
00166 /*****************************************************************************/
00167 /* ACCEPTOR TEMPLATE FOR SOCKET TRANSPORT                                    */
00168 /*****************************************************************************/
00169 
00170 template<>
00171 struct AcceptorT<SocketTransport> : public Acceptor {
00172 
00173     AcceptorT();
00174     virtual ~AcceptorT();
00175 
00177     virtual int listen(PortRange const & portRange,
00178                        const std::string & hostname,
00179                        PassiveEndpoint * endpoint,
00180                        bool nameLookup,
00181                        int backlog);
00182 
00184     virtual void closePeer();
00185 
00187     virtual std::string hostname() const;
00188 
00190     virtual int port() const;
00191 
00195     void runAcceptThread();
00196 
00197 
00198 protected:
00199     std::shared_ptr<boost::thread> acceptThread;
00200     ML::Wakeup_Fd wakeup;
00201     ACE_INET_Addr addr;
00202     int fd;
00203     PassiveEndpoint * endpoint;
00204     bool nameLookup;
00205     bool shutdown;
00206 };
00207 
00208 
00209 /*****************************************************************************/
00210 /* PASSIVE ENDPOINT TEMPLATE                                                 */
00211 /*****************************************************************************/
00212 
00213 template<typename Transport>
00214 struct PassiveEndpointT: public PassiveEndpoint {
00215 
00216     PassiveEndpointT(const std::string & name)
00217         : PassiveEndpoint(name)
00218     {
00219         acceptor.reset(new AcceptorT<Transport>());
00220     }
00221     
00222     virtual ~PassiveEndpointT()
00223     {
00224     }
00225 };
00226 
00227 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator