RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/router/augmentation_loop.h
00001 /* augmentation_loop.h                                              -*- C++ -*-
00002    Jeremy Barnes, 1 March 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Augmentation of bid requests.
00006 */
00007 
00008 #ifndef __rtb_router__augmentation_loop_h__
00009 #define __rtb_router__augmentation_loop_h__
00010 
00011 #include "rtbkit/common/augmentation.h"
00012 #include "soa/service/timeout_map.h"
00013 #include "soa/service/zmq_endpoint.h"
00014 #include "soa/service/typed_message_channel.h"
00015 #include "router_types.h"
00016 #include <boost/scoped_ptr.hpp>
00017 #include <boost/thread/thread.hpp>
00018 #include "soa/service/zmq.hpp"
00019 #include "soa/service/socket_per_thread.h"
00020 #include "soa/service/stats_events.h"
00021 #include "jml/arch/spinlock.h"
00022 #include <boost/thread/locks.hpp>
00023 #include "soa/gc/gc_lock.h"
00024 
00025 
00026 namespace RTBKIT {
00027 
00028 
00029 /*****************************************************************************/
00030 /* AUGMENTOR CONFIG                                                          */
00031 /*****************************************************************************/
00032 
      /** Plain record describing a single augmentor: an address string, a name
          string, a map of in-flight items keyed by Id, and an in-flight counter.

          The default constructor only sets numInFlight to zero; the strings and
          the map start out empty through their own default constructors.

          AugmentationLoop (later in this header) holds these records through
          std::shared_ptr<AugmentorInfo>.
      */
00034 struct AugmentorInfo {
          /// Creates an empty record with a zero in-flight count.
00035     AugmentorInfo()
00036         : numInFlight(0)
00037     {
00038     }
00039 
00040     std::string augmentorAddr;             ///< Address of the augmentor.  NOTE(review): format not visible here (presumably a zmq address) -- confirm.
00041     std::string name;                   ///< Name of the augmentor.  NOTE(review): presumably the key used in AugmentationLoop::augmentors -- confirm.
00042     std::map<Id, Date> inFlight;  ///< In-flight items keyed by Id.  NOTE(review): presumably auction id -> time the request was sent -- confirm.
00043     int numInFlight;  ///< Starts at 0.  NOTE(review): presumably tracks inFlight.size(); nothing in this header enforces that.
00044 };
00045 
00046 // Information about an auction being augmented.
      //
      // Bundles the auction (shared ownership), a loss timeout and a list of
      // potential bidder groups.  Neither constructor touches potentialGroups,
      // so it always starts out empty.
00047 struct AugmentationInfo {
          // Default construction: `auction` is a null shared_ptr and
          // `lossTimeout` is whatever Date's default constructor produces
          // (not visible in this header).
00048     AugmentationInfo()
00049     {
00050     }
00051 
          // Construct from an auction and its loss timeout.  The shared_ptr is
          // copied, so this object shares ownership of the Auction.
00052     AugmentationInfo(const std::shared_ptr<Auction> & auction,
00053                      Date lossTimeout)
00054         : auction(auction), lossTimeout(lossTimeout)
00055     {
00056     }
00057 
00058     std::shared_ptr<Auction> auction;   ///< The auction being augmented (shared ownership; null after default construction).
00059     Date lossTimeout;                     ///< Loss timeout supplied at construction.  NOTE(review): exact meaning is defined by the caller -- confirm.
00060     std::vector<GroupPotentialBidders> potentialGroups; ///< Potential bidder groups.  NOTE(review): who fills this in is not visible here -- confirm.
00061 };
00062 
00063 
00064 /*****************************************************************************/
00065 /* AUGMENTATION LOOP                                                         */
00066 /*****************************************************************************/
00067 
      /** Service object (derives from ServiceBase and MessageLoop) that keeps
          track of auctions being augmented and of the known augmentors.

          Declared as a struct, so every member below is public.  Only
          declarations are visible in this header: remarks marked NOTE(review)
          are inferred from names and types and must be checked against the
          implementation file.
      */
00068 struct AugmentationLoop : public ServiceBase, public MessageLoop {
00069 
          /// Construct from a parent service; `name` defaults to "augmentationLoop".
00070     AugmentationLoop(ServiceBase & parent,
00071                      const std::string & name = "augmentationLoop");
          /// Construct from shared service proxies; `name` defaults to "augmentationLoop".
00072     AugmentationLoop(std::shared_ptr<ServiceProxies> proxies,
00073                      const std::string & name = "augmentationLoop");
00074     ~AugmentationLoop();
00075     
          /// Callback type: receives the AugmentationInfo (by const reference to
          /// a shared_ptr) and returns nothing.
00076     typedef boost::function<void (const std::shared_ptr<AugmentationInfo> &)>
00077         OnFinished;
00078 
          // Lifecycle entry points.  NOTE(review): expected call order is
          // presumably init() -> start() -> ... -> shutdown(); confirm.
00079     void init();
00080 
00081     void start();
00082     void sleepUntilIdle();  // NOTE(review): presumably waits on idle_ -- confirm.
00083     void shutdown();
00084     size_t numAugmenting() const;  // NOTE(review): presumably the number of entries in `augmenting`.
00085     bool currentlyAugmenting(const Id & auctionId) const;  // NOTE(review): presumably whether auctionId is present in `augmenting`.
00086 
00087     void bindAugmentors(const std::string & uri);  // NOTE(review): presumably binds `toAugmentors` to `uri` -- confirm.
00088 
          /** Submit `info` for augmentation with the given `timeout` and
              completion callback.
              NOTE(review): presumably `onFinished` fires once all augmentors
              have answered or the timeout passes -- confirm in the
              implementation.
          */
00090     void augment(const std::shared_ptr<AugmentationInfo> & info,
00091                  Date timeout,
00092                  const OnFinished & onFinished);
00093 
          /// State kept for one augmentation request.
00094     struct Entry {
00095         std::shared_ptr<AugmentationInfo> info;  ///< The request being augmented.
00096         std::set<std::string> outstanding;  ///< NOTE(review): presumably names of augmentors that have not answered yet -- confirm.
00097         OnFinished onFinished;  ///< Callback stored for this request.
00098         Date timeout;  ///< Timeout stored for this request.
00099     };
00100 
          /// Entries keyed by Id, stored in a TimeoutMap.
00104     typedef TimeoutMap<Id, std::shared_ptr<Entry> > Augmenting;
00105     Augmenting augmenting;
00106 
          /// Augmentor records keyed by string.  NOTE(review): key is presumably the augmentor name.
00108     std::map<std::string, std::shared_ptr<AugmentorInfo> > augmentors;
00109 
          /// A name paired with the shared AugmentorInfo record; the config
          /// member is currently commented out.
00111     struct AugmentorInfoEntry {
00112         std::string name;
00113         std::shared_ptr<AugmentorInfo> info;
00114         //std::shared_ptr<const AugmentorConfig> config;
00115     };
00116 
          /// Vector of AugmentorInfoEntry plus a set of strings named `index`.
          /// NOTE(review): `index` presumably holds the names contained in the
          /// vector -- confirm.
00120     struct AllAugmentorInfo : public std::vector<AugmentorInfoEntry> {
00121         std::set<std::string> index;
00122     };
00123 
          /// Raw pointer to the current AllAugmentorInfo.
          /// NOTE(review): ownership and deletion are not visible in this header;
          /// it is declared next to `allAugmentorsGc`, so it is presumably swapped
          /// and reclaimed under that GcLock -- confirm before touching.
00125     AllAugmentorInfo * allAugmentors;
00126 
          /// GcLock declared mutable, so it can be used from const member functions.
00128     mutable GcLock allAugmentorsGc;
00129 
00130     int idle_;  // NOTE(review): presumably the flag sleepUntilIdle() watches -- confirm.
00131 
          /// Message sink carrying shared_ptr<Entry> values.
          /// NOTE(review): presumably how augment() hands entries to the loop -- confirm.
00133     TypedMessageSink<std::shared_ptr<Entry> > inbox;
00134 
          /// Zmq named client bus.  NOTE(review): presumably the connection to the augmentors -- confirm.
00136     ZmqNamedClientBus toAugmentors;
00137 
          // Spinlock type, its boost::unique_lock guard type, and the lock
          // instance (mutable, so it can be taken in const member functions).
00138     typedef ML::Spinlock Lock;
00139     typedef boost::unique_lock<Lock> Guard;
00140     mutable ML::Spinlock lock;
00141 
          // NOTE(review): presumably rebuilds `allAugmentors` from `augmentors` -- confirm.
00143     void updateAllAugmentors();
00144 
          // Handlers taking a message as a vector of strings.
          // NOTE(review): handleAugmentorMessage presumably dispatches to
          // doConfig / doResponse; the message layout is not visible here.
00145     void handleAugmentorMessage(const std::vector<std::string> & message);
00146 
00147     void checkExpiries();  // NOTE(review): presumably expires timed-out entries in `augmenting` -- confirm.
00148 
00150     void doConfig(const std::vector<std::string> & message);
00151 
00153     void doResponse(const std::vector<std::string> & message);
00154 
00156     void doAugment(const std::vector<std::string> & message);
00157 
          // Takes the Id and the Entry of one request.
          // NOTE(review): presumably invoked when that request expires -- confirm.
00158     void augmentationExpired(const Id & id, const Entry & entry);
00159 };
00160 
00161 } // namespace RTBKIT
00162 
00163 #endif /* __rtb_router__augmentation_loop_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator