RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/master_banker.h
00001 /* master_banker.h                                                 -*- C++ -*-
00002    Jeremy Barnes, 8 November 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Master banker class to deal with all budgeting.
00006 */
00007 
00008 #ifndef __banker__master_banker_h__
00009 #define __banker__master_banker_h__
00010 
00011 #include "banker.h"
00012 #include "soa/service/named_endpoint.h"
00013 #include "soa/service/message_loop.h"
00014 #include "soa/service/redis.h"
00015 #include "soa/service/rest_service_endpoint.h"
00016 #include "soa/service/rest_request_router.h"
00017 #include "jml/utils/vector_utils.h"
00018 #include "jml/utils/positioned_types.h"
00019 #include <type_traits>
00020 #include "rtbkit/core/monitor/monitor_provider.h"
00021 
00022 #include "null_banker.h"  // debug
00023 
00024 namespace RTBKIT {
00025 
00026 class Accounts;
00027 
00028 inline AccountKey restDecode(const std::string & str, AccountKey *)
00029 {
00030     return AccountKey(ML::split(str, ':'));
00031 }
00032 
00033 inline std::string restEncode(const AccountKey & val)
00034 {
00035     return val.toString();
00036 }
00037 
00038 
00039 /*****************************************************************************/
00040 /* REAL BANKER                                                               */
00041 /*****************************************************************************/
00042 
00141 /*****************************************************************************/
00142 /* BANKER PERSISTENCE                                                        */
00143 /*****************************************************************************/
00144 
00145 struct BankerPersistence {
00146     enum PersistenceCallbackStatus {
00147         SUCCESS,             /* info = "" */
00148         BACKEND_ERROR,       /* info = error string */
00149         DATA_INCONSISTENCY   /* info = json array of account keys */
00150     };
00151 
00152     virtual ~BankerPersistence()
00153     {
00154     }
00155 
00156     /* callback types */
00157     typedef std::function<void (std::shared_ptr<Accounts>,
00158                                 PersistenceCallbackStatus,
00159                                 const std::string & info)>
00160         OnLoadedCallback;
00161     typedef std::function<void (PersistenceCallbackStatus,
00162                                 const std::string & info)>
00163         OnSavedCallback;
00164 
00165     /* backend methods */
00166     virtual void loadAll(const std::string & topLevelKey,
00167                          OnLoadedCallback onLoaded) = 0;
00168     virtual void saveAll(const Accounts & toSave,
00169                          OnSavedCallback onDone) = 0;
00170 };
00171 
00172 
00173 /*****************************************************************************/
00174 /* NO BANKER PERSISTENCE                                                     */
00175 /*****************************************************************************/
00176 
00177 struct NoBankerPersistence : public BankerPersistence {
00178     NoBankerPersistence()
00179     {
00180     }
00181 
00182     virtual ~NoBankerPersistence()
00183     {
00184     }
00185 
00186     virtual void
00187     loadAll(const std::string & topLevelKey, OnLoadedCallback onLoaded)
00188     {
00189         onLoaded(std::make_shared<Accounts>(), SUCCESS, "");
00190     }
00191 
00192     virtual void
00193     saveAll(const Accounts & toSave, OnSavedCallback onDone)
00194     {
00195         onDone(SUCCESS, "");
00196     }
00197 };
00198 
00199 /*****************************************************************************/
00200 /* REDIS BANKER PERSISTENCE                                                  */
00201 /*****************************************************************************/
00202 
00203 struct RedisBankerPersistence : public BankerPersistence {
00204     RedisBankerPersistence(const Redis::Address & redis);
00205     RedisBankerPersistence(std::shared_ptr<Redis::AsyncConnection> redis);
00206 
00207     struct Itl;
00208     std::shared_ptr<Itl> itl;
00209 
00210     void loadAll(const std::string & topLevelKey, OnLoadedCallback onLoaded);
00211     void saveAll(const Accounts & toSave, OnSavedCallback onDone);
00212 };
00213 
00214 /*****************************************************************************/
00215 /* OLD REDIS BANKER PERSISTENCE                                              */
00216 /*****************************************************************************/
00217 
00218 struct OldRedisBankerPersistence : public BankerPersistence {
00219 
00220     OldRedisBankerPersistence();
00221 
00222     struct Itl;
00223     std::shared_ptr<Itl> itl;
00224 
00225     void loadAll(const std::string & topLevelKey, OnLoadedCallback onLoaded);
00226     void saveAll(const Accounts & toSave, OnSavedCallback onDone);
00227 };
00228 
00229 
00230 /*****************************************************************************/
00231 /* MASTER BANKER                                                             */
00232 /*****************************************************************************/
00233 
00238 struct MasterBanker
00239     : public ServiceBase,
00240       public RestServiceEndpoint,
00241       public MonitorProvider
00242 {
00243 
00244     MasterBanker(std::shared_ptr<ServiceProxies> proxies,
00245                  const std::string & serviceName = "masterBanker");
00246     ~MasterBanker();
00247 
00248     std::shared_ptr<BankerPersistence> storage_;
00249 
00250     void init(const std::shared_ptr<BankerPersistence> & storage);
00251     void start();
00252     std::pair<std::string, std::string> bindTcp();
00253 
00254     void shutdown();
00255 
00263     void bindFixedHttpAddress(const std::string & uri);
00264 
00265     RestRequestRouter router;
00266     Accounts accounts;
00267     Date lastSavedState;
00268     BankerPersistence::PersistenceCallbackStatus lastSaveStatus;
00269 
00270     typedef ML::Spinlock Lock;
00271     typedef std::unique_lock<Lock> Guard;
00272     mutable Lock saveLock;
00273     int saving;
00274 
00275     Json::Value createAccount(const AccountKey & key, AccountType type);
00276 
00278     void saveState();
00279 
00283     void loadStateSync();
00284 
00285     void onStateLoaded(std::shared_ptr<Accounts> newAccounts,
00286                        BankerPersistence::PersistenceCallbackStatus status,
00287                        const std::string & info);
00288     void onStateSaved(BankerPersistence::PersistenceCallbackStatus status,
00289                       const std::string & info);
00290 
00291     /* Reponds to Monitor requests */
00292     MonitorProviderClient monitorProviderClient;
00293 
00294     /* MonitorProvider interface */
00295     std::string getProviderName() const;
00296     Json::Value getProviderIndicators() const;
00297 
00298     Date lastWin;
00299     Date lastImpression;
00300 };
00301 
00302 } // namespace RTBKIT
00303 
00304 #endif /* __banker__master_banker_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator