RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* slave_banker.cc 00002 Jeremy Barnes, 8 November 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Slave banker implementation. 00006 */ 00007 00008 #include "slave_banker.h" 00009 #include "soa/service/http_header.h" 00010 #include "jml/utils/vector_utils.h" 00011 00012 using namespace std; 00013 00014 00015 namespace RTBKIT { 00016 00017 00018 /*****************************************************************************/ 00019 /* SLAVE BUDGET CONTROLLER */ 00020 /*****************************************************************************/ 00021 00022 SlaveBudgetController:: 00023 SlaveBudgetController() 00024 { 00025 } 00026 00027 void 00028 SlaveBudgetController:: 00029 addAccount(const AccountKey & account, 00030 const OnBudgetResult & onResult) 00031 { 00032 push(budgetResultCallback(onResult), 00033 "POST", "/v1/accounts", 00034 { {"accountName", account.toString()}, 00035 { "accountType", "budget" } }); 00036 } 00037 00038 00039 void 00040 SlaveBudgetController:: 00041 topupTransfer(const AccountKey & account, 00042 CurrencyPool amount, 00043 const OnBudgetResult & onResult) 00044 { 00045 push(budgetResultCallback(onResult), 00046 "PUT", "/v1/accounts/" + account.toString() + "/balance", 00047 { { "accountType", "budget"} }, 00048 amount.toJson().toString()); 00049 } 00050 00051 void 00052 SlaveBudgetController:: 00053 setBudget(const std::string & topLevelAccount, 00054 CurrencyPool amount, 00055 const OnBudgetResult & onResult) 00056 { 00057 push(budgetResultCallback(onResult), 00058 "PUT", "/v1/accounts/" + topLevelAccount + "/budget", 00059 { /* {"amount", amount.toString()}*/ }, 00060 amount.toJson().toString()); 00061 } 00062 00063 void 00064 SlaveBudgetController:: 00065 addBudget(const std::string & topLevelAccount, 00066 CurrencyPool amount, 00067 const OnBudgetResult & onResult) 00068 { 00069 throw ML::Exception("addBudget no good any more"); 00070 } 00071 00072 void 00073 SlaveBudgetController:: 00074 getAccountList(const AccountKey & account, 00075 int depth, 00076 std::function<void (std::exception_ptr, 00077 std::vector<AccountKey> &&)>) 00078 { 00079 throw ML::Exception("getAccountList not needed anymore"); 00080 } 00081 00082 void 00083 SlaveBudgetController:: 00084 getAccountSummary(const AccountKey & account, 00085 int depth, 00086 std::function<void (std::exception_ptr, 00087 AccountSummary &&)> onResult) 00088 { 00089 push([=] (std::exception_ptr ptr, int resultCode, string body) 00090 { 00091 AccountSummary summary; 00092 if (ptr) 00093 onResult(ptr, std::move(summary)); 00094 else if (resultCode < 200 || resultCode >= 300) 00095 onResult(std::make_exception_ptr(ML::Exception("getAccountSummary returned code %d: %s", resultCode, body.c_str())), 00096 std::move(summary)); 00097 else { 00098 try { 00099 Json::Value result = Json::parse(body); 00100 summary = AccountSummary::fromJson(result); 00101 onResult(nullptr, std::move(summary)); 00102 } catch (...) { 00103 onResult(std::current_exception(), std::move(summary)); 00104 } 00105 } 00106 }, 00107 "GET", "/v1/accounts/" + account.toString() + "/summary", 00108 { {"depth", to_string(depth)} }, 00109 ""); 00110 } 00111 00112 void 00113 SlaveBudgetController:: 00114 getAccount(const AccountKey & accountKey, 00115 std::function<void (std::exception_ptr, 00116 Account &&)> onResult) 00117 { 00118 push([=] (std::exception_ptr ptr, int resultCode, string body) 00119 { 00120 Account account; 00121 if (ptr) 00122 onResult(ptr, std::move(account)); 00123 else if (resultCode < 200 || resultCode >= 300) 00124 onResult(std::make_exception_ptr(ML::Exception("getAccount returned code %d: %s", resultCode, body.c_str())), 00125 std::move(account)); 00126 else { 00127 try { 00128 Json::Value result = Json::parse(body); 00129 account = Account::fromJson(result); 00130 onResult(nullptr, std::move(account)); 00131 } catch (...) { 00132 onResult(std::current_exception(), std::move(account)); 00133 } 00134 } 00135 }, 00136 "GET", "/v1/accounts/" + accountKey.toString()); 00137 } 00138 00139 SlaveBudgetController::OnDone 00140 SlaveBudgetController:: 00141 budgetResultCallback(const OnBudgetResult & onResult) 00142 { 00143 return [=] (std::exception_ptr ptr, int resultCode, string body) 00144 { 00145 //cerr << "got budget result callback with resultCode " 00146 // << resultCode << " body " << body << endl; 00147 onResult(ptr); 00148 }; 00149 } 00150 00151 00152 /*****************************************************************************/ 00153 /* SLAVE BANKER */ 00154 /*****************************************************************************/ 00155 00156 SlaveBanker:: 00157 SlaveBanker(std::shared_ptr<zmq::context_t> context) 00158 : RestProxy(context), createdAccounts(128) 00159 { 00160 } 00161 00162 SlaveBanker:: 00163 SlaveBanker(std::shared_ptr<zmq::context_t> context, 00164 std::shared_ptr<ConfigurationService> config, 00165 const std::string & accountSuffix, 00166 const std::string & bankerServiceName) 00167 : RestProxy(context), createdAccounts(128) 00168 { 00169 init(config, accountSuffix, bankerServiceName); 00170 } 00171 00172 void 00173 SlaveBanker:: 00174 init(std::shared_ptr<ConfigurationService> config, 00175 const std::string & accountSuffix, 00176 const std::string & bankerServiceName) 00177 { 00178 if (accountSuffix.empty()) { 00179 throw ML::Exception("'accountSuffix' cannot be empty"); 00180 } 00181 if (bankerServiceName.empty()) { 00182 throw ML::Exception("'bankerServiceName' cannot be empty"); 00183 } 00184 00185 // When our account manager creates an account, it will call this 00186 // function. We can't do anything from it (because the lock could 00187 // be held), but we *can* push a message asynchronously to be 00188 // handled later... 00189 accounts.onNewAccount = [=] (const AccountKey & accountKey) 00190 { 00191 //cerr << "((((1)))) new account " << accountKey << endl; 00192 createdAccounts.push(accountKey); 00193 }; 00194 00195 // ... here. Now we know that no lock is held and so we can 00196 // perform the work we need to synchronize the account with 00197 // the server. 00198 createdAccounts.onEvent = [=] (const AccountKey & accountKey) 00199 { 00200 //cerr << "((((2)))) new account " << accountKey << endl; 00201 00202 auto onDone = [=] (std::exception_ptr exc, 00203 ShadowAccount && account) 00204 { 00205 #if 0 00206 cerr << "((((3)))) new account " << accountKey << endl; 00207 00208 cerr << "got back shadow account " << account 00209 << " for " << accountKey << endl; 00210 00211 cerr << "current status is " << accounts.getAccount(accountKey) 00212 << endl; 00213 #endif 00214 }; 00215 00216 addSpendAccount(accountKey, USD(0), onDone); 00217 }; 00218 00219 addSource("SlaveBanker::createdAccounts", createdAccounts); 00220 00221 this->accountSuffix = accountSuffix; 00222 00223 // Connect to the master banker 00224 RestProxy::initServiceClass(config, bankerServiceName, "zeromq"); 00225 00226 addPeriodic("SlaveBanker::reportSpend", 1.0, 00227 std::bind(&SlaveBanker::reportSpend, 00228 this, 00229 std::placeholders::_1), 00230 true /* single threaded */); 00231 addPeriodic("SlaveBanker::reauthorizeBudget", 1.0, 00232 std::bind(&SlaveBanker::reauthorizeBudget, 00233 this, 00234 std::placeholders::_1), 00235 true /* single threaded */); 00236 } 00237 00238 ShadowAccount 00239 SlaveBanker:: 00240 syncAccountSync(const AccountKey & account) 00241 { 00242 BankerSyncResult<ShadowAccount> result; 00243 syncAccount(account, result); 00244 return result.get(); 00245 } 00246 00247 void 00248 SlaveBanker:: 00249 onSyncResult(const AccountKey & accountKey, 00250 std::function<void (std::exception_ptr, 00251 ShadowAccount &&)> onDone, 00252 std::exception_ptr exc, 00253 Account&& masterAccount) 00254 { 00255 ShadowAccount result; 00256 00257 try { 00258 if (exc) { 00259 onDone(exc, std::move(result)); 00260 return; 00261 } 00262 00263 //cerr << "got result from master for " << accountKey 00264 // << " which is " 00265 // << masterAccount << endl; 00266 00267 result = accounts.syncFromMaster(accountKey, masterAccount); 00268 } catch (...) { 00269 onDone(std::current_exception(), std::move(result)); 00270 } 00271 00272 try { 00273 onDone(nullptr, std::move(result)); 00274 } catch (...) { 00275 cerr << "warning: onDone handler threw" << endl; 00276 } 00277 } 00278 00279 void 00280 SlaveBanker:: 00281 onInitializeResult(const AccountKey & accountKey, 00282 std::function<void (std::exception_ptr, 00283 ShadowAccount &&)> onDone, 00284 std::exception_ptr exc, 00285 Account&& masterAccount) 00286 { 00287 ShadowAccount result; 00288 00289 try { 00290 if (exc) { 00291 onDone(exc, std::move(result)); 00292 return; 00293 } 00294 00295 result = accounts.initializeAndMergeState(accountKey, masterAccount); 00296 } catch (...) { 00297 onDone(std::current_exception(), std::move(result)); 00298 } 00299 00300 try { 00301 onDone(nullptr, std::move(result)); 00302 } catch (...) { 00303 cerr << "warning: onDone handler threw" << endl; 00304 } 00305 } 00306 00307 void 00308 SlaveBanker:: 00309 syncAccount(const AccountKey & accountKey, 00310 std::function<void (std::exception_ptr, 00311 ShadowAccount &&)> onDone) 00312 { 00313 auto onDone2 00314 = std::bind(&SlaveBanker::onSyncResult, 00315 this, 00316 accountKey, 00317 onDone, 00318 std::placeholders::_1, 00319 std::placeholders::_2); 00320 00321 //cerr << "syncing account " << accountKey << ": " 00322 // << accounts.getAccount(accountKey) << endl; 00323 00324 push(makeRestResponseJsonDecoder<Account>("syncAccount", onDone2), 00325 "PUT", 00326 "/v1/accounts/" + getShadowAccountStr(accountKey) + "/shadow", 00327 {}, 00328 accounts.getAccount(accountKey).toJson().toString()); 00329 } 00330 00331 void 00332 SlaveBanker:: 00333 syncAllSync() 00334 { 00335 BankerSyncResult<void> result; 00336 syncAll(result); 00337 result.get(); 00338 } 00339 00340 void 00341 SlaveBanker:: 00342 syncAll(std::function<void (std::exception_ptr)> onDone) 00343 { 00344 auto allKeys = accounts.getAccountKeys(); 00345 00346 vector<AccountKey> filteredKeys; 00347 for (auto k: allKeys) 00348 if (accounts.isInitialized(k)) 00349 filteredKeys.push_back(k); 00350 00351 allKeys.swap(filteredKeys); 00352 00353 if (allKeys.empty()) { 00354 if (onDone) 00355 onDone(nullptr); 00356 return; 00357 } 00358 00359 struct Aggregator { 00360 00361 Aggregator(int numTotal, 00362 std::function<void (std::exception_ptr)> onDone) 00363 : itl(new Itl()) 00364 { 00365 itl->numTotal = numTotal; 00366 itl->numFinished = 0; 00367 itl->exc = nullptr; 00368 itl->onDone = onDone; 00369 } 00370 00371 struct Itl { 00372 int numTotal; 00373 int numFinished; 00374 std::exception_ptr exc; 00375 std::function<void (std::exception_ptr)> onDone; 00376 }; 00377 00378 std::shared_ptr<Itl> itl; 00379 00380 void operator () (std::exception_ptr exc, ShadowAccount && account) 00381 { 00382 if (exc) 00383 itl->exc = exc; 00384 int nowDone = __sync_add_and_fetch(&itl->numFinished, 1); 00385 if (nowDone == itl->numTotal) { 00386 if (itl->onDone) 00387 itl->onDone(itl->exc); 00388 else { 00389 if (itl->exc) 00390 cerr << "warning: async callback aggregator ate " 00391 << "exception" << endl; 00392 } 00393 } 00394 } 00395 }; 00396 00397 Aggregator aggregator(allKeys.size(), onDone); 00398 00399 //cerr << "syncing " << allKeys.size() << " keys" << endl; 00400 00401 for (auto & key: allKeys) { 00402 // We take its parent since syncAccount assumes nothing was added 00403 if (accounts.isInitialized(key)) 00404 syncAccount(key, aggregator); 00405 } 00406 } 00407 00408 void 00409 SlaveBanker:: 00410 addSpendAccount(const AccountKey & accountKey, 00411 CurrencyPool accountFloat, 00412 std::function<void (std::exception_ptr, ShadowAccount&&)> onDone) 00413 { 00414 bool first = accounts.createAccountAtomic(accountKey); 00415 if(!first) { 00416 // already done 00417 if (onDone) { 00418 auto account = accounts.getAccount(accountKey); 00419 onDone(nullptr, std::move(account)); 00420 } 00421 } 00422 else { 00423 // TODO: record float 00424 //accountFloats[accountKey] = accountFloat; 00425 00426 // Now kick off the initial synchronization step 00427 auto onDone2 00428 = std::bind(&SlaveBanker::onInitializeResult, 00429 this, 00430 accountKey, 00431 onDone, 00432 std::placeholders::_1, 00433 std::placeholders::_2); 00434 00435 cerr << "********* calling addSpendAccount for " << accountKey 00436 << " for SlaveBanker " << accountSuffix << endl; 00437 00438 push(makeRestResponseJsonDecoder<Account>("addSpendAccount", onDone2), 00439 "POST", 00440 "/v1/accounts", 00441 { { "accountName", getShadowAccountStr(accountKey) }, 00442 { "accountType", "spend" } }, 00443 ""); 00444 } 00445 } 00446 00447 void 00448 SlaveBanker:: 00449 reportSpend(uint64_t numTimeoutsExpired) 00450 { 00451 if (numTimeoutsExpired > 1) { 00452 cerr << "warning: slave banker missed " << numTimeoutsExpired 00453 << " timeouts" << endl; 00454 } 00455 00456 if (reportSpendSent != Date()) 00457 cerr << "warning: report spend still in progress" << endl; 00458 00459 //cerr << "started report spend" << endl; 00460 00461 auto onDone = [=] (std::exception_ptr exc) 00462 { 00463 //cerr << "finished report spend" << endl; 00464 reportSpendSent = Date(); 00465 if (exc) 00466 cerr << "reportSpend got exception" << endl; 00467 }; 00468 00469 syncAll(onDone); 00470 } 00471 00472 void 00473 SlaveBanker:: 00474 reauthorizeBudget(uint64_t numTimeoutsExpired) 00475 { 00476 if (numTimeoutsExpired > 1) { 00477 cerr << "warning: slave banker missed " << numTimeoutsExpired 00478 << " timeouts" << endl; 00479 } 00480 00481 //std::unique_lock<Lock> guard(lock); 00482 if (reauthorizeBudgetSent != Date()) { 00483 cerr << "warning: reauthorize budget still in progress" << endl; 00484 } 00485 00486 int numDone = 0; 00487 00488 // For each of our accounts, we report back what has been spent 00489 // and re-up to our desired float 00490 auto onAccount = [&] (const AccountKey & key, 00491 const ShadowAccount & account) 00492 { 00493 RestRequest request; 00494 request.verb = "POST"; 00495 request.resource 00496 = "/v1/accounts/" 00497 + getShadowAccountStr(key) 00498 + "/balance"; 00499 request.params = { { "accountType", "spend" } }; 00500 00501 Json::Value payload = CurrencyPool(USD(0.10)).toJson(); 00502 request.payload = payload.toString(); 00503 00504 //cerr << "sending out request " << request << endl; 00505 ++numDone; 00506 00507 // Finally, send it out 00508 push(request, std::bind(&SlaveBanker::onReauthorizeBudgetMessage, 00509 this, 00510 key, 00511 std::placeholders::_1, 00512 std::placeholders::_2, 00513 std::placeholders::_3)); 00514 }; 00515 00516 accounts.forEachInitializedAccount(onAccount); 00517 00518 if (numDone != 0) 00519 reauthorizeBudgetSent = Date::now(); 00520 } 00521 00522 void 00523 SlaveBanker:: 00524 onReauthorizeBudgetMessage(const AccountKey & accountKey, 00525 std::exception_ptr exc, 00526 int responseCode, 00527 const std::string & payload) 00528 { 00529 //cerr << "finished reauthorize budget" << endl; 00530 00531 if (exc) { 00532 cerr << "reauthorize budget got exception" << payload << endl; 00533 cerr << "accountKey = " << accountKey << endl; 00534 abort(); // for now... 00535 return; 00536 } 00537 00538 Account masterAccount = Account::fromJson(Json::parse(payload)); 00539 accounts.syncFromMaster(accountKey, masterAccount); 00540 reauthorizeBudgetSent = Date(); 00541 } 00542 00543 } // namespace RTBKIT