RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* carbon_connector.cc 00002 Jeremy Barnes, 3 August 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 */ 00006 00007 #include "soa/service/carbon_connector.h" 00008 #include "ace/INET_Addr.h" 00009 #include "jml/arch/exception.h" 00010 #include "jml/arch/format.h" 00011 #include <iostream> 00012 #include "jml/arch/cmp_xchg.h" 00013 #include "jml/arch/atomic_ops.h" 00014 #include "jml/arch/timers.h" 00015 #include "jml/arch/futex.h" 00016 #include "jml/utils/floating_point.h" 00017 #include "jml/utils/smart_ptr_utils.h" 00018 #include "jml/utils/exc_assert.h" 00019 #include <boost/tuple/tuple.hpp> 00020 #include <boost/bind.hpp> 00021 #include <boost/make_shared.hpp> 00022 #include <poll.h> 00023 00024 00025 using namespace std; 00026 using namespace ML; 00027 00028 00029 namespace Datacratic { 00030 00031 00032 /*****************************************************************************/ 00033 /* MULTI AGGREGATOR */ 00034 /*****************************************************************************/ 00035 00036 MultiAggregator:: 00037 MultiAggregator() 00038 : doShutdown(false), doDump(false), dumpInterval(0.0) 00039 { 00040 } 00041 00042 MultiAggregator:: 00043 MultiAggregator(const std::string & path, 00044 const OutputFn & output, 00045 double dumpInterval, 00046 std::function<void ()> onStop) 00047 : doShutdown(false), doDump(false) 00048 { 00049 open(path, output, dumpInterval, onStop); 00050 } 00051 00052 MultiAggregator:: 00053 ~MultiAggregator() 00054 { 00055 shutdown(); 00056 } 00057 00058 void 00059 MultiAggregator:: 00060 open(const std::string & path, 00061 const OutputFn & output, 00062 double dumpInterval, 00063 std::function<void ()> onStop) 00064 { 00065 shutdown(); 00066 00067 doShutdown = doDump = false; 00068 this->dumpInterval = dumpInterval; 00069 this->onStop = onStop; 00070 00071 if (path == "") prefix = ""; 00072 else prefix = path + "."; 00073 00074 if (output) 00075 outputFn = output; 00076 else 00077 outputFn = [&] (const std::vector<StatReading> & values) 00078 { 00079 this->doStat(values); 00080 }; 00081 00082 dumpingThread.reset 00083 (new std::thread(std::bind(&MultiAggregator::runDumpingThread, 00084 this))); 00085 } 00086 00087 void 00088 MultiAggregator:: 00089 stop() 00090 { 00091 shutdown(); 00092 } 00093 00094 void 00095 MultiAggregator:: 00096 doStat(const std::vector<StatReading> & values) const 00097 { 00098 outputFn(values); 00099 } 00100 00101 StatAggregator * createNewCounter() 00102 { 00103 return new CounterAggregator(); 00104 } 00105 00106 StatAggregator * createNewGauge() 00107 { 00108 return new GaugeAggregator(); 00109 } 00110 00111 StatAggregator * createNewOutcome() 00112 { 00113 return new GaugeAggregator(); 00114 } 00115 00116 void 00117 MultiAggregator:: 00118 record(const std::string & stat, 00119 EventType type, 00120 float value) 00121 { 00122 switch (type) { 00123 case ET_LEVEL: recordLevel(stat, value); break; 00124 case ET_ACCUM: recordQuantity(stat, value); break; 00125 case ET_COUNT: recordOccurrence(stat /*, value*/); break; 00126 case ET_OUTCOME: recordOutcome(stat, value); break; 00127 default: 00128 cerr << "warning: unknown stat type" << endl; 00129 } 00130 } 00131 00132 void 00133 MultiAggregator:: 00134 recordLevel(const std::string & stat, 00135 float level) 00136 { 00137 getAggregator(stat, createNewGauge).record(level); 00138 } 00139 00140 void 00141 MultiAggregator:: 00142 recordOccurrence(const std::string & stat) 00143 { 00144 getAggregator(stat, createNewCounter).record(1.0); 00145 } 00146 00147 void 00148 MultiAggregator:: 00149 recordQuantity(const std::string & stat, 00150 float quantity) 00151 { 00152 getAggregator(stat, createNewCounter).record(quantity); 00153 } 00154 00155 void 00156 MultiAggregator:: 00157 recordOutcome(const std::string & stat, 00158 float outcome) 00159 { 00160 getAggregator(stat, createNewOutcome).record(outcome); 00161 } 00162 00163 void 00164 MultiAggregator:: 00165 dump() 00166 { 00167 { 00168 std::lock_guard<std::mutex> lock(m); 00169 doDump = true; 00170 } 00171 00172 cond.notify_all(); 00173 } 00174 00175 void 00176 MultiAggregator:: 00177 dumpSync(std::ostream & stream) const 00178 { 00179 std::unique_lock<Lock> guard(this->lock); 00180 00181 for (auto & s: stats) { 00182 auto vals = s.second->read(s.first); 00183 for (auto v: vals) { 00184 stream << v.name << ":\t" << v.value << endl; 00185 } 00186 } 00187 } 00188 00189 void 00190 MultiAggregator:: 00191 shutdown() 00192 { 00193 if (dumpingThread) { 00194 if (onPreShutdown) onPreShutdown(); 00195 00196 { 00197 std::lock_guard<std::mutex> lock(m); 00198 doShutdown = true; 00199 } 00200 00201 cond.notify_all(); 00202 00203 dumpingThread->join(); 00204 dumpingThread.reset(); 00205 00206 if (onPostShutdown) onPostShutdown(); 00207 00208 if (onStop) onStop(); 00209 } 00210 } 00211 00212 StatAggregator & 00213 MultiAggregator:: 00214 getAggregator(const std::string & stat, 00215 StatAggregator * (*createFn) ()) 00216 { 00217 if (!lookupCache.get()) 00218 lookupCache.reset(new LookupCache()); 00219 00220 auto found = lookupCache->find(stat); 00221 if (found != lookupCache->end()) 00222 return *found->second->second; 00223 00224 // Get the read lock to look for the aggregator 00225 std::unique_lock<Lock> guard(lock); 00226 00227 auto found2 = stats.find(stat); 00228 00229 if (found2 != stats.end()) { 00230 guard.unlock(); 00231 00232 (*lookupCache)[stat] = found2; 00233 00234 return *found2->second; 00235 } 00236 00237 guard.unlock(); 00238 00239 // Get the write lock to add it to the aggregator 00240 std::unique_lock<Lock> guard2(lock); 00241 00242 // Add it in 00243 found2 = stats.insert(make_pair(stat, std::shared_ptr<StatAggregator>(createFn()))).first; 00244 00245 guard2.unlock(); 00246 (*lookupCache)[stat] = found2; 00247 return *found2->second; 00248 } 00249 00250 void 00251 MultiAggregator:: 00252 runDumpingThread() 00253 { 00254 Date nextDump = Date::now().plusSeconds(dumpInterval); 00255 00256 for (;;) { 00257 std::unique_lock<std::mutex> lock(m); 00258 00259 while ((dumpInterval == 0.0 || Date::now() < nextDump) 00260 && !doShutdown && !doDump) 00261 cond.wait_until(lock, nextDump.toStd()); 00262 00263 if (doShutdown) 00264 break; 00265 00266 ExcAssert((dumpInterval != 0.0 && Date::now() >= nextDump) || doDump); 00267 00268 doDump = false; 00269 00270 // Get the read lock to extract a list of stats to dump 00271 vector<Stats::iterator> toDump; 00272 00273 { 00274 std::unique_lock<Lock> guard(this->lock); 00275 toDump.reserve(stats.size()); 00276 for (auto it = stats.begin(), end = stats.end(); 00277 it != end; ++it) 00278 toDump.push_back(it); 00279 //std::copy(stats.begin(), stats.end(), back_inserter(toDump)); 00280 } 00281 00282 std::vector<std::string> toWrite; 00283 00284 // Now dump them without the lock held 00285 for (auto it = toDump.begin(), end = toDump.end(); 00286 it != end; ++it) { 00287 00288 try { 00289 //cerr << "doStat(" << (*it)->first << ")" << endl; 00290 doStat((*it)->second->read((*it)->first)); 00291 } catch (const std::exception & exc) { 00292 cerr << "error writing stat: " << exc.what() << endl; 00293 } 00294 } 00295 00296 while (nextDump < Date::now() && dumpInterval != 0.0) 00297 nextDump.addSeconds(dumpInterval); 00298 } 00299 } 00300 00301 00302 /*****************************************************************************/ 00303 /* CARBON CONNECTOR */ 00304 /*****************************************************************************/ 00305 00306 CarbonConnector:: 00307 CarbonConnector() 00308 { 00309 } 00310 00311 CarbonConnector:: 00312 CarbonConnector(const std::string & carbonAddr, 00313 const std::string & path, 00314 std::function<void ()> onStop) 00315 { 00316 open(carbonAddr, path, onStop); 00317 } 00318 00319 CarbonConnector:: 00320 CarbonConnector(const std::vector<std::string> & carbonAddrs, 00321 const std::string & path, 00322 std::function<void ()> onStop) 00323 { 00324 open(carbonAddrs, path, onStop); 00325 } 00326 00327 CarbonConnector:: 00328 ~CarbonConnector() 00329 { 00330 doShutdown(); 00331 } 00332 00333 void 00334 CarbonConnector:: 00335 open(const std::string & carbonAddr, 00336 const std::string & path, 00337 std::function<void ()> onStop) 00338 { 00339 return open(vector<string>({carbonAddr}), path, onStop); 00340 } 00341 00342 void 00343 CarbonConnector:: 00344 open(const std::vector<std::string> & carbonAddrs, 00345 const std::string & path, 00346 std::function<void ()> onStop) 00347 { 00348 stop(); 00349 00350 int numConnections = 0; 00351 00352 connections.clear(); 00353 for (unsigned i = 0; i < carbonAddrs.size(); ++i) { 00354 connections.push_back(std::make_shared<Connection> 00355 (carbonAddrs[i])); 00356 string error = connections.back()->connect(); 00357 if (connections.back()->fd == -1) { 00358 cerr << "error connecting to Carbon at " << carbonAddrs[i] 00359 << ": " << error << endl; 00360 } 00361 else ++numConnections; 00362 } 00363 00364 if (numConnections == 0) 00365 throw ML::Exception("unable to connect to any Carbon instances"); 00366 00367 this->onPostShutdown = std::bind(&CarbonConnector::doShutdown, this); 00368 00369 MultiAggregator::open(path, OutputFn(), 1.0, onStop); 00370 } 00371 00372 void 00373 CarbonConnector:: 00374 doShutdown() 00375 { 00376 stop(); 00377 connections.clear(); 00378 } 00379 00380 void 00381 CarbonConnector:: 00382 doStat(const std::vector<StatReading> & values) const 00383 { 00384 if (connections.empty()) 00385 return; 00386 00387 std::string message; 00388 00389 for (unsigned i = 0; i < values.size(); ++i) { 00390 message += ML::format("%s%s %.5f %lld\n", 00391 prefix.c_str(), values[i].name.c_str(), 00392 values[i].value, 00393 (unsigned long long) 00394 values[i].timestamp.secondsSinceEpoch()); 00395 } 00396 00397 for (unsigned i = 0; i < connections.size(); ++i) 00398 connections[i]->send(message); 00399 } 00400 00401 CarbonConnector::Connection:: 00402 ~Connection() 00403 { 00404 close(); 00405 00406 if (reconnectionThread) { 00407 shutdown = 1; 00408 futex_wake(shutdown); 00409 reconnectionThread->join(); 00410 reconnectionThread.reset(); 00411 } 00412 } 00413 00414 void 00415 CarbonConnector::Connection:: 00416 close() 00417 { 00418 if (fd != -1) 00419 ::close(fd); 00420 fd = -1; 00421 } 00422 00423 std::string 00424 CarbonConnector::Connection:: 00425 connect() 00426 { 00427 if (fd != -1) 00428 throw ML::Exception("error connecting"); 00429 00430 ip = ACE_INET_Addr(addr.c_str()); 00431 00432 cerr << "connecting to Carbon at " 00433 << ip.get_host_addr() << ":" << ip.get_port_number() 00434 << " (" << ip.get_host_name() << ")" << endl; 00435 00436 int tmpFd = socket(AF_INET, SOCK_STREAM, 0); 00437 int res = ::connect(tmpFd, 00438 (sockaddr *)ip.get_addr(), 00439 ip.get_addr_size()); 00440 00441 int saved_errno = errno; 00442 00443 if (res == -1) { 00444 ::close(tmpFd); 00445 return ML::format("connect to carbon at %s:%d (%s): %s", 00446 ip.get_host_addr(), 00447 ip.get_port_number(), 00448 ip.get_host_name(), 00449 strerror(saved_errno)); 00450 } 00451 00452 fd = tmpFd; 00453 00454 return ""; 00455 } 00456 00457 void 00458 CarbonConnector::Connection:: 00459 send(const std::string & message) 00460 { 00461 //cerr << "STAT: " << message << endl; 00462 //return; 00463 if (message.empty()) 00464 return; 00465 00466 //cerr << "sending to " << addr << " on " << fd << " " << message << endl; 00467 00468 if (fd == -1) { 00469 if (reconnectionThreadActive) return; 00470 throw ML::Exception("send with fd -1 and no thread active"); 00471 } 00472 00473 size_t done = 0; 00474 00475 for (;;) { 00476 int sendRes = ::send(fd, message.c_str() + done, 00477 message.size() - done, 00478 MSG_DONTWAIT | MSG_NOSIGNAL); 00479 00480 if (sendRes > 0) { 00481 done += sendRes; 00482 if (done == message.size()) 00483 return; // done; normal case 00484 else if (done > message.size()) 00485 throw ML::Exception("logic error sending message to Carbon"); 00486 else continue; // do the rest of the message 00487 } 00488 00489 if (sendRes != -1) 00490 throw ML::Exception("invalid return code from send"); 00491 00492 // Error handling 00493 if (errno == EINTR) 00494 continue; // retry 00495 else if (errno == EAGAIN || errno == EWOULDBLOCK) { 00496 // Would block (something that we don't want). Select on the 00497 // socket for the timeout before giving up. 00498 struct pollfd events[] = { 00499 { fd, POLLOUT | POLLERR | POLLHUP | POLLNVAL, 0 } 00500 }; 00501 00502 int res = poll(events, 1, 500 /* 500ms max timeout */); 00503 if (res == 1 && events[0].revents == POLLOUT) { 00504 // Ready to send 00505 continue; // we can now send it 00506 } 00507 else if (res == -1) { 00508 // error in epoll call 00509 int saved_errno = errno; 00510 cerr << "error on epoll with CarbonConnector " << addr 00511 << ": " << strerror(saved_errno) << endl; 00512 } 00513 else if (res == 0) { 00514 // nothing ready... must be a timeout 00515 cerr << "timeout sending to CarbonConnector at " << addr 00516 << endl; 00517 } 00518 else if (res == 1 && events[0].revents & ~POLLOUT) { 00519 // Disconnection or error... need to reconnect 00520 cerr << "disconnection sending to CarbonConnector at " << addr 00521 << endl; 00522 } 00523 else { 00524 // Logic error; we should never get here 00525 throw ML::Exception("logic error in carbon connector"); 00526 } 00527 } else { 00528 // Error in sending 00529 int saved_errno = errno; 00530 cerr << "error sending to CarbonConnector " << addr 00531 << ": " << strerror(saved_errno) << endl; 00532 } 00533 break; 00534 } 00535 00536 reconnect(); 00537 } 00538 00539 void 00540 CarbonConnector::Connection:: 00541 reconnect() 00542 { 00543 close(); 00544 00545 cerr << "reconnecting to " << addr << endl; 00546 00547 if (reconnectionThreadJoinable && reconnectionThread) { 00548 reconnectionThread->join(); 00549 reconnectionThread.reset(); 00550 } 00551 00552 reconnectionThreadActive = false; 00553 reconnectionThreadJoinable = false; 00554 00555 reconnectionThread 00556 = std::make_shared<std::thread> 00557 (std::bind(&Connection::runReconnectionThread, this)); 00558 00559 } 00560 00561 void 00562 CarbonConnector::Connection:: 00563 runReconnectionThread() 00564 { 00565 cerr << "started reconnection thread" << endl; 00566 00567 reconnectionThreadActive = true; 00568 00569 // Close the current connection 00570 if (fd != -1) 00571 close(); 00572 00573 double meanWaitTime = 0.5; // half a second 00574 00575 while (!shutdown) { 00576 string error = connect(); 00577 if (fd != -1) break; 00578 00579 cerr << "error reconnecting to " << addr << ": " << error 00580 << endl; 00581 00582 double r = (random() % 10001) / 10000.0; 00583 double waitTime = meanWaitTime * (0.5 + r); 00584 00585 if (meanWaitTime < 8.0) 00586 meanWaitTime *= 2; 00587 00588 // Wait for the given time before we attempt reconnection 00589 // again. 00590 futex_wait(shutdown, 0, waitTime); 00591 } 00592 00593 reconnectionThreadActive = false; 00594 reconnectionThreadJoinable = true; 00595 } 00596 00597 } // namespace Datacratic