RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/slave_banker.cc
00001 /* slave_banker.cc
00002    Jeremy Barnes, 8 November 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Slave banker implementation.
00006 */
00007 
00008 #include "slave_banker.h"
00009 #include "soa/service/http_header.h"
00010 #include "jml/utils/vector_utils.h"
00011 
00012 using namespace std;
00013 
00014 
00015 namespace RTBKIT {
00016 
00017 
00018 /*****************************************************************************/
00019 /* SLAVE BUDGET CONTROLLER                                                   */
00020 /*****************************************************************************/
00021 
00022 SlaveBudgetController::
00023 SlaveBudgetController()
00024 {
00025 }
00026 
00027 void
00028 SlaveBudgetController::
00029 addAccount(const AccountKey & account,
00030            const OnBudgetResult & onResult)
00031 {
00032     push(budgetResultCallback(onResult),
00033          "POST", "/v1/accounts",
00034          { {"accountName", account.toString()},
00035            { "accountType", "budget" } });
00036 }
00037 
00038 
00039 void
00040 SlaveBudgetController::
00041 topupTransfer(const AccountKey & account,
00042               CurrencyPool amount,
00043               const OnBudgetResult & onResult)
00044 {
00045     push(budgetResultCallback(onResult),
00046          "PUT", "/v1/accounts/" + account.toString() + "/balance",
00047          { { "accountType", "budget"} },
00048          amount.toJson().toString());
00049 }
00050 
00051 void
00052 SlaveBudgetController::
00053 setBudget(const std::string & topLevelAccount,
00054           CurrencyPool amount,
00055           const OnBudgetResult & onResult)
00056 {
00057     push(budgetResultCallback(onResult),
00058          "PUT", "/v1/accounts/" + topLevelAccount + "/budget",
00059          { /* {"amount", amount.toString()}*/ },
00060          amount.toJson().toString());
00061 }
00062 
00063 void
00064 SlaveBudgetController::
00065 addBudget(const std::string & topLevelAccount,
00066           CurrencyPool amount,
00067           const OnBudgetResult & onResult)
00068 {
00069     throw ML::Exception("addBudget no good any more");
00070 }
00071 
00072 void
00073 SlaveBudgetController::
00074 getAccountList(const AccountKey & account,
00075                int depth,
00076                std::function<void (std::exception_ptr,
00077                                    std::vector<AccountKey> &&)>)
00078 {
00079     throw ML::Exception("getAccountList not needed anymore");
00080 }
00081 
00082 void
00083 SlaveBudgetController::
00084 getAccountSummary(const AccountKey & account,
00085                   int depth,
00086                   std::function<void (std::exception_ptr,
00087                                       AccountSummary &&)> onResult)
00088 {
00089     push([=] (std::exception_ptr ptr, int resultCode, string body)
00090          {
00091              AccountSummary summary;
00092              if (ptr)
00093                  onResult(ptr, std::move(summary));
00094              else if (resultCode < 200 || resultCode >= 300)
00095                  onResult(std::make_exception_ptr(ML::Exception("getAccountSummary returned code %d: %s", resultCode, body.c_str())),
00096                           std::move(summary));
00097              else {
00098                  try {
00099                      Json::Value result = Json::parse(body);
00100                      summary = AccountSummary::fromJson(result);
00101                      onResult(nullptr, std::move(summary));
00102                  } catch (...) {
00103                      onResult(std::current_exception(), std::move(summary));
00104                  }
00105              }
00106          },
00107          "GET", "/v1/accounts/" + account.toString() + "/summary",
00108          { {"depth", to_string(depth)} },
00109          "");
00110 }
00111 
00112 void
00113 SlaveBudgetController::
00114 getAccount(const AccountKey & accountKey,
00115            std::function<void (std::exception_ptr,
00116                                Account &&)> onResult)
00117 {
00118     push([=] (std::exception_ptr ptr, int resultCode, string body)
00119          {
00120              Account account;
00121              if (ptr)
00122                  onResult(ptr, std::move(account));
00123              else if (resultCode < 200 || resultCode >= 300)
00124                  onResult(std::make_exception_ptr(ML::Exception("getAccount returned code %d: %s", resultCode, body.c_str())),
00125                           std::move(account));
00126              else {
00127                  try {
00128                      Json::Value result = Json::parse(body);
00129                      account = Account::fromJson(result);
00130                      onResult(nullptr, std::move(account));
00131                  } catch (...) {
00132                      onResult(std::current_exception(), std::move(account));
00133                  }
00134              }
00135          },
00136          "GET", "/v1/accounts/" + accountKey.toString());
00137 }
00138 
00139 SlaveBudgetController::OnDone
00140 SlaveBudgetController::
00141 budgetResultCallback(const OnBudgetResult & onResult)
00142 {
00143     return [=] (std::exception_ptr ptr, int resultCode, string body)
00144         {
00145             //cerr << "got budget result callback with resultCode "
00146             //     << resultCode << " body " << body << endl;
00147             onResult(ptr);
00148         };
00149 }
00150 
00151 
00152 /*****************************************************************************/
00153 /* SLAVE BANKER                                                              */
00154 /*****************************************************************************/
00155 
00156 SlaveBanker::
00157 SlaveBanker(std::shared_ptr<zmq::context_t> context)
00158     : RestProxy(context), createdAccounts(128)
00159 {
00160 }
00161 
00162 SlaveBanker::
00163 SlaveBanker(std::shared_ptr<zmq::context_t> context,
00164             std::shared_ptr<ConfigurationService> config,
00165             const std::string & accountSuffix,
00166             const std::string & bankerServiceName)
00167     : RestProxy(context), createdAccounts(128)
00168 {
00169     init(config, accountSuffix, bankerServiceName);
00170 }
00171 
00172 void
00173 SlaveBanker::
00174 init(std::shared_ptr<ConfigurationService> config,
00175      const std::string & accountSuffix,
00176      const std::string & bankerServiceName)
00177 {
00178     if (accountSuffix.empty()) {
00179         throw ML::Exception("'accountSuffix' cannot be empty");
00180     }
00181     if (bankerServiceName.empty()) {
00182         throw ML::Exception("'bankerServiceName' cannot be empty");
00183     }
00184 
00185     // When our account manager creates an account, it will call this
00186     // function.  We can't do anything from it (because the lock could
00187     // be held), but we *can* push a message asynchronously to be
00188     // handled later...
00189     accounts.onNewAccount = [=] (const AccountKey & accountKey)
00190         {
00191             //cerr << "((((1)))) new account " << accountKey << endl;
00192             createdAccounts.push(accountKey);
00193         };
00194 
00195     // ... here.  Now we know that no lock is held and so we can
00196     // perform the work we need to synchronize the account with
00197     // the server.
00198     createdAccounts.onEvent = [=] (const AccountKey & accountKey)
00199         {
00200             //cerr << "((((2)))) new account " << accountKey << endl;
00201 
00202             auto onDone = [=] (std::exception_ptr exc,
00203                                ShadowAccount && account)
00204             {
00205 #if 0
00206                 cerr << "((((3)))) new account " << accountKey << endl;
00207 
00208                 cerr << "got back shadow account " << account
00209                 << " for " << accountKey << endl;
00210 
00211                 cerr << "current status is " << accounts.getAccount(accountKey)
00212                 << endl;
00213 #endif
00214             };
00215 
00216             addSpendAccount(accountKey, USD(0), onDone);
00217         };
00218 
00219     addSource("SlaveBanker::createdAccounts", createdAccounts);
00220 
00221     this->accountSuffix = accountSuffix;
00222     
00223     // Connect to the master banker
00224     RestProxy::initServiceClass(config, bankerServiceName, "zeromq");
00225     
00226     addPeriodic("SlaveBanker::reportSpend", 1.0,
00227                 std::bind(&SlaveBanker::reportSpend,
00228                           this,
00229                           std::placeholders::_1),
00230                 true /* single threaded */);
00231     addPeriodic("SlaveBanker::reauthorizeBudget", 1.0,
00232                 std::bind(&SlaveBanker::reauthorizeBudget,
00233                           this,
00234                           std::placeholders::_1),
00235                 true /* single threaded */);
00236 }
00237 
00238 ShadowAccount
00239 SlaveBanker::
00240 syncAccountSync(const AccountKey & account)
00241 {
00242     BankerSyncResult<ShadowAccount> result;
00243     syncAccount(account, result);
00244     return result.get();
00245 }
00246 
00247 void
00248 SlaveBanker::
00249 onSyncResult(const AccountKey & accountKey,
00250                std::function<void (std::exception_ptr,
00251                                    ShadowAccount &&)> onDone,
00252                std::exception_ptr exc,
00253                Account&& masterAccount)
00254 {
00255     ShadowAccount result;
00256 
00257     try {
00258         if (exc) {
00259             onDone(exc, std::move(result));
00260             return;
00261         }
00262 
00263         //cerr << "got result from master for " << accountKey
00264         //     << " which is "
00265         //     << masterAccount << endl;
00266         
00267         result = accounts.syncFromMaster(accountKey, masterAccount);
00268     } catch (...) {
00269         onDone(std::current_exception(), std::move(result));
00270     }
00271 
00272     try {
00273         onDone(nullptr, std::move(result));
00274     } catch (...) {
00275         cerr << "warning: onDone handler threw" << endl;
00276     }
00277 }
00278 
00279 void
00280 SlaveBanker::
00281 onInitializeResult(const AccountKey & accountKey,
00282                    std::function<void (std::exception_ptr,
00283                                        ShadowAccount &&)> onDone,
00284                    std::exception_ptr exc,
00285                    Account&& masterAccount)
00286 {
00287     ShadowAccount result;
00288 
00289     try {
00290         if (exc) {
00291             onDone(exc, std::move(result));
00292             return;
00293         }
00294 
00295         result = accounts.initializeAndMergeState(accountKey, masterAccount);
00296     } catch (...) {
00297         onDone(std::current_exception(), std::move(result));
00298     }
00299     
00300     try {
00301         onDone(nullptr, std::move(result));
00302     } catch (...) {
00303         cerr << "warning: onDone handler threw" << endl;
00304     }
00305 }
00306 
00307 void
00308 SlaveBanker::
00309 syncAccount(const AccountKey & accountKey,
00310             std::function<void (std::exception_ptr,
00311                                 ShadowAccount &&)> onDone)
00312 {
00313     auto onDone2
00314         = std::bind(&SlaveBanker::onSyncResult,
00315                     this,
00316                     accountKey,
00317                     onDone,
00318                     std::placeholders::_1,
00319                     std::placeholders::_2);
00320 
00321     //cerr << "syncing account " << accountKey << ": "
00322     //     << accounts.getAccount(accountKey) << endl;
00323 
00324     push(makeRestResponseJsonDecoder<Account>("syncAccount", onDone2),
00325          "PUT",
00326          "/v1/accounts/" + getShadowAccountStr(accountKey) + "/shadow",
00327          {},
00328          accounts.getAccount(accountKey).toJson().toString());
00329 }
00330 
00331 void
00332 SlaveBanker::
00333 syncAllSync()
00334 {
00335     BankerSyncResult<void> result;
00336     syncAll(result);
00337     result.get();
00338 }
00339 
00340 void
00341 SlaveBanker::
00342 syncAll(std::function<void (std::exception_ptr)> onDone)
00343 {
00344     auto allKeys = accounts.getAccountKeys();
00345 
00346     vector<AccountKey> filteredKeys;
00347     for (auto k: allKeys)
00348         if (accounts.isInitialized(k))
00349             filteredKeys.push_back(k);
00350 
00351     allKeys.swap(filteredKeys);
00352 
00353     if (allKeys.empty()) {
00354         if (onDone)
00355             onDone(nullptr);
00356         return;
00357     }
00358 
00359     struct Aggregator {
00360 
00361         Aggregator(int numTotal,
00362                    std::function<void (std::exception_ptr)> onDone)
00363             : itl(new Itl())
00364         {
00365             itl->numTotal = numTotal;
00366             itl->numFinished = 0;
00367             itl->exc = nullptr;
00368             itl->onDone = onDone;
00369         }
00370 
00371         struct Itl {
00372             int numTotal;
00373             int numFinished;
00374             std::exception_ptr exc;
00375             std::function<void (std::exception_ptr)> onDone;
00376         };
00377 
00378         std::shared_ptr<Itl> itl;
00379         
00380         void operator () (std::exception_ptr exc, ShadowAccount && account)
00381         {
00382             if (exc)
00383                 itl->exc = exc;
00384             int nowDone = __sync_add_and_fetch(&itl->numFinished, 1);
00385             if (nowDone == itl->numTotal) {
00386                 if (itl->onDone)
00387                     itl->onDone(itl->exc);
00388                 else {
00389                     if (itl->exc)
00390                         cerr << "warning: async callback aggregator ate "
00391                              << "exception" << endl;
00392                 }
00393             }
00394         }               
00395     };
00396     
00397     Aggregator aggregator(allKeys.size(), onDone);
00398 
00399     //cerr << "syncing " << allKeys.size() << " keys" << endl;
00400 
00401     for (auto & key: allKeys) {
00402         // We take its parent since syncAccount assumes nothing was added
00403         if (accounts.isInitialized(key))
00404             syncAccount(key, aggregator);
00405     }
00406 }
00407 
00408 void
00409 SlaveBanker::
00410 addSpendAccount(const AccountKey & accountKey,
00411                 CurrencyPool accountFloat,
00412                 std::function<void (std::exception_ptr, ShadowAccount&&)> onDone)
00413 {
00414     bool first = accounts.createAccountAtomic(accountKey);
00415     if(!first) {
00416         // already done
00417         if (onDone) {
00418             auto account = accounts.getAccount(accountKey);
00419             onDone(nullptr, std::move(account));
00420         }
00421     }
00422     else {
00423         // TODO: record float
00424         //accountFloats[accountKey] = accountFloat;
00425 
00426         // Now kick off the initial synchronization step
00427         auto onDone2
00428             = std::bind(&SlaveBanker::onInitializeResult,
00429                         this,
00430                         accountKey,
00431                         onDone,
00432                         std::placeholders::_1,
00433                         std::placeholders::_2);
00434 
00435         cerr << "********* calling addSpendAccount for " << accountKey
00436              << " for SlaveBanker " << accountSuffix << endl;
00437 
00438         push(makeRestResponseJsonDecoder<Account>("addSpendAccount", onDone2),
00439              "POST",
00440              "/v1/accounts",
00441              { { "accountName", getShadowAccountStr(accountKey) },
00442                { "accountType", "spend" } },
00443              "");
00444     }
00445 }
00446 
00447 void
00448 SlaveBanker::
00449 reportSpend(uint64_t numTimeoutsExpired)
00450 {
00451     if (numTimeoutsExpired > 1) {
00452         cerr << "warning: slave banker missed " << numTimeoutsExpired
00453              << " timeouts" << endl;
00454     }
00455 
00456     if (reportSpendSent != Date())
00457         cerr << "warning: report spend still in progress" << endl;
00458 
00459     //cerr << "started report spend" << endl;
00460 
00461     auto onDone = [=] (std::exception_ptr exc)
00462         {
00463             //cerr << "finished report spend" << endl;
00464             reportSpendSent = Date();
00465             if (exc)
00466                 cerr << "reportSpend got exception" << endl;
00467         };
00468     
00469     syncAll(onDone);
00470 }
00471 
00472 void
00473 SlaveBanker::
00474 reauthorizeBudget(uint64_t numTimeoutsExpired)
00475 {
00476     if (numTimeoutsExpired > 1) {
00477         cerr << "warning: slave banker missed " << numTimeoutsExpired
00478              << " timeouts" << endl;
00479     }
00480 
00481     //std::unique_lock<Lock> guard(lock);
00482     if (reauthorizeBudgetSent != Date()) {
00483         cerr << "warning: reauthorize budget still in progress" << endl;
00484     }
00485 
00486     int numDone = 0;
00487 
00488     // For each of our accounts, we report back what has been spent
00489     // and re-up to our desired float
00490     auto onAccount = [&] (const AccountKey & key,
00491                           const ShadowAccount & account)
00492         {
00493             RestRequest request;
00494             request.verb = "POST";
00495             request.resource
00496                 = "/v1/accounts/"
00497                 + getShadowAccountStr(key)
00498                 + "/balance";
00499             request.params = { { "accountType", "spend" } };
00500 
00501             Json::Value payload = CurrencyPool(USD(0.10)).toJson();
00502             request.payload = payload.toString();
00503             
00504             //cerr << "sending out request " << request << endl;
00505             ++numDone;
00506 
00507             // Finally, send it out
00508             push(request, std::bind(&SlaveBanker::onReauthorizeBudgetMessage,
00509                                     this,
00510                                     key,
00511                                     std::placeholders::_1,
00512                                     std::placeholders::_2,
00513                                     std::placeholders::_3));
00514         };
00515 
00516     accounts.forEachInitializedAccount(onAccount);
00517 
00518     if (numDone != 0)
00519         reauthorizeBudgetSent = Date::now();
00520 }
00521 
00522 void
00523 SlaveBanker::
00524 onReauthorizeBudgetMessage(const AccountKey & accountKey,
00525                            std::exception_ptr exc,
00526                            int responseCode,
00527                            const std::string & payload)
00528 {
00529     //cerr << "finished reauthorize budget" << endl;
00530 
00531     if (exc) {
00532         cerr << "reauthorize budget got exception" << payload << endl;
00533         cerr << "accountKey = " << accountKey << endl;
00534         abort();  // for now...
00535         return;
00536     }
00537 
00538     Account masterAccount = Account::fromJson(Json::parse(payload));
00539     accounts.syncFromMaster(accountKey, masterAccount);
00540     reauthorizeBudgetSent = Date();
00541 }
00542 
00543 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator