RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* master_banker.cc 00002 Jeremy Barnes, 8 November 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Master banker class. 00006 */ 00007 00008 #include <memory> 00009 #include <string> 00010 #include "soa/jsoncpp/value.h" 00011 #include <boost/algorithm/string.hpp> 00012 #include <jml/arch/futex.h> 00013 00014 #include "master_banker.h" 00015 #include "soa/service/rest_request_binding.h" 00016 #include "soa/service/redis.h" 00017 00018 00019 using namespace std; 00020 using namespace ML; 00021 using namespace Redis; 00022 00023 00024 namespace RTBKIT { 00025 00026 using Datacratic::jsonEncode; 00027 using Datacratic::jsonDecode; 00028 00029 /*****************************************************************************/ 00030 /* REDIS BANKER PERSISTENCE */ 00031 /*****************************************************************************/ 00032 00033 struct RedisBankerPersistence::Itl { 00034 shared_ptr<Redis::AsyncConnection> redis; 00035 }; 00036 00037 RedisBankerPersistence:: 00038 RedisBankerPersistence(const Redis::Address & redis) 00039 { 00040 itl = make_shared<Itl>(); 00041 itl->redis = make_shared<Redis::AsyncConnection>(redis); 00042 } 00043 00044 RedisBankerPersistence:: 00045 RedisBankerPersistence(shared_ptr<Redis::AsyncConnection> redis) 00046 { 00047 itl = make_shared<Itl>(); 00048 itl->redis = redis; 00049 } 00050 00051 void 00052 RedisBankerPersistence:: 00053 loadAll(const string & topLevelKey, OnLoadedCallback onLoaded) 00054 { 00055 shared_ptr<Accounts> newAccounts; 00056 00057 Redis::Result result = itl->redis->exec(SMEMBERS("banker:accounts"), 5); 00058 if (!result.ok()) { 00059 onLoaded(newAccounts, BACKEND_ERROR, result.error()); 00060 return; 00061 } 00062 00063 const Reply & keysReply = result.reply(); 00064 if (keysReply.type() != ARRAY) { 00065 onLoaded(newAccounts, DATA_INCONSISTENCY, 00066 "SMEMBERS 'banker:accounts' must return an array"); 00067 return; 00068 } 00069 00070 newAccounts = make_shared<Accounts>(); 00071 if (keysReply.length() == 0) { 00072 onLoaded(newAccounts, SUCCESS, ""); 00073 return; 00074 } 00075 00076 Command fetchCommand(MGET); 00077 vector<string> keys; 00078 for (int i = 0; i < keysReply.length(); i++) { 00079 string key(keysReply[i].asString()); 00080 keys.push_back(key); 00081 fetchCommand.addArg("banker-" + key); 00082 } 00083 00084 result = itl->redis->exec(fetchCommand, 5); 00085 if (!result.ok()) { 00086 onLoaded(newAccounts, BACKEND_ERROR, result.error()); 00087 return; 00088 } 00089 00090 const Reply & accountsReply = result.reply(); 00091 ExcAssert(accountsReply.type() == ARRAY); 00092 for (int i = 0; i < accountsReply.length(); i++) { 00093 if (accountsReply[i].type() == NIL) { 00094 onLoaded(newAccounts, DATA_INCONSISTENCY, 00095 "nil key '" + keys[i] 00096 + "' referenced in 'banker:accounts'"); 00097 return; 00098 } 00099 Json::Value storageValue = Json::parse(accountsReply[i]); 00100 newAccounts->restoreAccount(AccountKey(keys[i]), storageValue); 00101 } 00102 00103 // newAccounts->checkBudgetConsistency(); 00104 00105 onLoaded(newAccounts, SUCCESS, ""); 00106 } 00107 00108 void 00109 RedisBankerPersistence:: 00110 saveAll(const Accounts & toSave, OnSavedCallback onSaved) 00111 { 00112 /* TODO: we need to check the content of the "banker:accounts" set for 00113 * "extra" account keys */ 00114 00115 // Phase 1: we load all of the keys. This way we can know what is 00116 // present and deal with keys that should be zeroed out. We can also 00117 // detect if we have a synchronization error and bail out. 00118 00119 vector<string> keys; 00120 00121 Redis::Command fetchCommand(MGET); 00122 00123 /* fetch all account keys and values from storage */ 00124 auto onAccount = [&] (const AccountKey & key, 00125 const Account & account) 00126 { 00127 string keyStr = key.toString(); 00128 keys.push_back(keyStr); 00129 fetchCommand.addArg("banker-" + keyStr); 00130 }; 00131 toSave.forEachAccount(onAccount); 00132 00133 auto onPhase1Result = [=] (const Redis::Result & result) 00134 { 00135 if (!result.ok()) { 00136 onSaved(BACKEND_ERROR, result.error()); 00137 return; 00138 } 00139 00140 vector<Redis::Command> storeCommands; 00141 storeCommands.push_back(MULTI); 00142 00143 const Reply & reply = result.reply(); 00144 ExcAssert(reply.type() == ARRAY); 00145 00146 Json::Value badAccounts(Json::arrayValue); 00147 00148 /* All accounts known to the banker are fetched. 00149 We need to check them and restore them (if needed). */ 00150 for (int i = 0; i < reply.length(); i++) { 00151 const string & key = keys[i]; 00152 bool isParentAccount(key.find(":") == string::npos); 00153 const Accounts::AccountInfo & bankerAccount 00154 = toSave.getAccount(key); 00155 if (toSave.isAccountOutOfSync(key)) { 00156 cerr << "account '" << key 00157 << "' is out of sync and will not be saved" << endl; 00158 continue; 00159 } 00160 Json::Value bankerValue = bankerAccount.toJson(); 00161 bool saveAccount(false); 00162 00163 Result result = reply[i]; 00164 Reply accountReply = result.reply(); 00165 if (accountReply.type() == STRING) { 00166 // We have here: 00167 // a) an account that we want to write; 00168 // b) the current in-database representation of that 00169 // account 00170 00171 // We need to do the following: 00172 // 1. Make sure that it's a valid update (eg, that no 00173 // always increasing numbers would go down and that 00174 // the data in the db is correct); 00175 // 2. Find what keys we need to modify to make it 00176 // correct 00177 // 3. Perform the modifications 00178 00179 Account storageAccount; 00180 Json::Value storageValue = Json::parse(accountReply.asString()); 00181 storageAccount = storageAccount.fromJson(storageValue); 00182 if (bankerAccount.isSameOrPastVersion(storageAccount)) { 00183 /* FIXME: the need for updating an account should 00184 probably be deduced differently than by comparing 00185 JSON content. 00186 */ 00187 saveAccount = (bankerValue != storageValue); 00188 00189 if (saveAccount && isParentAccount) { 00190 /* set and update the "spent-tracking" output for top 00191 * accounts, by integrating the past */ 00192 if (storageValue.isMember("spent-tracking")) { 00193 bankerValue["spent-tracking"] 00194 = storageValue["spent-tracking"]; 00195 } 00196 else { 00197 bankerValue["spent-tracking"] 00198 = Json::Value(Json::objectValue); 00199 } 00200 } 00201 } 00202 else { 00203 /* TODO: the list of inconsistent account should be 00204 stored in the db */ 00205 badAccounts.append(Json::Value(key)); 00206 } 00207 } 00208 else { 00209 /* The account does not exist yet in storage, thus we 00210 create it. */ 00211 storeCommands.push_back(SADD("banker:accounts", key)); 00212 saveAccount = true; 00213 if (isParentAccount) { 00214 bankerValue["spent-tracking"] 00215 = Json::Value(Json::objectValue); 00216 } 00217 } 00218 00219 if (saveAccount) { 00220 if (isParentAccount) { 00221 const AccountSummary & summary = toSave.getAccountSummary(key); 00222 if (summary.spent != bankerAccount.initialSpent) { 00223 // cerr << "adding tracking entry" << endl; 00224 string sessionStartStr = toSave.sessionStart.printClassic(); 00225 bankerValue["spent-tracking"][sessionStartStr] 00226 = Json::Value(Json::objectValue); 00227 Json::Value & tracking 00228 = bankerValue["spent-tracking"][sessionStartStr]; 00229 CurrencyPool delta(summary.spent 00230 - bankerAccount.initialSpent); 00231 tracking["spent"] = delta.toJson(); 00232 string lastModifiedStr = Date::now().printClassic(); 00233 tracking["date"] = lastModifiedStr; 00234 } 00235 } 00236 00237 Redis::Command command = SET("banker-" + key, 00238 boost::trim_copy(bankerValue.toString())); 00239 storeCommands.push_back(command); 00240 } 00241 } 00242 00243 if (badAccounts.size() > 0) { 00244 /* For now we do not save any account when at least one has 00245 been detected as inconsistent. */ 00246 onSaved(DATA_INCONSISTENCY, boost::trim_copy(badAccounts.toString())); 00247 } 00248 else if (storeCommands.size() > 1) { 00249 storeCommands.push_back(EXEC); 00250 00251 auto onPhase2Result = [=] (const Redis::Results & results) 00252 { 00253 if (results.ok()) 00254 onSaved(SUCCESS, ""); 00255 else 00256 onSaved(BACKEND_ERROR, results.error()); 00257 }; 00258 00259 itl->redis->queueMulti(storeCommands, onPhase2Result, 5.0); 00260 } 00261 else { 00262 onSaved(SUCCESS, ""); 00263 } 00264 }; 00265 00266 if (keys.size() == 0) { 00267 /* no account to save */ 00268 onSaved(SUCCESS, ""); 00269 return; 00270 } 00271 00272 itl->redis->queue(fetchCommand, onPhase1Result, 5.0); 00273 } 00274 00275 /*****************************************************************************/ 00276 /* MASTER BANKER */ 00277 /*****************************************************************************/ 00278 00279 MasterBanker:: 00280 MasterBanker(std::shared_ptr<ServiceProxies> proxies, 00281 const string & serviceName) 00282 : ServiceBase(serviceName, proxies), 00283 RestServiceEndpoint(proxies->zmqContext), 00284 saving(false), 00285 monitorProviderClient(proxies->zmqContext, *this) 00286 { 00287 } 00288 00289 MasterBanker:: 00290 ~MasterBanker() 00291 { 00292 saveState(); 00293 while (saving) { 00294 cerr << "awaiting end of save operation..." << endl; 00295 ML::futex_wait(saving, true); 00296 } 00297 shutdown(); 00298 } 00299 00300 void 00301 MasterBanker:: 00302 init(const shared_ptr<BankerPersistence> & storage) 00303 { 00304 this->storage_ = storage; 00305 00306 loadStateSync(); 00307 00308 addPeriodic("MasterBanker::saveState", 1.0, 00309 bind(&MasterBanker::saveState, this), 00310 true /* single threaded */); 00311 00312 registerServiceProvider(serviceName(), { "rtbBanker" }); 00313 00314 getServices()->config->removePath(serviceName()); 00315 //registerService(); 00316 RestServiceEndpoint::init(getServices()->config, serviceName()); 00317 00318 onHandleRequest = router.requestHandler(); 00319 00320 router.description = "API for the Datacratic Banker Service"; 00321 00322 router.addHelpRoute("/", "GET"); 00323 00324 RestRequestRouter::OnProcessRequest pingRoute 00325 = [] (const RestServiceEndpoint::ConnectionId & connection, 00326 const RestRequest & request, 00327 const RestRequestParsingContext & context) { 00328 connection.sendResponse(200, "pong"); 00329 return RestRequestRouter::MR_YES; 00330 }; 00331 router.addRoute("/ping", "GET", "availability request", pingRoute, 00332 Json::Value()); 00333 00334 auto & versionNode = router.addSubRouter("/v1", "version 1 of API"); 00335 00336 addRouteSyncReturn(versionNode, 00337 "/summary", 00338 {"GET"}, 00339 "Return the simplified summaries of all existing" 00340 " accounts", 00341 "", 00342 [] (const Json::Value & a) { return a; }, 00343 &Accounts::getAccountSummariesJson, 00344 &accounts, 00345 true, 00346 RestParamDefault<int>("maxDepth", "maximum depth to traverse", 3)); 00347 00348 00349 auto & accountsNode 00350 = versionNode.addSubRouter("/accounts", 00351 "Operations on accounts"); 00352 00353 addRouteSyncReturn(accountsNode, 00354 "", 00355 {"POST"}, 00356 "Add a new account to the banker", 00357 "Representation of the added account", 00358 [] (const Account & a) { return a.toJson(); }, 00359 &Accounts::createAccount, 00360 &accounts, 00361 RestParam<AccountKey>("accountName", "account name to create x:y:z"), 00362 RestParam<AccountType>("accountType", "account type (spend or budget)")); 00363 00364 addRouteSyncReturn(accountsNode, 00365 "", 00366 {"GET"}, 00367 "List accounts that are in the banker", 00368 "List of account names matching the given prefix", 00369 [] (const vector<AccountKey> & v) { return jsonEncode(v); }, 00370 &Accounts::getAccountKeys, 00371 &accounts, 00372 RestParamDefault<AccountKey> 00373 ("accountPrefix", 00374 "account name to look under (default empty which " 00375 "means return all accounts)", 00376 AccountKey()), 00377 RestParamDefault<int> 00378 ("maxDepth", "maximum depth to search (default unlimited)", -1)); 00379 00380 auto & account 00381 = accountsNode.addSubRouter(Rx("/([^/]*)", "/<accountName>"), 00382 "operations on an individual account"); 00383 00384 RequestParam<AccountKey> accountKeyParam(-2, "<account>", "account to operate on"); 00385 00386 addRouteSyncReturn(account, 00387 "", 00388 {"GET"}, 00389 "Return a representation of the given account", 00390 "Representation of the named account", 00391 [] (const Account & account) { return account.toJson(); }, 00392 &Accounts::getAccount, 00393 &accounts, 00394 accountKeyParam); 00395 00396 addRouteSyncReturn(account, 00397 "/subtree", 00398 {"GET"}, 00399 "Return a representation of the given account and its " 00400 "children", 00401 "Representation of the given subtree", 00402 [] (const Accounts & subtree) { return subtree.toJson(); }, 00403 &Accounts::getAccounts, 00404 &accounts, 00405 accountKeyParam, 00406 RestParamDefault<int>("depth", "depth of children (default = 0)", 0)); 00407 00408 addRouteSyncReturn(account, 00409 "/children", 00410 {"GET"}, 00411 "Return a list of the children of a given account", 00412 "Array of names of child accounts", 00413 [] (const vector<AccountKey> & keys) { return jsonEncode(keys); }, 00414 &Accounts::getAccountKeys, 00415 &accounts, 00416 accountKeyParam, 00417 RestParamDefault<int>("depth", "depth of children (default = 0)", 0)); 00418 00419 addRouteSyncReturn(account, 00420 "/budget", 00421 {"PUT", "POST"}, 00422 "Set a top level account's budget to match the given " 00423 "amount. ", 00424 "Status of the account after the operation", 00425 [] (const Account & a) { return a.toJson(); }, 00426 &Accounts::setBudget, 00427 &accounts, 00428 accountKeyParam, 00429 JsonParam<CurrencyPool>("", "amount to set budget to")); 00430 00431 addRouteSyncReturn(account, 00432 "/balance", 00433 {"PUT", "POST"}, 00434 "Transfer budget from the parent such that account's " 00435 "balance amount matches the parameter", 00436 "Account: Representation of the modified account", 00437 [] (const Account & a) { return a.toJson(); }, 00438 &Accounts::setBalance, 00439 &accounts, 00440 accountKeyParam, 00441 JsonParam<CurrencyPool>("", "amount to set balance to"), 00442 RestParamDefault<AccountType>("accountType", "type of account for implicit creation (default no creation)", AT_NONE)); 00443 00444 addRouteSyncReturn(account, 00445 "/summary", 00446 "GET", 00447 "Return the aggregated summary of the given account", 00448 "AccountSummary: aggregation of the given account and its children", 00449 [] (const AccountSummary & s) { return s.toJson(); }, 00450 &Accounts::getAccountSummary, 00451 &accounts, 00452 accountKeyParam, 00453 RestParamDefault<int>("maxDepth", "maximum depth to traverse", 3)); 00454 00455 addRouteSyncReturn(account, 00456 "/shadow", 00457 {"PUT", "POST"}, 00458 "Update a spend account's spend and commitments", 00459 "Account: Representation of the modified account", 00460 [] (const Account & a) { return a.toJson(); }, 00461 &Accounts::syncFromShadow, 00462 &accounts, 00463 accountKeyParam, 00464 JsonParam<ShadowAccount>("", 00465 "Representation of the shadow account")); 00466 00467 monitorProviderClient.init(getServices()->config); 00468 } 00469 00470 void 00471 MasterBanker:: 00472 start() 00473 { 00474 RestServiceEndpoint::start(); 00475 monitorProviderClient.start(); 00476 } 00477 00478 pair<string, string> 00479 MasterBanker:: 00480 bindTcp() 00481 { 00482 return RestServiceEndpoint::bindTcp( 00483 getServices()->ports->getRange("banker.zmq"), 00484 getServices()->ports->getRange("banker.http")); 00485 } 00486 00487 void 00488 MasterBanker:: 00489 shutdown() 00490 { 00491 unregisterServiceProvider(serviceName(), { "rtbBanker" }); 00492 RestServiceEndpoint::shutdown(); 00493 monitorProviderClient.shutdown(); 00494 } 00495 00496 Json::Value 00497 MasterBanker:: 00498 createAccount(const AccountKey & key, AccountType type) 00499 { 00500 Account account = accounts.createAccount(key, type); 00501 return account.toJson(); 00502 00503 if (type == AT_BUDGET) 00504 return account.toJson(); 00505 else { 00506 ShadowAccount shadow; 00507 shadow.syncFromMaster(account); 00508 return shadow.toJson(); 00509 } 00510 } 00511 00512 void 00513 MasterBanker:: 00514 onStateSaved(BankerPersistence::PersistenceCallbackStatus status, 00515 const string & info) 00516 { 00517 if (status == BankerPersistence::SUCCESS) { 00518 //cerr << __FUNCTION__ 00519 // << ": banker state saved successfully to backend" << endl; 00520 lastSavedState = Date::now(); 00521 } 00522 else if (status == BankerPersistence::DATA_INCONSISTENCY) { 00523 Json::Value accountKeys = Json::parse(info); 00524 ExcAssert(accountKeys.type() == Json::arrayValue); 00525 for (Json::Value jsonKey: accountKeys) { 00526 ExcAssert(jsonKey.type() == Json::stringValue); 00527 string keyStr = jsonKey.asString(); 00528 accounts.markAccountOutOfSync(AccountKey(keyStr)); 00529 cerr << __FUNCTION__ 00530 << ": account '" << keyStr << "' marked out of sync" << endl; 00531 } 00532 } 00533 else if (status == BankerPersistence::BACKEND_ERROR) { 00534 /* the backend is unavailable */ 00535 cerr << __FUNCTION__ << ": " << info << endl; 00536 } 00537 else { 00538 throw ML::Exception("status code is not handled"); 00539 } 00540 00541 lastSaveStatus = status; 00542 00543 saving = false; 00544 ML::futex_wake(saving); 00545 } 00546 00547 void 00548 MasterBanker:: 00549 saveState() 00550 { 00551 Guard guard(saveLock); 00552 00553 if (!storage_ || saving) 00554 return; 00555 00556 saving = true; 00557 storage_->saveAll(accounts, bind(&MasterBanker::onStateSaved, this, 00558 placeholders::_1, 00559 placeholders::_2)); 00560 } 00561 00562 void 00563 MasterBanker:: 00564 onStateLoaded(shared_ptr<Accounts> newAccounts, 00565 BankerPersistence::PersistenceCallbackStatus status, 00566 const string & info) 00567 { 00568 if (status == BankerPersistence::SUCCESS) { 00569 newAccounts->ensureInterAccountConsistency(); 00570 accounts = *newAccounts; 00571 cerr << __FUNCTION__ << ": successfully loaded accounts" << endl; 00572 } 00573 else if (status == BankerPersistence::DATA_INCONSISTENCY) { 00574 /* something is wrong with the backend data types */ 00575 cerr << __FUNCTION__ << ": " << info << endl; 00576 } 00577 else if (status == BankerPersistence::BACKEND_ERROR) { 00578 /* the backend is unavailable */ 00579 cerr << __FUNCTION__ << ": " << info << endl; 00580 } 00581 else { 00582 throw ML::Exception("status code is not handled"); 00583 } 00584 } 00585 00586 void 00587 MasterBanker:: 00588 loadStateSync() 00589 { 00590 if (!storage_) 00591 return; 00592 00593 int done = 0; 00594 00595 auto onLoaded = [&](shared_ptr<Accounts> accounts, 00596 BankerPersistence::PersistenceCallbackStatus status, 00597 const string & info) { 00598 this->onStateLoaded(accounts, status, info); 00599 done = 1; 00600 ML::futex_wake(done); 00601 }; 00602 00603 storage_->loadAll("", onLoaded); 00604 00605 while (!done) { 00606 ML::futex_wait(done, 0); 00607 } 00608 } 00609 00610 void 00611 MasterBanker:: 00612 bindFixedHttpAddress(const string & uri) 00613 { 00614 } 00615 00617 string 00618 MasterBanker:: 00619 getProviderName() 00620 const 00621 { 00622 return serviceName(); 00623 } 00624 00625 Json::Value 00626 MasterBanker:: 00627 getProviderIndicators() 00628 const 00629 { 00630 Json::Value value; 00631 00632 /* MB health check: 00633 - no error occurred in last save (implying Redis conn is alive) */ 00634 Date now = Date::now(); 00635 bool status(lastSaveStatus == BankerPersistence::SUCCESS); 00636 value["status"] = status ? "ok" : "failure"; 00637 00638 return value; 00639 } 00640 00641 } // namespace RTBKIT