RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* active_endpoint.cc 00002 Jeremy Barnes, 29 April 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Active endpoint class. 00006 */ 00007 00008 #include "soa/service//active_endpoint.h" 00009 #include "jml/arch/timers.h" 00010 00011 using namespace std; 00012 using namespace ML; 00013 00014 namespace Datacratic { 00015 00016 00017 /*****************************************************************************/ 00018 /* ACTIVE ENDPOINT */ 00019 /*****************************************************************************/ 00020 00021 void 00022 ActiveEndpoint:: 00023 throwExceptionOnConnectionError(const std::string & error) 00024 { 00025 throw Exception("Connection error: " + error); 00026 } 00027 00028 void 00029 ActiveEndpoint:: 00030 doNothingOnConnectionError(const std::string & error) 00031 { 00032 } 00033 00034 int 00035 ActiveEndpoint:: 00036 init(int port, const std::string & hostname, 00037 int connections, int threads, bool synchronous, 00038 bool throwIfAnyConnectionError, 00039 OnConnectionError onEachConnectionError, 00040 double timeout) 00041 { 00042 //cerr << "connecting on " << hostname << ":" << port << endl; 00043 00044 this->port_ = port; 00045 this->hostname_ = hostname; 00046 00047 if (hostname == "localhost" || hostname == "") 00048 addr.set(port); 00049 else addr.set(port, hostname.c_str()); 00050 00051 //cerr << "connections " << connections << " threads " << threads << endl; 00052 00053 spinup(threads, synchronous); 00054 00055 return createConnections(connections, synchronous, 00056 throwIfAnyConnectionError, 00057 onEachConnectionError, 00058 timeout); 00059 } 00060 00061 int 00062 ActiveEndpoint:: 00063 createConnections(int nconnections, bool synchronous, 00064 bool throwIfAnyConnectionError, 00065 OnConnectionError onEachConnectionError, 00066 double timeout) 00067 { 00068 // Pre-create connections so that we don't have to wait for them 00069 //cerr << "pre-creating " << nconnections << " connections" << endl; 00070 00071 int finished = 0; 00072 int errors = 0; 00073 string first_error; 00074 00075 //set<int> inProgress; 00076 00077 auto onConnection 00078 = [&] (const std::shared_ptr<TransportBase> & transport, int n) 00079 { 00080 Guard guard(this->lock); 00081 //cerr << "transport " << transport << " connected: finished " 00082 // << finished << " errors " << errors << endl; 00083 if (!inactive.count(transport)) 00084 throw Exception("new connection not known in inactive"); 00085 ML::atomic_add(finished, 1); 00086 //inProgress.erase(n); 00087 }; 00088 00089 auto onConnectionError2 = [&] (string error, int n) 00090 { 00091 cerr << "error creating connection " << n << ": " << error << endl; 00092 Guard guard(this->lock); 00093 ML::atomic_add(finished, 1); 00094 ML::atomic_add(errors, 1); 00095 if (onEachConnectionError) 00096 onEachConnectionError(error); 00097 //inProgress.erase(n); 00098 00099 if (first_error == "") 00100 first_error = error; 00101 }; 00102 00103 for (unsigned i = 0; i < nconnections; ++i) { 00104 //{ 00105 // Guard guard(this->lock); 00106 // inProgress.insert(i); 00107 //} 00108 00109 newConnection(boost::bind<void>(onConnection, _1, i), 00110 boost::bind<void>(onConnectionError2, _1, i), 00111 timeout); 00112 } 00113 00114 while (finished < nconnections) { 00115 ML::sleep(0.1); 00116 //int nevents = handleEvents(0.1); 00117 cerr << "finished " << finished << " of " 00118 << nconnections << " errors " << errors 00119 << " connections " << inactive.size() 00120 << endl; 00121 } 00122 00123 cerr << inactive.size() << " connections created with " 00124 << errors << " errors" << endl; 00125 00126 if ((errors != 0 || inactive.size() != nconnections) 00127 && throwIfAnyConnectionError) 00128 throw Exception("error creating connections: " + first_error); 00129 00130 return inactive.size(); 00131 } 00132 00133 void 00134 ActiveEndpoint:: 00135 getConnection(OnNewConnection onNewConnection, 00136 OnConnectionError onConnectionError, 00137 double timeout) 00138 { 00139 Guard guard(lock); 00140 00141 if (!inactive.empty()) { 00142 /* If there's a spare connection then use it */ 00143 std::shared_ptr<TransportBase> result = *inactive.begin(); 00144 00145 if (active.count(result)) 00146 throw Exception("doubling up on IDs"); 00147 00148 inactive.erase(inactive.begin()); 00149 00150 if (active.empty()) idle.acquire(); // no longer idle 00151 active.insert(result); 00152 guard.release(); 00153 00154 auto finish = [=] () 00155 { 00156 onNewConnection(result); 00157 }; 00158 00159 result->doAsync(finish, "getConnection"); 00160 00161 return; 00162 } 00163 00164 guard.release(); 00165 00166 auto newOnConnectionAvailable 00167 = [=] (const std::shared_ptr<TransportBase> & transport) 00168 { 00169 Guard guard(lock); 00170 00171 if (active.count(transport)) 00172 throw Exception("doubling up on IDs 2"); 00173 if (!inactive.count(transport)) 00174 throw Exception("inactive doesn't include the connection"); 00175 00176 inactive.erase(transport); 00177 if (active.empty()) idle.acquire(); 00178 00179 active.insert(transport); 00180 00181 guard.release(); 00182 onNewConnection(transport); 00183 return; 00184 }; 00185 00186 newConnection(newOnConnectionAvailable, onConnectionError, timeout); 00187 } 00188 00189 void 00190 ActiveEndpoint:: 00191 shutdown() 00192 { 00193 active.clear(); 00194 inactive.clear(); 00195 EndpointBase::shutdown(); 00196 } 00197 00198 void 00199 ActiveEndpoint:: 00200 notifyNewTransport(const std::shared_ptr<TransportBase> & transport) 00201 { 00202 EndpointBase::notifyNewTransport(transport); 00203 } 00204 00205 void 00206 ActiveEndpoint:: 00207 notifyTransportOpen(const std::shared_ptr<TransportBase> & transport) 00208 { 00209 #if 0 00210 cerr << "notifyTransportOpen " << transport << " " 00211 << transport->status() << endl; 00212 00213 backtrace(); 00214 #endif 00215 00216 Guard guard(lock); 00217 00218 if (active.count(transport)) 00219 throw ML::Exception("attempt to add new transport twice"); 00220 if (inactive.count(transport)) 00221 throw ML::Exception("attempt to add new transport %p (%d,%s) twice 2", 00222 transport.get(), 00223 transport->getHandle(), 00224 transport->status().c_str()); 00225 00226 inactive.insert(transport); 00227 } 00228 00229 void 00230 ActiveEndpoint:: 00231 notifyCloseTransport(const std::shared_ptr<TransportBase> & transport) 00232 { 00233 Guard guard(lock); 00234 if (inactive.count(transport)) 00235 inactive.erase(transport); 00236 if (active.count(transport)) 00237 active.erase(transport); 00238 00239 EndpointBase::notifyCloseTransport(transport); 00240 00241 if (active.empty()) 00242 idle.release(); 00243 } 00244 00245 void 00246 ActiveEndpoint:: 00247 notifyRecycleTransport(const std::shared_ptr<TransportBase> & transport) 00248 { 00249 Guard guard(lock); 00250 00251 if (!active.count(transport)) 00252 throw ML::Exception("recycled transport was not active"); 00253 if (inactive.count(transport)) 00254 throw ML::Exception("recycled transport already inactive"); 00255 if (transport->hasSlave()) 00256 throw ML::Exception("recycled transport has a slave"); 00257 00258 active.erase(transport); 00259 inactive.insert(transport); 00260 00261 if (active.empty()) 00262 idle.release(); 00263 } 00264 00265 void 00266 ActiveEndpoint:: 00267 dumpState() const 00268 { 00269 Guard guard(lock); 00270 cerr << endl << endl; 00271 cerr << "----------------------------------------------" << endl; 00272 cerr << "Active Endpoint of type " << type_name(*this) 00273 << " with " << active.size() << " active connections and " 00274 << inactive.size() << " inactive connections" << endl; 00275 00276 int i = 0; 00277 for (auto it = active.begin(), end = active.end(); it != end && i < 10; ++it,++i) { 00278 cerr << " active " << i << (*it)->status() << endl; 00279 (*it)->activities.dump(); 00280 cerr << endl; 00281 } 00282 } 00283 00284 } // namespace Datacratic