RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/carbon_connector.cc
00001 /* carbon_connector.cc
00002    Jeremy Barnes, 3 August 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005 */
00006 
00007 #include "soa/service/carbon_connector.h"
00008 #include "ace/INET_Addr.h"
00009 #include "jml/arch/exception.h"
00010 #include "jml/arch/format.h"
00011 #include <iostream>
00012 #include "jml/arch/cmp_xchg.h"
00013 #include "jml/arch/atomic_ops.h"
00014 #include "jml/arch/timers.h"
00015 #include "jml/arch/futex.h"
00016 #include "jml/utils/floating_point.h"
00017 #include "jml/utils/smart_ptr_utils.h"
00018 #include "jml/utils/exc_assert.h"
00019 #include <boost/tuple/tuple.hpp>
00020 #include <boost/bind.hpp>
00021 #include <boost/make_shared.hpp>
00022 #include <poll.h>
00023 
00024 
00025 using namespace std;
00026 using namespace ML;
00027 
00028 
00029 namespace Datacratic {
00030 
00031 
00032 /*****************************************************************************/
00033 /* MULTI AGGREGATOR                                                          */
00034 /*****************************************************************************/
00035 
00036 MultiAggregator::
00037 MultiAggregator()
00038     : doShutdown(false), doDump(false), dumpInterval(0.0)
00039 {
00040 }
00041 
00042 MultiAggregator::
00043 MultiAggregator(const std::string & path,
00044                 const OutputFn & output,
00045                 double dumpInterval,
00046                 std::function<void ()> onStop)
00047     : doShutdown(false), doDump(false)
00048 {
00049     open(path, output, dumpInterval, onStop);
00050 }
00051 
00052 MultiAggregator::
00053 ~MultiAggregator()
00054 {
00055     shutdown();
00056 }
00057 
00058 void
00059 MultiAggregator::
00060 open(const std::string & path,
00061      const OutputFn & output,
00062      double dumpInterval,
00063      std::function<void ()> onStop)
00064 {
00065     shutdown();
00066 
00067     doShutdown = doDump = false;
00068     this->dumpInterval = dumpInterval;
00069     this->onStop = onStop;
00070 
00071     if (path == "") prefix = "";
00072     else prefix = path + ".";
00073 
00074     if (output)
00075         outputFn = output;
00076     else
00077         outputFn = [&] (const std::vector<StatReading> & values)
00078             {
00079                 this->doStat(values);
00080             };
00081 
00082     dumpingThread.reset
00083         (new std::thread(std::bind(&MultiAggregator::runDumpingThread,
00084                                        this)));
00085 }
00086 
00087 void
00088 MultiAggregator::
00089 stop()
00090 {
00091     shutdown();
00092 }
00093 
00094 void
00095 MultiAggregator::
00096 doStat(const std::vector<StatReading> & values) const
00097 {
00098     outputFn(values);
00099 }
00100 
00101 StatAggregator * createNewCounter()
00102 {
00103     return new CounterAggregator();
00104 }
00105 
00106 StatAggregator * createNewGauge()
00107 {
00108     return new GaugeAggregator();
00109 }
00110 
00111 StatAggregator * createNewOutcome()
00112 {
00113     return new GaugeAggregator();
00114 }
00115 
00116 void
00117 MultiAggregator::
00118 record(const std::string & stat,
00119        EventType type,
00120        float value)
00121 {
00122     switch (type) {
00123     case ET_LEVEL: recordLevel(stat, value);       break;
00124     case ET_ACCUM: recordQuantity(stat, value);    break;
00125     case ET_COUNT: recordOccurrence(stat /*, value*/);  break;
00126     case ET_OUTCOME: recordOutcome(stat, value);   break;
00127     default:
00128         cerr << "warning: unknown stat type" << endl;
00129     }
00130 }
00131 
00132 void
00133 MultiAggregator::
00134 recordLevel(const std::string & stat,
00135             float level)
00136 {
00137     getAggregator(stat, createNewGauge).record(level);
00138 }
00139 
00140 void
00141 MultiAggregator::
00142 recordOccurrence(const std::string & stat)
00143 {
00144     getAggregator(stat, createNewCounter).record(1.0);
00145 }
00146 
00147 void
00148 MultiAggregator::
00149 recordQuantity(const std::string & stat,
00150                float quantity)
00151 {
00152     getAggregator(stat, createNewCounter).record(quantity);
00153 }
00154     
00155 void
00156 MultiAggregator::
00157 recordOutcome(const std::string & stat,
00158               float outcome)
00159 {
00160     getAggregator(stat, createNewOutcome).record(outcome);
00161 }
00162 
00163 void
00164 MultiAggregator::
00165 dump()
00166 {
00167     {
00168         std::lock_guard<std::mutex> lock(m);
00169         doDump = true;
00170     }
00171     
00172     cond.notify_all();
00173 }
00174 
00175 void
00176 MultiAggregator::
00177 dumpSync(std::ostream & stream) const
00178 {
00179     std::unique_lock<Lock> guard(this->lock);
00180 
00181     for (auto & s: stats) {
00182         auto vals = s.second->read(s.first);
00183         for (auto v: vals) {
00184             stream << v.name << ":\t" << v.value << endl;
00185         }
00186     }
00187 }
00188 
00189 void
00190 MultiAggregator::
00191 shutdown()
00192 {
00193     if (dumpingThread) {
00194         if (onPreShutdown) onPreShutdown();
00195         
00196         {
00197             std::lock_guard<std::mutex> lock(m);
00198             doShutdown = true;
00199         }
00200 
00201         cond.notify_all();
00202         
00203         dumpingThread->join();
00204         dumpingThread.reset();
00205 
00206         if (onPostShutdown) onPostShutdown();
00207 
00208         if (onStop) onStop();
00209     }
00210 }
00211 
00212 StatAggregator &
00213 MultiAggregator::
00214 getAggregator(const std::string & stat,
00215               StatAggregator * (*createFn) ())
00216 {
00217     if (!lookupCache.get())
00218         lookupCache.reset(new LookupCache());
00219 
00220     auto found = lookupCache->find(stat);
00221     if (found != lookupCache->end())
00222         return *found->second->second;
00223 
00224     // Get the read lock to look for the aggregator
00225     std::unique_lock<Lock> guard(lock);
00226 
00227     auto found2 = stats.find(stat);
00228 
00229     if (found2 != stats.end()) {
00230         guard.unlock();
00231 
00232         (*lookupCache)[stat] = found2;
00233 
00234         return *found2->second;
00235     }
00236     
00237     guard.unlock();
00238 
00239     // Get the write lock to add it to the aggregator
00240     std::unique_lock<Lock> guard2(lock);
00241 
00242     // Add it in
00243     found2 = stats.insert(make_pair(stat, std::shared_ptr<StatAggregator>(createFn()))).first;
00244 
00245     guard2.unlock();
00246     (*lookupCache)[stat] = found2;
00247     return *found2->second;
00248 }
00249 
00250 void
00251 MultiAggregator::
00252 runDumpingThread()
00253 {
00254     Date nextDump = Date::now().plusSeconds(dumpInterval);
00255 
00256     for (;;) {
00257         std::unique_lock<std::mutex> lock(m);
00258 
00259         while ((dumpInterval == 0.0 || Date::now() < nextDump)
00260                && !doShutdown && !doDump)
00261             cond.wait_until(lock, nextDump.toStd());
00262         
00263         if (doShutdown)
00264             break;
00265 
00266         ExcAssert((dumpInterval != 0.0 && Date::now() >= nextDump) || doDump);
00267         
00268         doDump = false;
00269 
00270         // Get the read lock to extract a list of stats to dump
00271         vector<Stats::iterator> toDump;
00272 
00273         {
00274             std::unique_lock<Lock> guard(this->lock);
00275             toDump.reserve(stats.size());
00276             for (auto it = stats.begin(), end = stats.end();
00277                  it != end;  ++it)
00278                 toDump.push_back(it);
00279             //std::copy(stats.begin(), stats.end(), back_inserter(toDump));
00280         }
00281 
00282         std::vector<std::string> toWrite;
00283 
00284         // Now dump them without the lock held
00285         for (auto it = toDump.begin(), end = toDump.end();
00286              it != end;  ++it) {
00287 
00288             try {
00289                 //cerr << "doStat(" << (*it)->first << ")" << endl;
00290                 doStat((*it)->second->read((*it)->first));
00291             } catch (const std::exception & exc) {
00292                 cerr << "error writing stat: " << exc.what() << endl;
00293             }
00294         }
00295 
00296         while (nextDump < Date::now() && dumpInterval != 0.0)
00297             nextDump.addSeconds(dumpInterval);
00298     }
00299 }
00300 
00301 
00302 /*****************************************************************************/
00303 /* CARBON CONNECTOR                                                        */
00304 /*****************************************************************************/
00305 
00306 CarbonConnector::
00307 CarbonConnector()
00308 {
00309 }
00310 
00311 CarbonConnector::
00312 CarbonConnector(const std::string & carbonAddr,
00313                 const std::string & path,
00314                 std::function<void ()> onStop)
00315 {
00316     open(carbonAddr, path, onStop);
00317 }
00318 
00319 CarbonConnector::
00320 CarbonConnector(const std::vector<std::string> & carbonAddrs,
00321                 const std::string & path,
00322                 std::function<void ()> onStop)
00323 {
00324     open(carbonAddrs, path, onStop);
00325 }
00326 
00327 CarbonConnector::
00328 ~CarbonConnector()
00329 {
00330     doShutdown();
00331 }
00332 
00333 void
00334 CarbonConnector::
00335 open(const std::string & carbonAddr,
00336      const std::string & path,
00337      std::function<void ()> onStop)
00338 {
00339     return open(vector<string>({carbonAddr}), path, onStop);
00340 }
00341 
00342 void
00343 CarbonConnector::
00344 open(const std::vector<std::string> & carbonAddrs,
00345      const std::string & path,
00346      std::function<void ()> onStop)
00347 {
00348     stop();
00349 
00350     int numConnections = 0;
00351 
00352     connections.clear();
00353     for (unsigned i = 0;  i < carbonAddrs.size();  ++i) {
00354         connections.push_back(std::make_shared<Connection>
00355                                     (carbonAddrs[i]));
00356         string error = connections.back()->connect();
00357         if (connections.back()->fd == -1) {
00358             cerr << "error connecting to Carbon at " << carbonAddrs[i]
00359                  << ": " << error << endl;
00360         }
00361         else ++numConnections;
00362     }
00363 
00364     if (numConnections == 0)
00365         throw ML::Exception("unable to connect to any Carbon instances");
00366 
00367     this->onPostShutdown = std::bind(&CarbonConnector::doShutdown, this);
00368 
00369     MultiAggregator::open(path, OutputFn(), 1.0, onStop);
00370 }
00371 
00372 void
00373 CarbonConnector::
00374 doShutdown()
00375 {
00376     stop();
00377     connections.clear();
00378 }
00379 
00380 void
00381 CarbonConnector::
00382 doStat(const std::vector<StatReading> & values) const
00383 {
00384     if (connections.empty())
00385         return;
00386 
00387     std::string message;
00388 
00389     for (unsigned i = 0;  i < values.size();  ++i) {
00390         message += ML::format("%s%s %.5f %lld\n",
00391                               prefix.c_str(), values[i].name.c_str(),
00392                               values[i].value,
00393                               (unsigned long long)
00394                               values[i].timestamp.secondsSinceEpoch());
00395     }
00396 
00397     for (unsigned i = 0;  i < connections.size();  ++i)
00398         connections[i]->send(message);
00399 }
00400 
00401 CarbonConnector::Connection::
00402 ~Connection()
00403 {
00404     close();
00405 
00406     if (reconnectionThread) {
00407         shutdown = 1;
00408         futex_wake(shutdown);
00409         reconnectionThread->join();
00410         reconnectionThread.reset();
00411     }
00412 }
00413 
00414 void
00415 CarbonConnector::Connection::
00416 close()
00417 {
00418     if (fd != -1)
00419         ::close(fd);
00420     fd = -1;
00421 }
00422 
00423 std::string
00424 CarbonConnector::Connection::
00425 connect()
00426 {
00427     if (fd != -1)
00428         throw ML::Exception("error connecting");
00429 
00430     ip = ACE_INET_Addr(addr.c_str());
00431 
00432     cerr << "connecting to Carbon at "
00433          << ip.get_host_addr() << ":" << ip.get_port_number()
00434          << " (" << ip.get_host_name() << ")" << endl;
00435 
00436     int tmpFd = socket(AF_INET, SOCK_STREAM, 0);
00437     int res = ::connect(tmpFd,
00438                         (sockaddr *)ip.get_addr(),
00439                         ip.get_addr_size());
00440     
00441     int saved_errno = errno;
00442 
00443     if (res == -1) {
00444         ::close(tmpFd);
00445         return ML::format("connect to carbon at %s:%d (%s): %s",
00446                           ip.get_host_addr(),
00447                           ip.get_port_number(),
00448                           ip.get_host_name(),
00449                           strerror(saved_errno));
00450     }
00451 
00452     fd = tmpFd;
00453 
00454     return "";
00455 }
00456 
00457 void
00458 CarbonConnector::Connection::
00459 send(const std::string & message)
00460 {
00461     //cerr << "STAT: " << message << endl;
00462     //return;
00463     if (message.empty())
00464         return;
00465 
00466     //cerr << "sending to " << addr << " on " << fd << " " << message << endl;
00467 
00468     if (fd == -1) {
00469         if (reconnectionThreadActive) return;
00470         throw ML::Exception("send with fd -1 and no thread active");
00471     }
00472     
00473     size_t done = 0;
00474 
00475     for (;;) {
00476         int sendRes = ::send(fd, message.c_str() + done,
00477                              message.size() - done,
00478                              MSG_DONTWAIT | MSG_NOSIGNAL);
00479         
00480         if (sendRes > 0) {
00481             done += sendRes;
00482             if (done == message.size())
00483                 return;  // done; normal case
00484             else if (done > message.size())
00485                 throw ML::Exception("logic error sending message to Carbon");
00486             else continue;  // do the rest of the message
00487         }
00488         
00489         if (sendRes != -1)
00490             throw ML::Exception("invalid return code from send");
00491         
00492         // Error handling
00493         if (errno == EINTR)
00494             continue;  // retry
00495         else if (errno == EAGAIN || errno == EWOULDBLOCK) {
00496             // Would block (something that we don't want).  Select on the
00497             // socket for the timeout before giving up.
00498             struct pollfd events[] = {
00499                 { fd, POLLOUT | POLLERR | POLLHUP | POLLNVAL, 0 }
00500             };
00501 
00502             int res = poll(events, 1, 500 /* 500ms max timeout */);
00503             if (res == 1 && events[0].revents == POLLOUT) {
00504                 // Ready to send
00505                 continue;  // we can now send it
00506             }
00507             else if (res == -1) {
00508                 // error in epoll call
00509                 int saved_errno = errno;
00510                 cerr << "error on epoll with CarbonConnector " << addr
00511                      << ": " << strerror(saved_errno) << endl;
00512             }
00513             else if (res == 0) {
00514                 // nothing ready... must be a timeout
00515                 cerr << "timeout sending to CarbonConnector at " << addr
00516                      << endl;
00517             }
00518             else if (res == 1 && events[0].revents & ~POLLOUT) {
00519                 // Disconnection or error... need to reconnect
00520                 cerr << "disconnection sending to CarbonConnector at " << addr
00521                      << endl;
00522             }
00523             else {
00524                 // Logic error; we should never get here
00525                 throw ML::Exception("logic error in carbon connector");
00526             }
00527         } else {
00528             // Error in sending
00529             int saved_errno = errno;
00530             cerr << "error sending to CarbonConnector " << addr
00531                  << ": " << strerror(saved_errno) << endl;
00532         }
00533         break;
00534     }    
00535 
00536     reconnect();
00537 }
00538 
00539 void
00540 CarbonConnector::Connection::
00541 reconnect()
00542 {
00543     close();
00544 
00545     cerr << "reconnecting to " << addr << endl;
00546 
00547     if (reconnectionThreadJoinable && reconnectionThread) {
00548         reconnectionThread->join();
00549         reconnectionThread.reset();
00550     }
00551 
00552     reconnectionThreadActive = false;
00553     reconnectionThreadJoinable = false;
00554     
00555     reconnectionThread
00556         = std::make_shared<std::thread>
00557             (std::bind(&Connection::runReconnectionThread, this));
00558 
00559 }
00560 
00561 void
00562 CarbonConnector::Connection::
00563 runReconnectionThread()
00564 {
00565     cerr << "started reconnection thread" << endl;
00566 
00567     reconnectionThreadActive = true;
00568 
00569     // Close the current connection
00570     if (fd != -1)
00571         close();
00572 
00573     double meanWaitTime = 0.5;  // half a second
00574 
00575     while (!shutdown) {
00576         string error = connect();
00577         if (fd != -1) break;
00578 
00579         cerr << "error reconnecting to " << addr << ": " << error
00580              << endl;
00581 
00582         double r = (random() % 10001) / 10000.0;
00583         double waitTime = meanWaitTime * (0.5 + r);
00584 
00585         if (meanWaitTime < 8.0)
00586             meanWaitTime *= 2;
00587         
00588         // Wait for the given time before we attempt reconnection
00589         // again.
00590         futex_wait(shutdown, 0, waitTime);
00591     }
00592 
00593     reconnectionThreadActive = false;
00594     reconnectionThreadJoinable = true;
00595 }
00596 
00597 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator