RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/master_banker.cc
00001 /* master_banker.cc
00002    Jeremy Barnes, 8 November 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004    
00005    Master banker class.
00006 */
00007 
00008 #include <memory>
00009 #include <string>
00010 #include "soa/jsoncpp/value.h"
00011 #include <boost/algorithm/string.hpp>
00012 #include <jml/arch/futex.h>
00013 
00014 #include "master_banker.h"
00015 #include "soa/service/rest_request_binding.h"
00016 #include "soa/service/redis.h"
00017 
00018 
00019 using namespace std;
00020 using namespace ML;
00021 using namespace Redis;
00022 
00023 
00024 namespace RTBKIT {
00025 
00026 using Datacratic::jsonEncode;
00027 using Datacratic::jsonDecode;
00028 
00029 /*****************************************************************************/
00030 /* REDIS BANKER PERSISTENCE                                                  */
00031 /*****************************************************************************/
00032 
00033 struct RedisBankerPersistence::Itl {
00034     shared_ptr<Redis::AsyncConnection> redis;
00035 };
00036 
00037 RedisBankerPersistence::
00038 RedisBankerPersistence(const Redis::Address & redis)
00039 {
00040     itl = make_shared<Itl>();
00041     itl->redis = make_shared<Redis::AsyncConnection>(redis);
00042 }
00043 
00044 RedisBankerPersistence::
00045 RedisBankerPersistence(shared_ptr<Redis::AsyncConnection> redis)
00046 {
00047     itl = make_shared<Itl>();
00048     itl->redis = redis;
00049 }
00050 
00051 void
00052 RedisBankerPersistence::
00053 loadAll(const string & topLevelKey, OnLoadedCallback onLoaded)
00054 {
00055     shared_ptr<Accounts> newAccounts;
00056 
00057     Redis::Result result = itl->redis->exec(SMEMBERS("banker:accounts"), 5);
00058     if (!result.ok()) {
00059         onLoaded(newAccounts, BACKEND_ERROR, result.error());
00060         return;
00061     }
00062 
00063     const Reply & keysReply = result.reply();
00064     if (keysReply.type() != ARRAY) {
00065         onLoaded(newAccounts, DATA_INCONSISTENCY,
00066                  "SMEMBERS 'banker:accounts' must return an array");
00067         return;
00068     }
00069 
00070     newAccounts = make_shared<Accounts>();
00071     if (keysReply.length() == 0) {
00072         onLoaded(newAccounts, SUCCESS, "");
00073         return;
00074     }
00075 
00076     Command fetchCommand(MGET);
00077     vector<string> keys;
00078     for (int i = 0; i < keysReply.length(); i++) {
00079         string key(keysReply[i].asString());
00080         keys.push_back(key);
00081         fetchCommand.addArg("banker-" + key);
00082     }
00083 
00084     result = itl->redis->exec(fetchCommand, 5);
00085     if (!result.ok()) {
00086         onLoaded(newAccounts, BACKEND_ERROR, result.error());
00087         return;
00088     }
00089 
00090     const Reply & accountsReply = result.reply();
00091     ExcAssert(accountsReply.type() == ARRAY);
00092     for (int i = 0; i < accountsReply.length(); i++) {
00093         if (accountsReply[i].type() == NIL) {
00094             onLoaded(newAccounts, DATA_INCONSISTENCY,
00095                      "nil key '" + keys[i]
00096                      + "' referenced in 'banker:accounts'");
00097             return;
00098         }
00099         Json::Value storageValue = Json::parse(accountsReply[i]);
00100         newAccounts->restoreAccount(AccountKey(keys[i]), storageValue);
00101     }
00102 
00103     // newAccounts->checkBudgetConsistency();
00104 
00105     onLoaded(newAccounts, SUCCESS, "");
00106 }
00107 
00108 void
00109 RedisBankerPersistence::
00110 saveAll(const Accounts & toSave, OnSavedCallback onSaved)
00111 {
00112     /* TODO: we need to check the content of the "banker:accounts" set for
00113      * "extra" account keys */
00114 
00115     // Phase 1: we load all of the keys.  This way we can know what is
00116     // present and deal with keys that should be zeroed out.  We can also
00117     // detect if we have a synchronization error and bail out.
00118 
00119     vector<string> keys;
00120 
00121     Redis::Command fetchCommand(MGET);
00122 
00123     /* fetch all account keys and values from storage */
00124     auto onAccount = [&] (const AccountKey & key,
00125                           const Account & account)
00126         {
00127             string keyStr = key.toString();
00128             keys.push_back(keyStr);
00129             fetchCommand.addArg("banker-" + keyStr);
00130         };
00131     toSave.forEachAccount(onAccount);
00132 
00133     auto onPhase1Result = [=] (const Redis::Result & result)
00134         {
00135             if (!result.ok()) {
00136                 onSaved(BACKEND_ERROR, result.error());
00137                 return;
00138             }
00139 
00140             vector<Redis::Command> storeCommands;
00141             storeCommands.push_back(MULTI);
00142 
00143             const Reply & reply = result.reply();
00144             ExcAssert(reply.type() == ARRAY);
00145 
00146             Json::Value badAccounts(Json::arrayValue);
00147 
00148             /* All accounts known to the banker are fetched.
00149                We need to check them and restore them (if needed). */
00150             for (int i = 0; i < reply.length(); i++) {
00151                 const string & key = keys[i];
00152                 bool isParentAccount(key.find(":") == string::npos);
00153                 const Accounts::AccountInfo & bankerAccount
00154                     = toSave.getAccount(key);
00155                 if (toSave.isAccountOutOfSync(key)) {
00156                     cerr << "account '" << key
00157                          << "' is out of sync and will not be saved" << endl;
00158                     continue;
00159                 }
00160                 Json::Value bankerValue = bankerAccount.toJson();
00161                 bool saveAccount(false);
00162 
00163                 Result result = reply[i];
00164                 Reply accountReply = result.reply();
00165                 if (accountReply.type() == STRING) {
00166                     // We have here:
00167                     // a) an account that we want to write;
00168                     // b) the current in-database representation of that
00169                     //    account
00170 
00171                     // We need to do the following:
00172                     // 1.  Make sure that it's a valid update (eg, that no
00173                     //     always increasing numbers would go down and that
00174                     //     the data in the db is correct);
00175                     // 2.  Find what keys we need to modify to make it
00176                     //     correct
00177                     // 3.  Perform the modifications
00178 
00179                     Account storageAccount;
00180                     Json::Value storageValue = Json::parse(accountReply.asString());
00181                     storageAccount = storageAccount.fromJson(storageValue);
00182                     if (bankerAccount.isSameOrPastVersion(storageAccount)) {
00183                         /* FIXME: the need for updating an account should
00184                            probably be deduced differently than by comparing
00185                            JSON content.
00186                         */
00187                         saveAccount = (bankerValue != storageValue);
00188 
00189                         if (saveAccount && isParentAccount) {
00190                             /* set and update the "spent-tracking" output for top
00191                              * accounts, by integrating the past */
00192                             if (storageValue.isMember("spent-tracking")) {
00193                                 bankerValue["spent-tracking"]
00194                                     = storageValue["spent-tracking"];
00195                             }
00196                             else {
00197                                 bankerValue["spent-tracking"]
00198                                     = Json::Value(Json::objectValue);
00199                             }
00200                         }
00201                     }
00202                     else {
00203                         /* TODO: the list of inconsistent account should be
00204                            stored in the db */
00205                         badAccounts.append(Json::Value(key));
00206                     }
00207                 }
00208                 else {
00209                     /* The account does not exist yet in storage, thus we
00210                        create it. */
00211                     storeCommands.push_back(SADD("banker:accounts", key));
00212                     saveAccount = true;
00213                     if (isParentAccount) {
00214                         bankerValue["spent-tracking"]
00215                             = Json::Value(Json::objectValue);
00216                     }
00217                 }
00218 
00219                 if (saveAccount) {
00220                     if (isParentAccount) {
00221                         const AccountSummary & summary = toSave.getAccountSummary(key);
00222                         if (summary.spent != bankerAccount.initialSpent) {
00223                             // cerr << "adding tracking entry" << endl;
00224                             string sessionStartStr = toSave.sessionStart.printClassic();
00225                             bankerValue["spent-tracking"][sessionStartStr]
00226                                 = Json::Value(Json::objectValue);
00227                             Json::Value & tracking
00228                                 = bankerValue["spent-tracking"][sessionStartStr];
00229                             CurrencyPool delta(summary.spent
00230                                                - bankerAccount.initialSpent);
00231                             tracking["spent"] = delta.toJson();
00232                             string lastModifiedStr = Date::now().printClassic();
00233                             tracking["date"] = lastModifiedStr;
00234                         }
00235                     }
00236 
00237                     Redis::Command command = SET("banker-" + key,
00238                                                  boost::trim_copy(bankerValue.toString()));
00239                     storeCommands.push_back(command);
00240                 }
00241             }
00242 
00243             if (badAccounts.size() > 0) {
00244                 /* For now we do not save any account when at least one has
00245                    been detected as inconsistent. */
00246                 onSaved(DATA_INCONSISTENCY, boost::trim_copy(badAccounts.toString()));
00247             }
00248             else if (storeCommands.size() > 1) {
00249                  storeCommands.push_back(EXEC);
00250                  
00251                  auto onPhase2Result = [=] (const Redis::Results & results)
00252                  {
00253                      if (results.ok())
00254                          onSaved(SUCCESS, "");
00255                      else
00256                          onSaved(BACKEND_ERROR, results.error());
00257                  };
00258 
00259                  itl->redis->queueMulti(storeCommands, onPhase2Result, 5.0);
00260             }
00261             else {
00262                 onSaved(SUCCESS, "");
00263             }
00264         };
00265 
00266     if (keys.size() == 0) {
00267         /* no account to save */
00268         onSaved(SUCCESS, "");
00269         return;
00270     }
00271 
00272     itl->redis->queue(fetchCommand, onPhase1Result, 5.0);
00273 }
00274 
00275 /*****************************************************************************/
00276 /* MASTER BANKER                                                             */
00277 /*****************************************************************************/
00278 
00279 MasterBanker::
00280 MasterBanker(std::shared_ptr<ServiceProxies> proxies,
00281              const string & serviceName)
00282     : ServiceBase(serviceName, proxies),
00283       RestServiceEndpoint(proxies->zmqContext),
00284       saving(false),
00285       monitorProviderClient(proxies->zmqContext, *this)
00286 {
00287 }
00288 
00289 MasterBanker::
00290 ~MasterBanker()
00291 {
00292     saveState();
00293     while (saving) {
00294         cerr << "awaiting end of save operation..." << endl;
00295         ML::futex_wait(saving, true);
00296     }
00297     shutdown();
00298 }
00299 
00300 void
00301 MasterBanker::
00302 init(const shared_ptr<BankerPersistence> & storage)
00303 {
00304     this->storage_ = storage;
00305 
00306     loadStateSync();
00307 
00308     addPeriodic("MasterBanker::saveState", 1.0,
00309                 bind(&MasterBanker::saveState, this),
00310                 true /* single threaded */);
00311 
00312     registerServiceProvider(serviceName(), { "rtbBanker" });
00313 
00314     getServices()->config->removePath(serviceName());
00315     //registerService();
00316     RestServiceEndpoint::init(getServices()->config, serviceName());
00317 
00318     onHandleRequest = router.requestHandler();
00319 
00320     router.description = "API for the Datacratic Banker Service";
00321 
00322     router.addHelpRoute("/", "GET");
00323 
00324     RestRequestRouter::OnProcessRequest pingRoute
00325         = [] (const RestServiceEndpoint::ConnectionId & connection,
00326               const RestRequest & request,
00327               const RestRequestParsingContext & context) {
00328         connection.sendResponse(200, "pong");
00329         return RestRequestRouter::MR_YES;
00330     };
00331     router.addRoute("/ping", "GET", "availability request", pingRoute,
00332                     Json::Value());
00333 
00334     auto & versionNode = router.addSubRouter("/v1", "version 1 of API");
00335 
00336     addRouteSyncReturn(versionNode,
00337                        "/summary",
00338                        {"GET"},
00339                        "Return the simplified summaries of all existing"
00340                        " accounts",
00341                        "",
00342                        [] (const Json::Value & a) { return a; },
00343                        &Accounts::getAccountSummariesJson,
00344                        &accounts,
00345                        true,
00346                        RestParamDefault<int>("maxDepth", "maximum depth to traverse", 3));
00347 
00348 
00349     auto & accountsNode
00350         = versionNode.addSubRouter("/accounts",
00351                                    "Operations on accounts");
00352     
00353     addRouteSyncReturn(accountsNode,
00354                        "",
00355                        {"POST"},
00356                        "Add a new account to the banker",
00357                        "Representation of the added account",
00358                        [] (const Account & a) { return a.toJson(); },
00359                        &Accounts::createAccount,
00360                        &accounts,
00361                        RestParam<AccountKey>("accountName", "account name to create x:y:z"),
00362                        RestParam<AccountType>("accountType", "account type (spend or budget)"));
00363     
00364     addRouteSyncReturn(accountsNode,
00365                        "",
00366                        {"GET"},
00367                        "List accounts that are in the banker",
00368                        "List of account names matching the given prefix",
00369                        [] (const vector<AccountKey> & v) { return jsonEncode(v); },
00370                        &Accounts::getAccountKeys,
00371                        &accounts,
00372                        RestParamDefault<AccountKey>
00373                        ("accountPrefix",
00374                         "account name to look under (default empty which "
00375                         "means return all accounts)",
00376                         AccountKey()),
00377                        RestParamDefault<int>
00378                        ("maxDepth", "maximum depth to search (default unlimited)", -1));
00379     
00380     auto & account
00381         = accountsNode.addSubRouter(Rx("/([^/]*)", "/<accountName>"),
00382                                     "operations on an individual account");
00383     
00384     RequestParam<AccountKey> accountKeyParam(-2, "<account>", "account to operate on");
00385 
00386     addRouteSyncReturn(account,
00387                        "",
00388                        {"GET"},
00389                        "Return a representation of the given account",
00390                        "Representation of the named account",
00391                        [] (const Account & account) { return account.toJson(); },
00392                        &Accounts::getAccount,
00393                        &accounts,
00394                        accountKeyParam);
00395 
00396     addRouteSyncReturn(account,
00397                        "/subtree",
00398                        {"GET"},
00399                        "Return a representation of the given account and its "
00400                        "children",
00401                        "Representation of the given subtree",
00402                        [] (const Accounts & subtree) { return subtree.toJson(); },
00403                        &Accounts::getAccounts,
00404                        &accounts,
00405                        accountKeyParam,
00406                        RestParamDefault<int>("depth", "depth of children (default = 0)", 0));
00407 
00408     addRouteSyncReturn(account,
00409                        "/children",
00410                        {"GET"},
00411                        "Return a list of the children of a given account",
00412                        "Array of names of child accounts",
00413                        [] (const vector<AccountKey> & keys) { return jsonEncode(keys); },
00414                        &Accounts::getAccountKeys,
00415                        &accounts,
00416                        accountKeyParam,
00417                        RestParamDefault<int>("depth", "depth of children (default = 0)", 0));
00418 
00419     addRouteSyncReturn(account,
00420                        "/budget",
00421                        {"PUT", "POST"},
00422                        "Set a top level account's budget to match the given "
00423                        "amount.  ",
00424                        "Status of the account after the operation",
00425                        [] (const Account & a) { return a.toJson(); },
00426                        &Accounts::setBudget,
00427                        &accounts,
00428                        accountKeyParam,
00429                        JsonParam<CurrencyPool>("", "amount to set budget to"));
00430 
00431     addRouteSyncReturn(account,
00432                        "/balance",
00433                        {"PUT", "POST"},
00434                        "Transfer budget from the parent such that account's "
00435                        "balance amount matches the parameter",
00436                        "Account: Representation of the modified account",
00437                        [] (const Account & a) { return a.toJson(); },
00438                        &Accounts::setBalance,
00439                        &accounts,
00440                        accountKeyParam,
00441                        JsonParam<CurrencyPool>("", "amount to set balance to"),
00442                        RestParamDefault<AccountType>("accountType", "type of account for implicit creation (default no creation)", AT_NONE));
00443     
00444     addRouteSyncReturn(account,
00445                        "/summary",
00446                        "GET",
00447                        "Return the aggregated summary of the given account",
00448                        "AccountSummary: aggregation of the given account and its children",
00449                        [] (const AccountSummary & s) { return s.toJson(); },
00450                        &Accounts::getAccountSummary,
00451                        &accounts,
00452                        accountKeyParam,
00453                        RestParamDefault<int>("maxDepth", "maximum depth to traverse", 3));
00454 
00455     addRouteSyncReturn(account,
00456                        "/shadow",
00457                        {"PUT", "POST"},
00458                        "Update a spend account's spend and commitments",
00459                        "Account: Representation of the modified account",
00460                        [] (const Account & a) { return a.toJson(); },
00461                        &Accounts::syncFromShadow,
00462                        &accounts,
00463                        accountKeyParam,
00464                        JsonParam<ShadowAccount>("",
00465                                                 "Representation of the shadow account"));
00466 
00467     monitorProviderClient.init(getServices()->config);
00468 }
00469 
00470 void
00471 MasterBanker::
00472 start()
00473 {
00474     RestServiceEndpoint::start();
00475     monitorProviderClient.start();
00476 }
00477 
00478 pair<string, string>
00479 MasterBanker::
00480 bindTcp()
00481 {
00482     return RestServiceEndpoint::bindTcp(
00483             getServices()->ports->getRange("banker.zmq"),
00484             getServices()->ports->getRange("banker.http"));
00485 }
00486 
00487 void
00488 MasterBanker::
00489 shutdown()
00490 {
00491     unregisterServiceProvider(serviceName(), { "rtbBanker" });
00492     RestServiceEndpoint::shutdown();
00493     monitorProviderClient.shutdown();
00494 }
00495 
00496 Json::Value
00497 MasterBanker::
00498 createAccount(const AccountKey & key, AccountType type)
00499 {
00500     Account account = accounts.createAccount(key, type);
00501     return account.toJson();
00502 
00503     if (type == AT_BUDGET)
00504         return account.toJson();
00505     else {
00506         ShadowAccount shadow;
00507         shadow.syncFromMaster(account);
00508         return shadow.toJson();
00509     }
00510 }
00511 
00512 void
00513 MasterBanker::
00514 onStateSaved(BankerPersistence::PersistenceCallbackStatus status,
00515              const string & info)
00516 {
00517     if (status == BankerPersistence::SUCCESS) {
00518         //cerr << __FUNCTION__
00519         //     <<  ": banker state saved successfully to backend" << endl;
00520         lastSavedState = Date::now();
00521     }
00522     else if (status == BankerPersistence::DATA_INCONSISTENCY) {
00523         Json::Value accountKeys = Json::parse(info);
00524         ExcAssert(accountKeys.type() == Json::arrayValue);
00525         for (Json::Value jsonKey: accountKeys) {
00526             ExcAssert(jsonKey.type() == Json::stringValue);
00527             string keyStr = jsonKey.asString();
00528             accounts.markAccountOutOfSync(AccountKey(keyStr));
00529             cerr << __FUNCTION__
00530                  << ": account '" << keyStr << "' marked out of sync" << endl;
00531         }
00532     }
00533     else if (status == BankerPersistence::BACKEND_ERROR) {
00534         /* the backend is unavailable */
00535         cerr << __FUNCTION__ <<  ": " << info << endl;
00536     }
00537     else {
00538         throw ML::Exception("status code is not handled");
00539     }
00540 
00541     lastSaveStatus = status;
00542 
00543     saving = false;
00544     ML::futex_wake(saving);
00545 }
00546 
00547 void
00548 MasterBanker::
00549 saveState()
00550 {
00551     Guard guard(saveLock);
00552 
00553     if (!storage_ || saving)
00554         return;
00555 
00556     saving = true;
00557     storage_->saveAll(accounts, bind(&MasterBanker::onStateSaved, this,
00558                                           placeholders::_1,
00559                                           placeholders::_2));
00560 }
00561 
00562 void
00563 MasterBanker::
00564 onStateLoaded(shared_ptr<Accounts> newAccounts,
00565               BankerPersistence::PersistenceCallbackStatus status,
00566               const string & info)
00567 {
00568     if (status == BankerPersistence::SUCCESS) {
00569         newAccounts->ensureInterAccountConsistency();
00570         accounts = *newAccounts;
00571         cerr << __FUNCTION__ <<  ": successfully loaded accounts" << endl;
00572     }
00573     else if (status == BankerPersistence::DATA_INCONSISTENCY) {
00574         /* something is wrong with the backend data types */
00575         cerr << __FUNCTION__ <<  ": " << info << endl;
00576     }
00577     else if (status == BankerPersistence::BACKEND_ERROR) {
00578         /* the backend is unavailable */
00579         cerr << __FUNCTION__ <<  ": " << info << endl;
00580     }
00581     else {
00582         throw ML::Exception("status code is not handled");
00583     }
00584 }
00585 
00586 void
00587 MasterBanker::
00588 loadStateSync()
00589 {
00590     if (!storage_)
00591         return;
00592 
00593     int done = 0;
00594 
00595     auto onLoaded = [&](shared_ptr<Accounts> accounts,
00596                         BankerPersistence::PersistenceCallbackStatus status,
00597                         const string & info) {
00598         this->onStateLoaded(accounts, status, info);
00599         done = 1;
00600         ML::futex_wake(done);
00601     };
00602 
00603     storage_->loadAll("", onLoaded);
00604 
00605     while (!done) {
00606         ML::futex_wait(done, 0);
00607     }
00608 }
00609 
00610 void
00611 MasterBanker::
00612 bindFixedHttpAddress(const string & uri)
00613 {
00614 }
00615 
00617 string
00618 MasterBanker::
00619 getProviderName()
00620     const
00621 {
00622     return serviceName();
00623 }
00624 
00625 Json::Value
00626 MasterBanker::
00627 getProviderIndicators()
00628     const
00629 {
00630     Json::Value value;
00631 
00632     /* MB health check:
00633        - no error occurred in last save (implying Redis conn is alive) */
00634     Date now = Date::now();
00635     bool status(lastSaveStatus == BankerPersistence::SUCCESS);
00636     value["status"] = status ? "ok" : "failure";
00637 
00638     return value;
00639 }
00640 
00641 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator