RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/active_endpoint.cc
00001 /* active_endpoint.cc
00002    Jeremy Barnes, 29 April 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Active endpoint class.
00006 */
00007 
00008 #include "soa/service//active_endpoint.h"
00009 #include "jml/arch/timers.h"
00010 
00011 using namespace std;
00012 using namespace ML;
00013 
00014 namespace Datacratic {
00015 
00016 
00017 /*****************************************************************************/
00018 /* ACTIVE ENDPOINT                                                           */
00019 /*****************************************************************************/
00020 
00021 void
00022 ActiveEndpoint::
00023 throwExceptionOnConnectionError(const std::string & error)
00024 {
00025     throw Exception("Connection error: " + error);
00026 }
00027 
00028 void
00029 ActiveEndpoint::
00030 doNothingOnConnectionError(const std::string & error)
00031 {
00032 }
00033 
00034 int
00035 ActiveEndpoint::
00036 init(int port, const std::string & hostname,
00037      int connections, int threads, bool synchronous,
00038      bool throwIfAnyConnectionError,
00039      OnConnectionError onEachConnectionError,
00040      double timeout)
00041 {
00042     //cerr << "connecting on " << hostname << ":" << port << endl;
00043 
00044     this->port_ = port;
00045     this->hostname_ = hostname;
00046 
00047     if (hostname == "localhost" || hostname == "")
00048         addr.set(port);
00049     else addr.set(port, hostname.c_str());
00050 
00051     //cerr << "connections " << connections << " threads " << threads << endl;
00052 
00053     spinup(threads, synchronous);
00054 
00055     return createConnections(connections, synchronous,
00056                              throwIfAnyConnectionError,
00057                              onEachConnectionError,
00058                              timeout);
00059 }
00060 
00061 int
00062 ActiveEndpoint::
00063 createConnections(int nconnections, bool synchronous,
00064                   bool throwIfAnyConnectionError,
00065                   OnConnectionError onEachConnectionError,
00066                   double timeout)
00067 {
00068     // Pre-create connections so that we don't have to wait for them
00069     //cerr << "pre-creating " << nconnections << " connections" << endl;
00070 
00071     int finished = 0;
00072     int errors = 0;
00073     string first_error;
00074 
00075     //set<int> inProgress;
00076 
00077     auto onConnection
00078         = [&] (const std::shared_ptr<TransportBase> & transport, int n)
00079         {
00080             Guard guard(this->lock);
00081             //cerr << "transport " << transport << " connected: finished "
00082             //     << finished << " errors " << errors << endl;
00083             if (!inactive.count(transport))
00084                 throw Exception("new connection not known in inactive");
00085             ML::atomic_add(finished, 1);
00086             //inProgress.erase(n);
00087         };
00088 
00089     auto onConnectionError2 = [&] (string error, int n)
00090         {
00091             cerr << "error creating connection " << n << ": " << error << endl;
00092             Guard guard(this->lock);
00093             ML::atomic_add(finished, 1);
00094             ML::atomic_add(errors, 1);
00095             if (onEachConnectionError)
00096                 onEachConnectionError(error);
00097             //inProgress.erase(n);
00098 
00099             if (first_error == "")
00100                 first_error = error;
00101         };
00102     
00103     for (unsigned i = 0;  i < nconnections;  ++i) {
00104         //{
00105         //    Guard guard(this->lock);
00106         //    inProgress.insert(i);
00107         //}
00108         
00109         newConnection(boost::bind<void>(onConnection, _1, i),
00110                       boost::bind<void>(onConnectionError2, _1, i),
00111                       timeout);
00112     }
00113 
00114     while (finished < nconnections) {
00115         ML::sleep(0.1);
00116         //int nevents = handleEvents(0.1);
00117         cerr << "finished " << finished << " of "
00118              << nconnections << " errors " << errors
00119              << " connections " << inactive.size()
00120              << endl;
00121     }
00122     
00123     cerr << inactive.size() << " connections created with "
00124          << errors << " errors" << endl;
00125 
00126     if ((errors != 0 || inactive.size() != nconnections)
00127         && throwIfAnyConnectionError)
00128         throw Exception("error creating connections: " + first_error);
00129     
00130     return inactive.size();
00131 }
00132 
00133 void
00134 ActiveEndpoint::
00135 getConnection(OnNewConnection onNewConnection,
00136               OnConnectionError onConnectionError,
00137               double timeout)
00138 {
00139     Guard guard(lock);
00140 
00141     if (!inactive.empty()) {
00142         /* If there's a spare connection then use it */
00143         std::shared_ptr<TransportBase> result = *inactive.begin();
00144         
00145         if (active.count(result))
00146             throw Exception("doubling up on IDs");
00147         
00148         inactive.erase(inactive.begin());
00149         
00150         if (active.empty()) idle.acquire();  // no longer idle
00151         active.insert(result);
00152         guard.release();
00153 
00154         auto finish = [=] ()
00155             {
00156                 onNewConnection(result);
00157             };
00158 
00159         result->doAsync(finish, "getConnection");
00160 
00161         return;
00162     }
00163 
00164     guard.release();
00165 
00166     auto newOnConnectionAvailable
00167         = [=] (const std::shared_ptr<TransportBase> & transport)
00168         {
00169             Guard guard(lock);
00170 
00171             if (active.count(transport))
00172                 throw Exception("doubling up on IDs 2");
00173             if (!inactive.count(transport))
00174                 throw Exception("inactive doesn't include the connection");
00175 
00176             inactive.erase(transport);
00177             if (active.empty()) idle.acquire();
00178             
00179             active.insert(transport);
00180 
00181             guard.release();
00182             onNewConnection(transport);
00183             return;
00184         };
00185     
00186     newConnection(newOnConnectionAvailable, onConnectionError, timeout);
00187 }
00188 
00189 void
00190 ActiveEndpoint::
00191 shutdown()
00192 {
00193     active.clear();
00194     inactive.clear();
00195     EndpointBase::shutdown();
00196 }
00197 
00198 void
00199 ActiveEndpoint::
00200 notifyNewTransport(const std::shared_ptr<TransportBase> & transport)
00201 {
00202     EndpointBase::notifyNewTransport(transport);
00203 }
00204 
00205 void
00206 ActiveEndpoint::
00207 notifyTransportOpen(const std::shared_ptr<TransportBase> & transport)
00208 {
00209 #if 0
00210     cerr << "notifyTransportOpen " << transport << " "
00211          << transport->status() << endl;
00212 
00213     backtrace();
00214 #endif
00215 
00216     Guard guard(lock);
00217 
00218     if (active.count(transport))
00219         throw ML::Exception("attempt to add new transport twice");
00220     if (inactive.count(transport))
00221         throw ML::Exception("attempt to add new transport %p (%d,%s) twice 2",
00222                             transport.get(),
00223                             transport->getHandle(),
00224                             transport->status().c_str());
00225 
00226     inactive.insert(transport);
00227 }
00228 
00229 void
00230 ActiveEndpoint::
00231 notifyCloseTransport(const std::shared_ptr<TransportBase> & transport)
00232 {
00233     Guard guard(lock);
00234     if (inactive.count(transport))
00235         inactive.erase(transport);
00236     if (active.count(transport))
00237         active.erase(transport);
00238 
00239     EndpointBase::notifyCloseTransport(transport);
00240 
00241     if (active.empty())
00242         idle.release();
00243 }
00244 
00245 void
00246 ActiveEndpoint::
00247 notifyRecycleTransport(const std::shared_ptr<TransportBase> & transport)
00248 {
00249     Guard guard(lock);
00250 
00251     if (!active.count(transport))
00252         throw ML::Exception("recycled transport was not active");
00253     if (inactive.count(transport))
00254         throw ML::Exception("recycled transport already inactive");
00255     if (transport->hasSlave())
00256         throw ML::Exception("recycled transport has a slave");
00257 
00258     active.erase(transport);
00259     inactive.insert(transport);
00260 
00261     if (active.empty())
00262         idle.release();
00263 }
00264 
00265 void
00266 ActiveEndpoint::
00267 dumpState() const
00268 {
00269     Guard guard(lock);
00270     cerr << endl << endl;
00271     cerr << "----------------------------------------------" << endl;
00272     cerr << "Active Endpoint of type " << type_name(*this)
00273          << " with " << active.size() << " active connections and "
00274          << inactive.size() << " inactive connections" << endl;
00275 
00276     int i = 0;
00277     for (auto it = active.begin(), end = active.end();  it != end && i < 10;  ++it,++i) {
00278         cerr << "  active " << i << (*it)->status() << endl;
00279         (*it)->activities.dump();
00280         cerr << endl;
00281     }
00282 }
00283 
00284 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator