RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* augmentation_loop.h -*- C++ -*- 00002 Jeremy Barnes, 1 March 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Augmentation of bid requests. 00006 */ 00007 00008 #ifndef __rtb_router__augmentation_loop_h__ 00009 #define __rtb_router__augmentation_loop_h__ 00010 00011 #include "rtbkit/common/augmentation.h" 00012 #include "soa/service/timeout_map.h" 00013 #include "soa/service/zmq_endpoint.h" 00014 #include "soa/service/typed_message_channel.h" 00015 #include "router_types.h" 00016 #include <boost/scoped_ptr.hpp> 00017 #include <boost/thread/thread.hpp> 00018 #include "soa/service/zmq.hpp" 00019 #include "soa/service/socket_per_thread.h" 00020 #include "soa/service/stats_events.h" 00021 #include "jml/arch/spinlock.h" 00022 #include <boost/thread/locks.hpp> 00023 #include "soa/gc/gc_lock.h" 00024 00025 00026 namespace RTBKIT { 00027 00028 00029 /*****************************************************************************/ 00030 /* AUGMENTOR CONFIG */ 00031 /*****************************************************************************/ 00032 00034 struct AugmentorInfo { 00035 AugmentorInfo() 00036 : numInFlight(0) 00037 { 00038 } 00039 00040 std::string augmentorAddr; 00041 std::string name; 00042 std::map<Id, Date> inFlight; 00043 int numInFlight; 00044 }; 00045 00046 // Information about an auction being augmented 00047 struct AugmentationInfo { 00048 AugmentationInfo() 00049 { 00050 } 00051 00052 AugmentationInfo(const std::shared_ptr<Auction> & auction, 00053 Date lossTimeout) 00054 : auction(auction), lossTimeout(lossTimeout) 00055 { 00056 } 00057 00058 std::shared_ptr<Auction> auction; 00059 Date lossTimeout; 00060 std::vector<GroupPotentialBidders> potentialGroups; 00061 }; 00062 00063 00064 /*****************************************************************************/ 00065 /* AUGMENTATION LOOP */ 00066 /*****************************************************************************/ 00067 00068 struct AugmentationLoop : public ServiceBase, public MessageLoop { 00069 00070 AugmentationLoop(ServiceBase & parent, 00071 const std::string & name = "augmentationLoop"); 00072 AugmentationLoop(std::shared_ptr<ServiceProxies> proxies, 00073 const std::string & name = "augmentationLoop"); 00074 ~AugmentationLoop(); 00075 00076 typedef boost::function<void (const std::shared_ptr<AugmentationInfo> &)> 00077 OnFinished; 00078 00079 void init(); 00080 00081 void start(); 00082 void sleepUntilIdle(); 00083 void shutdown(); 00084 size_t numAugmenting() const; 00085 bool currentlyAugmenting(const Id & auctionId) const; 00086 00087 void bindAugmentors(const std::string & uri); 00088 00090 void augment(const std::shared_ptr<AugmentationInfo> & info, 00091 Date timeout, 00092 const OnFinished & onFinished); 00093 00094 struct Entry { 00095 std::shared_ptr<AugmentationInfo> info; 00096 std::set<std::string> outstanding; 00097 OnFinished onFinished; 00098 Date timeout; 00099 }; 00100 00104 typedef TimeoutMap<Id, std::shared_ptr<Entry> > Augmenting; 00105 Augmenting augmenting; 00106 00108 std::map<std::string, std::shared_ptr<AugmentorInfo> > augmentors; 00109 00111 struct AugmentorInfoEntry { 00112 std::string name; 00113 std::shared_ptr<AugmentorInfo> info; 00114 //std::shared_ptr<const AugmentorConfig> config; 00115 }; 00116 00120 struct AllAugmentorInfo : public std::vector<AugmentorInfoEntry> { 00121 std::set<std::string> index; 00122 }; 00123 00125 AllAugmentorInfo * allAugmentors; 00126 00128 mutable GcLock allAugmentorsGc; 00129 00130 int idle_; 00131 00133 TypedMessageSink<std::shared_ptr<Entry> > inbox; 00134 00136 ZmqNamedClientBus toAugmentors; 00137 00138 typedef ML::Spinlock Lock; 00139 typedef boost::unique_lock<Lock> Guard; 00140 mutable ML::Spinlock lock; 00141 00143 void updateAllAugmentors(); 00144 00145 void handleAugmentorMessage(const std::vector<std::string> & message); 00146 00147 void checkExpiries(); 00148 00150 void doConfig(const std::vector<std::string> & message); 00151 00153 void doResponse(const std::vector<std::string> & message); 00154 00156 void doAugment(const std::vector<std::string> & message); 00157 00158 void augmentationExpired(const Id & id, const Entry & entry); 00159 }; 00160 00161 } // namespace RTBKIT 00162 00163 #endif /* __rtb_router__augmentation_loop_h__ */