RTBKit 0.9
Open-source framework to create real-time ad bidding systems.
00001 /* transport.h -*- C++ -*- 00002 Jeremy Barnes, 23 February 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Transport abstraction for endpoints. 00006 */ 00007 00008 #ifndef __rtb__transport_h__ 00009 #define __rtb__transport_h__ 00010 00011 #include <boost/shared_ptr.hpp> 00012 #include <boost/utility.hpp> 00013 #include <boost/thread/locks.hpp> 00014 #include <ace/SOCK_Stream.h> 00015 #include <ace/Synch.h> 00016 #include "jml/arch/exception.h" 00017 #include "jml/arch/demangle.h" 00018 #include "jml/utils/guard.h" 00019 #include "jml/arch/format.h" 00020 #include "jml/arch/atomic_ops.h" 00021 #include "jml/arch/cmp_xchg.h" 00022 #include "jml/arch/spinlock.h" 00023 #include "soa/types/date.h" 00024 #include "soa/jsoncpp/json.h" 00025 #include <boost/type_traits/is_convertible.hpp> 00026 #include <boost/enable_shared_from_this.hpp> 00027 00028 namespace Datacratic { 00029 00030 00031 struct ConnectionHandler; 00032 struct EndpointBase; 00033 00034 extern boost::function<void (const char *, float)> onLatencyEvent; 00035 00036 /*****************************************************************************/ 00037 /* TRANSPORT BASE */ 00038 /*****************************************************************************/ 00039 00042 struct TransportBase : public std::enable_shared_from_this<TransportBase> { 00043 friend class EndpointBase; 00044 friend class ActiveEndpoint; 00045 friend class PassiveEndpoint; 00046 00047 TransportBase(); // throws 00048 TransportBase(EndpointBase * endpoint); 00049 00050 virtual ~TransportBase(); 00051 00052 /* Event callbacks */ 00053 virtual int handleInput(); 00054 virtual int handleOutput(); 00055 virtual int handlePeerShutdown(); 00056 virtual int handleTimeout(); 00057 virtual int handleError(const std::string & error); 00058 virtual int handleAsync(const boost::function<void ()> & callback, 00059 const char * name, Date dateSet); 00060 00061 virtual ssize_t send(const char * buf, size_t len, int 
flags) = 0; 00062 virtual ssize_t recv(char * buf, size_t buf_size, int flags) = 0; 00063 00064 // closeWhenHandlerFinished() should be used in almost all cases instead 00065 // of this, except when writing test code, in which case asyncClose() 00066 // should be called instead. 00067 virtual int closePeer() = 0; 00068 00069 virtual int getHandle() const = 0; 00070 00071 EndpointBase * get_endpoint() { return endpoint_; } 00072 00079 void recycleWhenHandlerFinished(); 00080 00083 void closeWhenHandlerFinished(); 00084 00089 void closeAsync(); 00090 00098 void associate(std::shared_ptr<ConnectionHandler> newSlave); 00099 00101 void 00102 associateWhenHandlerFinished(std::shared_ptr<ConnectionHandler> newSlave, 00103 const std::string & whereFrom); 00104 00105 void startReading(); 00106 void stopReading(); 00107 void startWriting(); 00108 void stopWriting(); 00109 00112 void scheduleTimerAbsolute(Date timeout, 00113 size_t cookie = 0, 00114 void (*freecookie) (size_t) = 0); 00115 00119 void scheduleTimerRelative(double secondsFromNow, 00120 size_t cookie = 0, 00121 void (*freecookie) (size_t) = 0); 00122 00124 void cancelTimer(); 00125 00129 void doAsync(const boost::function<void ()> & callback, 00130 const std::string & name); 00131 00132 00134 virtual std::string getPeerName() const = 0; 00135 00136 protected: 00137 long long lockThread; 00138 const char * lockActivity; 00139 00140 public: 00141 bool hasSlave() const { return !!slave_; } 00142 00143 void doError(const std::string & error); 00144 00146 static long created; 00147 static long destroyed; 00148 00149 virtual std::string status() const; 00150 00151 // DEBUG: record what happens on the socket 00152 bool debug; 00153 00154 struct Activity { 00155 Activity(Date time, std::string what) 00156 : time(time), what(what) 00157 { 00158 } 00159 00160 Activity(const Json::Value & val) 00161 { 00162 fromJson(val); 00163 } 00164 00165 Date time; 00166 std::string what; 00167 00168 void fromJson(const Json::Value & 
val); 00169 }; 00170 00171 struct Activities { 00172 00173 Activities() 00174 { 00175 } 00176 00177 Activities(const std::vector<Activity> & acts) 00178 : activities(acts) 00179 { 00180 } 00181 00182 ~Activities(); 00183 00184 void add(const std::string & act) 00185 { 00186 Guard guard(lock); 00187 00188 if (activities.size() > 200) 00189 activities.erase(activities.begin(), 00190 activities.end() - 100); 00191 00192 activities.push_back(Activity(Date::now(), act)); 00193 } 00194 00195 void limit(int maxSize) 00196 { 00197 Guard guard(lock); 00198 00199 if (activities.size() > maxSize) 00200 activities.erase(activities.begin(), 00201 activities.end() - maxSize); 00202 } 00203 00204 size_t size() const 00205 { 00206 Guard guard(lock); 00207 return activities.size(); 00208 } 00209 00210 void clear() 00211 { 00212 Guard guard(lock); 00213 activities.clear(); 00214 } 00215 00216 std::vector<Activity> takeCopy() 00217 { 00218 Guard guard(lock); 00219 return activities; 00220 } 00221 00222 void dump() const; 00223 00224 Json::Value toJson(int first = 0, int last = -1) const; 00225 00226 void fromJson(const Json::Value & val); 00227 00228 private: 00229 std::vector<Activity> activities; 00230 00231 typedef boost::lock_guard<ML::Spinlock> Guard; 00232 mutable ML::Spinlock lock; 00233 }; 00234 00235 Activities activities; 00236 00237 bool debugOn() const { return debug; } 00238 00239 void addActivity(const std::string & act) 00240 { 00241 if (!debug) return; 00242 //assertLockedByThisThread(); 00243 checkMagic(); 00244 activities.add(act); 00245 } 00246 00247 void addActivityS(const char * act) 00248 { 00249 if (!debug) return; 00250 //assertLockedByThisThread(); 00251 checkMagic(); 00252 activities.add(act); 00253 } 00254 00255 void addActivity(const char * fmt, ...) 
00256 { 00257 if (!debug) return; 00258 //assertLockedByThisThread(); 00259 checkMagic(); 00260 00261 va_list ap; 00262 va_start(ap, fmt); 00263 ML::Call_Guard cleanupAp([&] () { va_end(ap); }); 00264 activities.add(ML::vformat(fmt, ap)); 00265 } 00266 00267 void dumpActivities() const 00268 { 00269 activities.dump(); 00270 } 00271 00272 struct InHandlerGuard { 00273 InHandlerGuard(TransportBase * t, const char * where) 00274 : t(0), where(0) 00275 { 00276 //using namespace std; 00277 //cerr << "InHandlerGuard constructur for " << where 00278 // << " t = " << t << endl; 00279 t->checkMagic(); 00280 init(t, where); 00281 } 00282 00283 InHandlerGuard(TransportBase & t, const char * where) 00284 : t(0), where(0) 00285 { 00286 //using namespace std; 00287 //cerr << "InHandlerGuard constructur for " << where 00288 // << " t = " << &t << endl; 00289 t.checkMagic(); 00290 init(&t, where); 00291 } 00292 00293 ~InHandlerGuard() 00294 { 00295 //using namespace std; 00296 //cerr << "InHandlerGuard destructor for " << where 00297 // << " t = " << t << endl; 00298 00299 if (!t) return; 00300 t->assertLockedByThisThread(); 00301 t->lockThread = 0; 00302 t->lockActivity = 0; 00303 } 00304 00305 void init(TransportBase * t, const char * where) 00306 { 00307 using namespace std; 00308 #if 0 00309 cerr << "InHandlerGuard init for " << where 00310 << " with " << t << " lockActivity " 00311 << t->lockActivity << endl; 00312 #endif 00313 this->t = t; 00314 this->where = where; 00315 00316 long long me = ACE_OS::thr_self(); 00317 long long locker = t->lockThread; 00318 00319 for (;;) { 00320 if (locker == me) { 00321 // recursively locked 00322 this->t = 0; 00323 return; 00324 } 00325 if (locker) 00326 throw ML::Exception("attempting to enter handler %s: " 00327 "already locked by thread %lld " 00328 " (not my thread %lld) doing %s", 00329 where, locker, me, t->lockActivity); 00330 if (ML::cmp_xchg(t->lockThread, locker, me)) break; 00331 } 00332 00333 t->lockActivity = where; 00334 
t->assertLockedByThisThread(); 00335 } 00336 00337 void reset() 00338 { 00339 t = 0; 00340 } 00341 00342 TransportBase * t; 00343 const char * where; 00344 }; 00345 00346 struct AsyncEntry { 00347 AsyncEntry() 00348 : name("none") 00349 { 00350 } 00351 00352 AsyncEntry(const boost::function<void ()> & callback, 00353 const std::string & name) 00354 : callback(callback), name(name), date(Date::now()) 00355 { 00356 } 00357 00358 boost::function<void ()> callback; 00359 std::string name; 00360 Date date; 00361 }; 00362 00364 struct AsyncNode : public AsyncEntry { 00365 AsyncNode(const boost::function<void ()> & callback, 00366 const std::string & name) 00367 : AsyncEntry(callback, name), next(0) 00368 { 00369 } 00370 00371 AsyncNode * next; 00372 }; 00373 00374 bool hasAsync() const 00375 { 00376 return asyncHead_; 00377 } 00378 00380 AsyncNode * asyncHead_; 00381 00385 void pushAsync(const boost::function<void ()> & fn, 00386 const std::string & name); 00387 00391 std::vector<AsyncEntry> popAsync(); 00392 00393 bool isZombie() const { return zombie_; } 00394 00395 bool locked() const 00396 { 00397 return lockThread; 00398 } 00399 00400 bool lockedByThisThread() const 00401 { 00402 return lockThread == ACE_OS::thr_self(); 00403 } 00404 00406 void assertLockedByThisThread() const 00407 { 00408 if (!lockedByThisThread()) 00409 throw ML::Exception("should be locked by this thread %lld " 00410 "but instead locked by %lld in %s", 00411 (long long)ACE_OS::thr_self(), 00412 lockThread, lockActivity); 00413 checkMagic(); 00414 } 00415 00417 void assertNotLockedByAnotherThread() const 00418 { 00419 if (locked() && !lockedByThisThread()) 00420 throw ML::Exception("already locked by thread %lld " 00421 " (not my thread %lld) doing %s", 00422 lockThread, 00423 (long long)ACE_OS::thr_self(), 00424 lockActivity); 00425 checkMagic(); 00426 } 00427 00430 void checkMagic() const; 00431 00432 ConnectionHandler & slave() 00433 { 00434 if (!slave_) { 00435 activities.dump(); 00436 throw 
ML::Exception("transport %p of type %s" 00437 "has no associated slave", this, 00438 ML::type_name(*this).c_str()); 00439 } 00440 return *slave_; 00441 } 00442 00443 bool hasTimeout() const 00444 { 00445 return timeout_.isSet(); 00446 } 00447 00448 Date nextTimeout() const 00449 { 00450 return timeout_.timeout; 00451 } 00452 00457 void hasConnection(); 00458 00459 private: 00460 std::shared_ptr<ConnectionHandler> slave_; 00461 EndpointBase * endpoint_; 00462 00466 std::shared_ptr<ConnectionHandler> newSlave_; 00467 std::string newSlaveFrom_; 00468 00471 bool recycle_; 00472 00475 bool close_; 00476 00478 short flags_; 00479 00481 int epollFd_; 00482 00484 int timerFd_; 00485 00487 int eventFd_; 00488 00490 bool hasConnection_; 00491 00493 struct Timeout { 00494 Timeout() 00495 : timeout(Date::notADate()), timeoutCookie(0), freeTimeoutCookie(0) 00496 { 00497 } 00498 00499 ~Timeout() 00500 { 00501 cancel(); 00502 } 00503 00504 00505 Date timeout; 00506 size_t timeoutCookie; 00507 void (*freeTimeoutCookie) (size_t); 00508 00509 bool isSet() const 00510 { 00511 return timeout.isADate(); 00512 } 00513 00514 void cancel() 00515 { 00516 timeout = Date::notADate(); 00517 if (freeTimeoutCookie) 00518 freeTimeoutCookie(timeoutCookie); 00519 timeoutCookie = 0; 00520 } 00521 00525 bool set(Date date, size_t cookie, void (*freeCookie) (size_t)) 00526 { 00527 if (!date.isADate()) 00528 throw ML::Exception("Transport::Timeout::set(): not a date"); 00529 00530 Date oldTimeout = timeout; 00531 00532 cancel(); 00533 00534 timeout = date; 00535 timeoutCookie = cookie; 00536 freeTimeoutCookie = freeCookie; 00537 00538 return (!oldTimeout.isADate() || date < oldTimeout); 00539 } 00540 00541 }; 00542 00544 Timeout timeout_; 00545 00547 int magic_; 00548 00552 bool zombie_; 00553 00555 int endEventHandler(const char * handler, InHandlerGuard & guard); 00556 00558 void handleException(const std::string & handler, 00559 const std::exception & exc); 00560 void handleUnknownException(const 
std::string & handler); 00561 00571 std::shared_ptr<ConnectionHandler> disassociate(); 00572 00578 int handleEvents(); 00579 }; 00580 00581 00582 /*****************************************************************************/ 00583 /* SOCKET TRANSPORT */ 00584 /*****************************************************************************/ 00585 00586 struct SocketTransport 00587 : public TransportBase { 00588 00589 SocketTransport(); // throws 00590 SocketTransport(EndpointBase * endpoint); 00591 00592 virtual ~SocketTransport(); 00593 00594 virtual int getHandle() const 00595 { 00596 return peer().get_handle(); 00597 } 00598 00599 virtual std::string getPeerName() const { return peerName_; } 00600 00601 virtual ssize_t send(const char * buf, size_t len, int flags); 00602 virtual ssize_t recv(char * buf, size_t buf_size, int flags); 00603 virtual int closePeer(); 00604 00605 ACE_SOCK_Stream & peer() { return peer_; } 00606 const ACE_SOCK_Stream & peer() const { return peer_; } 00607 00608 ACE_SOCK_Stream peer_; 00609 std::string peerName_; 00610 }; 00611 00612 00613 00614 } // namespace Datacratic 00615 00616 #endif /* __rtb__transport_h__ */ 00617
1.7.6.1