RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/transport.h
00001 /* transport.h                                                     -*- C++ -*-
00002    Jeremy Barnes, 23 February 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004    
00005    Transport abstraction for endpoints.
00006 */
00007 
00008 #ifndef __rtb__transport_h__
00009 #define __rtb__transport_h__
00010 
00011 #include <boost/shared_ptr.hpp>
00012 #include <boost/utility.hpp>
00013 #include <boost/thread/locks.hpp>
00014 #include <ace/SOCK_Stream.h>
00015 #include <ace/Synch.h>
00016 #include "jml/arch/exception.h"
00017 #include "jml/arch/demangle.h"
00018 #include "jml/utils/guard.h"
00019 #include "jml/arch/format.h"
00020 #include "jml/arch/atomic_ops.h"
00021 #include "jml/arch/cmp_xchg.h"
00022 #include "jml/arch/spinlock.h"
00023 #include "soa/types/date.h"
00024 #include "soa/jsoncpp/json.h"
00025 #include <boost/type_traits/is_convertible.hpp>
00026 #include <boost/enable_shared_from_this.hpp>
00027 
00028 namespace Datacratic {
00029 
00030 
00031 struct ConnectionHandler;
00032 struct EndpointBase;
00033 
00034 extern boost::function<void (const char *, float)> onLatencyEvent;
00035 
00036 /*****************************************************************************/
00037 /* TRANSPORT BASE                                                            */
00038 /*****************************************************************************/
00039 
00042 struct TransportBase : public std::enable_shared_from_this<TransportBase> {
00043     friend class EndpointBase;
00044     friend class ActiveEndpoint;
00045     friend class PassiveEndpoint;
00046 
00047     TransportBase(); // throws
00048     TransportBase(EndpointBase * endpoint);
00049 
00050     virtual ~TransportBase();
00051     
00052     /* Event callbacks */
00053     virtual int handleInput();
00054     virtual int handleOutput();
00055     virtual int handlePeerShutdown();
00056     virtual int handleTimeout();
00057     virtual int handleError(const std::string & error);
00058     virtual int handleAsync(const boost::function<void ()> & callback,
00059                             const char * name, Date dateSet);
00060 
00061     virtual ssize_t send(const char * buf, size_t len, int flags) = 0;
00062     virtual ssize_t recv(char * buf, size_t buf_size, int flags) = 0;
00063 
00064     // closeWhenHandlerFinished() should be used in almost all cases instead
00065     // of this, except when writing test code, in which case asyncClose()
00066     // should be called instead.
00067     virtual int closePeer() = 0;
00068 
00069     virtual int getHandle() const = 0;
00070     
00071     EndpointBase * get_endpoint() { return endpoint_; }
00072 
00079     void recycleWhenHandlerFinished();
00080 
00083     void closeWhenHandlerFinished();
00084 
00089     void closeAsync();
00090 
00098     void associate(std::shared_ptr<ConnectionHandler> newSlave);
00099 
00101     void
00102     associateWhenHandlerFinished(std::shared_ptr<ConnectionHandler> newSlave,
00103                                  const std::string & whereFrom);
00104 
00105     void startReading();
00106     void stopReading();
00107     void startWriting();
00108     void stopWriting();
00109 
00112     void scheduleTimerAbsolute(Date timeout,
00113                                size_t cookie = 0,
00114                                void (*freecookie) (size_t) = 0);
00115 
00119     void scheduleTimerRelative(double secondsFromNow,
00120                                size_t cookie = 0,
00121                                void (*freecookie) (size_t) = 0);
00122     
00124     void cancelTimer();
00125 
00129     void doAsync(const boost::function<void ()> & callback,
00130                  const std::string & name);
00131 
00132 
00134     virtual std::string getPeerName() const = 0;
00135 
00136 protected:
00137     long long lockThread;   
00138     const char * lockActivity;
00139 
00140 public:
00141     bool hasSlave() const { return !!slave_; }
00142 
00143     void doError(const std::string & error);
00144 
00146     static long created;
00147     static long destroyed;
00148 
00149     virtual std::string status() const;
00150 
00151     // DEBUG: record what happens on the socket
00152     bool debug;
00153 
00154     struct Activity {
00155         Activity(Date time, std::string what)
00156             : time(time), what(what)
00157         {
00158         }
00159 
00160         Activity(const Json::Value & val)
00161         {
00162             fromJson(val);
00163         }
00164 
00165         Date time;
00166         std::string what;
00167 
00168         void fromJson(const Json::Value & val);
00169     };
00170 
00171     struct Activities {
00172         
00173         Activities()
00174         {
00175         }
00176 
00177         Activities(const std::vector<Activity> & acts)
00178             : activities(acts)
00179         {
00180         }
00181 
00182         ~Activities();
00183 
00184         void add(const std::string & act)
00185         {
00186             Guard guard(lock);
00187 
00188             if (activities.size() > 200)
00189                 activities.erase(activities.begin(),
00190                                  activities.end() - 100);
00191 
00192             activities.push_back(Activity(Date::now(), act));
00193         }
00194 
00195         void limit(int maxSize)
00196         {
00197             Guard guard(lock);
00198 
00199             if (activities.size() > maxSize)
00200                 activities.erase(activities.begin(),
00201                                  activities.end() - maxSize);
00202         }
00203 
00204         size_t size() const
00205         {
00206             Guard guard(lock);
00207             return activities.size();
00208         }
00209 
00210         void clear()
00211         {
00212             Guard guard(lock);
00213             activities.clear();
00214         }
00215 
00216         std::vector<Activity> takeCopy()
00217         {
00218             Guard guard(lock);
00219             return activities;
00220         }
00221 
00222         void dump() const;
00223 
00224         Json::Value toJson(int first = 0, int last = -1) const;
00225 
00226         void fromJson(const Json::Value & val);
00227 
00228     private:
00229         std::vector<Activity> activities;
00230         
00231         typedef boost::lock_guard<ML::Spinlock> Guard;
00232         mutable ML::Spinlock lock;
00233     };
00234     
00235     Activities activities;
00236 
00237     bool debugOn() const { return debug; }
00238 
00239     void addActivity(const std::string & act)
00240     {
00241         if (!debug) return;
00242         //assertLockedByThisThread();
00243         checkMagic();
00244         activities.add(act);
00245     }
00246 
00247     void addActivityS(const char * act)
00248     {
00249         if (!debug) return;
00250         //assertLockedByThisThread();
00251         checkMagic();
00252         activities.add(act);
00253     }
00254 
00255     void addActivity(const char * fmt, ...)
00256     {
00257         if (!debug) return;
00258         //assertLockedByThisThread();
00259         checkMagic();
00260 
00261         va_list ap;
00262         va_start(ap, fmt);
00263         ML::Call_Guard cleanupAp([&] () { va_end(ap); });
00264         activities.add(ML::vformat(fmt, ap));
00265     }
00266 
00267     void dumpActivities() const
00268     {
00269         activities.dump();
00270     }
00271 
00272     struct InHandlerGuard {
00273         InHandlerGuard(TransportBase * t, const char * where)
00274             : t(0), where(0)
00275         {
00276             //using namespace std;
00277             //cerr << "InHandlerGuard constructur for " << where
00278             //     << " t = " << t << endl;
00279             t->checkMagic();
00280             init(t, where);
00281         }
00282         
00283         InHandlerGuard(TransportBase & t, const char * where)
00284             : t(0), where(0)
00285         {
00286             //using namespace std;
00287             //cerr << "InHandlerGuard constructur for " << where
00288             //     << " t = " << &t << endl;
00289             t.checkMagic();
00290             init(&t, where);
00291         }
00292         
00293         ~InHandlerGuard()
00294         {
00295             //using namespace std;
00296             //cerr << "InHandlerGuard destructor for " << where
00297             //     << " t = " << t << endl;
00298 
00299             if (!t) return;
00300             t->assertLockedByThisThread();
00301             t->lockThread = 0;
00302             t->lockActivity = 0;
00303         }
00304 
00305         void init(TransportBase * t, const char * where)
00306         {
00307             using namespace std;
00308 #if 0
00309             cerr << "InHandlerGuard init for " << where
00310                  << " with " << t << " lockActivity "
00311                  << t->lockActivity << endl;
00312 #endif
00313             this->t = t;
00314             this->where = where;
00315 
00316             long long me = ACE_OS::thr_self();
00317             long long locker = t->lockThread;
00318 
00319             for (;;) {
00320                 if (locker == me) {
00321                     // recursively locked
00322                     this->t = 0;
00323                     return;
00324                 }
00325                 if (locker)
00326                     throw ML::Exception("attempting to enter handler %s: "
00327                                         "already locked by thread %lld "
00328                                         " (not my thread %lld) doing %s",
00329                                         where, locker, me, t->lockActivity);
00330                 if (ML::cmp_xchg(t->lockThread, locker, me)) break;
00331             }
00332 
00333             t->lockActivity = where;
00334             t->assertLockedByThisThread();
00335         }
00336 
00337         void reset()
00338         {
00339             t = 0;
00340         }
00341         
00342         TransportBase * t;
00343         const char * where;
00344     };
00345 
00346     struct AsyncEntry {
00347         AsyncEntry()
00348             : name("none")
00349         {
00350         }
00351 
00352         AsyncEntry(const boost::function<void ()> & callback,
00353                    const std::string & name)
00354             : callback(callback), name(name), date(Date::now())
00355         {
00356         }
00357 
00358         boost::function<void ()> callback;
00359         std::string name;
00360         Date date;
00361     };
00362 
00364     struct AsyncNode : public AsyncEntry {
00365         AsyncNode(const boost::function<void ()> & callback,
00366                   const std::string & name)
00367             : AsyncEntry(callback, name), next(0)
00368         {
00369         }
00370 
00371         AsyncNode * next;
00372     };
00373 
00374     bool hasAsync() const
00375     {
00376         return asyncHead_;
00377     }
00378 
00380     AsyncNode * asyncHead_;
00381 
00385     void pushAsync(const boost::function<void ()> & fn,
00386                    const std::string & name);
00387 
00391     std::vector<AsyncEntry> popAsync();
00392     
00393     bool isZombie() const { return zombie_; }
00394 
00395     bool locked() const
00396     {
00397         return lockThread;
00398     }
00399 
00400     bool lockedByThisThread() const
00401     {
00402         return lockThread == ACE_OS::thr_self();
00403     }
00404 
00406     void assertLockedByThisThread() const
00407     {
00408         if (!lockedByThisThread())
00409             throw ML::Exception("should be locked by this thread %lld "
00410                                 "but instead locked by %lld in %s",
00411                                 (long long)ACE_OS::thr_self(),
00412                                 lockThread, lockActivity);
00413         checkMagic();
00414     }
00415 
00417     void assertNotLockedByAnotherThread() const
00418     {
00419         if (locked() && !lockedByThisThread())
00420             throw ML::Exception("already locked by thread %lld "
00421                                 " (not my thread %lld) doing %s",
00422                                 lockThread,
00423                                 (long long)ACE_OS::thr_self(),
00424                                 lockActivity);
00425         checkMagic();
00426     }
00427 
00430     void checkMagic() const;
00431 
00432     ConnectionHandler & slave()
00433     {
00434         if (!slave_) {
00435             activities.dump();
00436             throw ML::Exception("transport %p of type %s"
00437                                 "has no associated slave", this,
00438                                 ML::type_name(*this).c_str());
00439         }
00440         return *slave_;
00441     }
00442 
00443     bool hasTimeout() const
00444     {
00445         return timeout_.isSet();
00446     }
00447 
00448     Date nextTimeout() const
00449     {
00450         return timeout_.timeout;
00451     }
00452 
00457     void hasConnection();
00458 
00459 private:
00460     std::shared_ptr<ConnectionHandler> slave_;
00461     EndpointBase * endpoint_;
00462 
00466     std::shared_ptr<ConnectionHandler> newSlave_;
00467     std::string newSlaveFrom_;
00468 
00471     bool recycle_;
00472 
00475     bool close_;
00476 
00478     short flags_;
00479 
00481     int epollFd_;
00482 
00484     int timerFd_;
00485 
00487     int eventFd_;
00488 
00490     bool hasConnection_;
00491 
00493     struct Timeout {
00494         Timeout()
00495             : timeout(Date::notADate()), timeoutCookie(0), freeTimeoutCookie(0)
00496         {
00497         }
00498 
00499         ~Timeout()
00500         {
00501             cancel();
00502         }
00503         
00504 
00505         Date timeout;               
00506         size_t timeoutCookie;
00507         void (*freeTimeoutCookie) (size_t);
00508 
00509         bool isSet() const
00510         {
00511             return timeout.isADate();
00512         }
00513 
00514         void cancel()
00515         {
00516             timeout = Date::notADate();
00517             if (freeTimeoutCookie)
00518                 freeTimeoutCookie(timeoutCookie);
00519             timeoutCookie = 0;
00520         }
00521 
00525         bool set(Date date, size_t cookie, void (*freeCookie) (size_t))
00526         {
00527             if (!date.isADate())
00528                 throw ML::Exception("Transport::Timeout::set(): not a date");
00529 
00530             Date oldTimeout = timeout;
00531 
00532             cancel();
00533 
00534             timeout = date;
00535             timeoutCookie = cookie;
00536             freeTimeoutCookie = freeCookie;
00537 
00538             return (!oldTimeout.isADate() || date < oldTimeout);
00539         }
00540 
00541     };
00542 
00544     Timeout timeout_;
00545 
00547     int magic_;
00548 
00552     bool zombie_;
00553 
00555     int endEventHandler(const char * handler, InHandlerGuard & guard);
00556 
00558     void handleException(const std::string & handler,
00559                          const std::exception & exc);
00560     void handleUnknownException(const std::string & handler);
00561 
00571     std::shared_ptr<ConnectionHandler> disassociate();
00572 
00578     int handleEvents();
00579 };
00580 
00581 
00582 /*****************************************************************************/
00583 /* SOCKET TRANSPORT                                                          */
00584 /*****************************************************************************/
00585 
00586 struct SocketTransport
00587     : public TransportBase {
00588 
00589     SocketTransport();  // throws
00590     SocketTransport(EndpointBase * endpoint);
00591 
00592     virtual ~SocketTransport();
00593 
00594     virtual int getHandle() const
00595     {
00596         return peer().get_handle();
00597     }
00598 
00599     virtual std::string getPeerName() const { return peerName_; }
00600 
00601     virtual ssize_t send(const char * buf, size_t len, int flags);
00602     virtual ssize_t recv(char * buf, size_t buf_size, int flags);
00603     virtual int closePeer();
00604 
00605     ACE_SOCK_Stream & peer() { return peer_; }
00606     const ACE_SOCK_Stream & peer() const { return peer_; }
00607 
00608     ACE_SOCK_Stream peer_;
00609     std::string peerName_;
00610 };
00611 
00612 
00613 
00614 } // namespace Datacratic
00615 
00616 #endif /* __rtb__transport_h__ */
00617 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator