![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* redis_rollback.cc 00002 Wolfgang Sourdeau, 7 January 2013 00003 Copyright (c) 2013 Datacratic. All rights reserved. 00004 00005 Redis migration rollback class 00006 */ 00007 00008 #include <jml/arch/futex.h> 00009 #include "soa/service/redis.h" 00010 00011 #include "rtbkit/core/banker/account.h" 00012 #include "rtbkit/core/banker/master_banker.h" 00013 00014 #include "redis_old_types.h" 00015 00016 #include "redis_rollback.h" 00017 00018 using namespace std; 00019 using namespace Redis; 00020 00021 using namespace Datacratic; 00022 using namespace RTBKIT; 00023 00024 namespace { 00025 00026 long long int 00027 CurrencyPoolToLongLong(const CurrencyPool & pool) 00028 { 00029 long long int result(0); 00030 00031 for (const Amount & amount: pool.currencyAmounts) { 00032 if (amount.currencyCode != CurrencyCode::CC_USD) { 00033 throw ML::Exception("unhandled currency code: " 00034 + Amount::getCurrencyStr(amount.currencyCode)); 00035 } 00036 result += amount.value; 00037 } 00038 00039 return result; 00040 } 00041 00042 void 00043 ConvertAccountsToCampaigns(const Accounts & accounts, Campaigns & campaigns) 00044 { 00045 /* conversion steps: 00046 00047 campaign: 00048 transferred = summary.allocated 00049 available = summary.budget - transferred 00050 00051 strategy: 00052 transferred = summary.budget 00053 spent = summary.spent 00054 available = transferred - spent 00055 */ 00056 00057 std::vector<AccountKey> keys = accounts.getAccountKeys(); 00058 if (keys.size() > 0) { 00059 #if 0 /* code to produce artificial expenses and see how they are converted 00060 back (requires non-const accounts) */ 00061 for (AccountKey & key: accounts.getAccountKeys()) { 00062 if (key.size() == 2) { 00063 accounts.setBalance(key, accounts.getAvailable(key) + MicroUSD(1234), AT_NONE); 00064 AccountKey childKey = key.childKey("legacyImported"); 00065 cerr << "child key: " << childKey << endl; 00066 if ((accounts.getAvailable(key) - MicroUSD(123)).isNonNegative()) { 00067 accounts.setBalance(childKey, MicroUSD(123), AT_NONE); 00068 accounts.addSpend(childKey, MicroUSD(12)); 00069 } 00070 } 00071 } 00072 #endif 00073 00074 cerr << keys.size() << " accounts found (including subaccounts)" << endl; 00075 for (AccountKey & key: accounts.getAccountKeys()) { 00076 const string & campaignKey = key[0]; 00077 const AccountSummary & summary = accounts.getAccountSummary(key); 00078 Campaign & campaign = campaigns[campaignKey]; 00079 switch (key.size()) { 00080 case 1: { /* campaign */ 00081 campaign.key_ = campaignKey; 00082 throw ML::Exception("this code is now obsolete due to missing" 00083 " members in AccountSummary"); 00084 #if 0 00085 campaign.transferred_ 00086 = CurrencyPoolToLongLong(summary.allocated); 00087 campaign.available_ 00088 = CurrencyPoolToLongLong(summary.budget) - campaign.transferred_; 00089 #endif 00090 cerr << "- campaign 'campaigns:" + campaignKey + "' recreated" 00091 << endl; 00092 break; 00093 } 00094 case 2: { /* strategy (budget) */ 00095 const string & strategyKey = key[1]; 00096 Strategy strategy(strategyKey, campaignKey); 00097 strategy.valid_ = true; /* we trust the consistency of 00098 accounts */ 00099 strategy.transferred_ = CurrencyPoolToLongLong(summary.budget); 00100 strategy.spent_ = CurrencyPoolToLongLong(summary.spent); 00101 strategy.available_ = strategy.transferred_ - strategy.spent_; 00102 campaign.strategies.push_back(strategy); 00103 cerr << ("- strategy 'campaigns:" 00104 + campaignKey + ":" + strategyKey 00105 + "' recreated") 00106 << endl; 00107 break; 00108 } 00109 default: 00110 break; 00111 } 00112 } 00113 } 00114 else { 00115 cerr << "No account to convert." << endl; 00116 } 00117 } 00118 00119 void 00120 StoreCampaigns(Redis::AsyncConnection & redis, 00121 Campaigns & campaigns) 00122 { 00123 for (auto & it: campaigns) { 00124 const Campaign & campaign = it.second; 00125 if (campaign.validateAll(0)) { 00126 campaign.save(redis); 00127 for (const Strategy & strategy: campaign.strategies) { 00128 strategy.save(redis); 00129 } 00130 } 00131 } 00132 } 00133 00134 } 00135 00136 namespace RTBKIT { 00137 00138 void 00139 RedisRollback:: 00140 perform(const Redis::Address & sourceAddress, 00141 const Redis::Address & targetAddress) 00142 { 00143 auto sourceRedis = make_shared<AsyncConnection>(sourceAddress); 00144 sourceRedis->test(); 00145 00146 std::shared_ptr<Accounts> accounts; 00147 int done(false); 00148 auto callback = [&] (std::shared_ptr<Accounts> newAccounts, 00149 BankerPersistence::PersistenceCallbackStatus status, 00150 const std::string & info) { 00151 if (status != BankerPersistence::PersistenceCallbackStatus::SUCCESS) { 00152 throw ML::Exception("error during account loading: " + info); 00153 } 00154 accounts = newAccounts; 00155 accounts->checkInvariants(); 00156 done = true; 00157 ML::futex_wake(done); 00158 }; 00159 cerr << "* Loading accounts..." << endl; 00160 RedisBankerPersistence storage(sourceRedis); 00161 storage.loadAll("", callback); 00162 while (!done) { 00163 ML::futex_wait(done, false); 00164 } 00165 00166 cerr << "* Converting to campaigns/strategies..." << endl; 00167 Campaigns campaigns; 00168 ConvertAccountsToCampaigns(*accounts, campaigns); 00169 00170 cerr << "* Storing converted accounts to Redis..." << endl; 00171 auto targetRedis = make_shared<AsyncConnection>(targetAddress); 00172 targetRedis->test(); 00173 StoreCampaigns(*targetRedis, campaigns); 00174 cerr << "* Completed" << endl; 00175 } 00176 00177 } // namespace RTBKIT