RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* redis_migration.cc 00002 Wolfgang Sourdeau, 17 December 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Redis migration class from campaign:strategy schema to the new accounts 00006 schema 00007 */ 00008 00009 #include <jml/arch/futex.h> 00010 #include "soa/service/redis.h" 00011 00012 #include "rtbkit/core/banker/account.h" 00013 #include "rtbkit/core/banker/master_banker.h" 00014 00015 #include "redis_old_types.h" 00016 #include "redis_utils.h" 00017 00018 #include "redis_migration.h" 00019 00020 using namespace std; 00021 using namespace Redis; 00022 00023 using namespace Datacratic; 00024 using namespace RTBKIT; 00025 00026 typedef vector<string> ListOfStrings; 00027 00028 /* REDISMIGRATION */ 00029 00030 namespace { 00031 00032 /* load all keys with "campaigns:" */ 00033 ListOfStrings 00034 FetchKeys(AsyncConnection & redis) 00035 { 00036 ListOfStrings keys; 00037 00038 /* KEYS campaigns:* */ 00039 Result keysResult = redis.exec(KEYS(CampaignsPrefix + "*")); 00040 if (!keysResult.ok()) 00041 throw ML::Exception("redis error: " + keysResult.error()); 00042 00043 const Reply & keysReply = keysResult.reply(); 00044 ExcAssert(keysReply.type() == ARRAY); 00045 00046 int keysCount = keysReply.length(); 00047 for (int i = 0; i < keysCount; i++) { 00048 keys.push_back(keysReply[i]); 00049 } 00050 00051 return keys; 00052 } 00053 00054 /* from all keys loaded above, build and load basic Campaign and Strategy 00055 * instances */ 00056 void 00057 LoadCampaignsAndStrategies(AsyncConnection & redis, ListOfStrings & keys, 00058 Campaigns & campaigns, Strategies & strategies, 00059 int acceptedDelta) 00060 { 00061 for (string & fullKey: keys) { 00062 ListOfStrings parts = ML::split(fullKey, ':'); 00063 string campaignKey = parts[1]; 00064 switch (parts.size()) { 00065 case 2: { 00066 Campaign newCampaign(campaignKey); 00067 newCampaign.load(redis); 00068 campaigns.insert({campaignKey, newCampaign}); 00069 break; 00070 } 00071 case 3: { 00072 string & strategyKey = parts[2]; 00073 Strategy newStrategy(strategyKey, campaignKey); 00074 newStrategy.load(redis, acceptedDelta); 00075 strategies.insert({strategyKey, newStrategy}); 00076 break; 00077 } 00078 default: 00079 cerr << "! element with key '" << fullKey << "' ignored" << endl; 00080 } 00081 } 00082 } 00083 00084 /* load full campaigns (including strategies) from redis storage */ 00085 void 00086 LoadCampaigns(shared_ptr<AsyncConnection> & redis, Campaigns & campaigns, 00087 int acceptedDelta) 00088 { 00089 ListOfStrings keys(FetchKeys(*redis)); 00090 00091 if (keys.size() == 0) { 00092 cerr << "No campaigns key found." << endl; 00093 return; 00094 } 00095 00096 Strategies strategies; 00097 int valids(0), total(0); 00098 LoadCampaignsAndStrategies(*redis, keys, campaigns, strategies, 00099 acceptedDelta); 00100 for (auto & pair: strategies) { 00101 pair.second.assignToParent(campaigns); 00102 } 00103 for (auto & pair: campaigns) { 00104 if (pair.second.validateAll(acceptedDelta)) { 00105 valids++; 00106 } 00107 total++; 00108 } 00109 cerr << valids << " valid campaigns found on " << total << endl; 00110 } 00111 00112 void 00113 ConvertCampaignsToAccounts(Campaigns & campaigns, Accounts & accounts) 00114 { 00115 for (auto & cPair: campaigns) { 00116 const Campaign & campaign = cPair.second; 00117 AccountKey parentKey(campaign.key_); 00118 if (campaign.validateAll(0)) { 00119 /* campaign = parent account (budget), 00120 strategy = child account (strategy budget), 00121 strategy-spent = grandchild account (strategy spent) */ 00122 accounts.createBudgetAccount(parentKey); 00123 00124 /* compute initial budget */ 00125 MicroUSD budget(campaign.available_ + campaign.transferred_); 00126 accounts.setBudget(parentKey, CurrencyPool(budget)); 00127 // parentAccount.checkInvariants(); 00128 00129 for (const Strategy & strategy: campaign.strategies) { 00130 AccountKey budgetKey = parentKey.childKey(strategy.key_); 00131 Account budgetAccount = accounts.createBudgetAccount(budgetKey); 00132 MicroUSD childTransferred(strategy.transferred_); 00133 00134 accounts.setBalance(budgetKey, CurrencyPool(childTransferred), AT_NONE); 00135 00136 // cerr << "budgetAccount: " << budgetAccount.toJson().toString() 00137 // << endl; 00138 AccountKey spendKey = budgetKey.childKey("legacyImported"); 00139 Account spendAccount = accounts.createSpendAccount(spendKey); 00140 MicroUSD childSpent(strategy.spent_); 00141 accounts.setBalance(spendKey, childSpent, AT_NONE); 00142 accounts.importSpend(spendKey, childSpent); 00143 00144 // cerr << "spendAccount: " << spendAccount.toJson().toString() 00145 // << endl; 00146 } 00147 // cerr << "parentAccount: " << parentAccount.toJson().toString() 00148 // << endl; 00149 // throw ML::Exception("breakpoint"); 00150 cerr << "- conversion of " << parentKey << " completed" << endl; 00151 } 00152 else { 00153 cerr << "! conversion of " << parentKey << " skipped (invalid)" << endl; 00154 } 00155 } 00156 } 00157 00158 void StoreAccounts(shared_ptr<Redis::AsyncConnection> & redis, 00159 Accounts & accounts) { 00160 RedisBankerPersistence storage(redis); 00161 volatile int done(0); 00162 BankerPersistence::OnSavedCallback callback 00163 = [&](BankerPersistence::PersistenceCallbackStatus status, 00164 const std::string & info) { 00165 switch (status) { 00166 case BankerPersistence::SUCCESS: { 00167 cerr << "- accounts successfully saved" << endl; 00168 break; 00169 } 00170 case BankerPersistence::DATA_INCONSISTENCY: { 00171 Json::Value accountKeys = Json::parse(info); 00172 ExcAssert(accountKeys.type() == Json::arrayValue); 00173 for (Json::Value jsonKey: accountKeys) { 00174 ExcAssert(jsonKey.type() == Json::stringValue); 00175 string keyStr = jsonKey.asString(); 00176 accounts.markAccountOutOfSync(AccountKey(keyStr)); 00177 cerr << "! account '" << keyStr << "' is out of sync" << endl; 00178 } 00179 break; 00180 } 00181 case BankerPersistence::BACKEND_ERROR: { 00182 /* the backend is unavailable */ 00183 cerr << "! a redis error occurred: " + info << endl; 00184 } 00185 default: { 00186 throw ML::Exception("erroneous status code"); 00187 } 00188 } 00189 done = 1; 00190 ML::futex_wake(done); 00191 }; 00192 storage.saveAll(accounts, callback); 00193 while (done == 0) { 00194 ML::futex_wait(done, 0); 00195 } 00196 } 00197 00198 } 00199 00200 void 00201 RedisMigration:: 00202 perform(const Redis::Address & sourceAddress, int acceptedDelta, 00203 const Redis::Address & targetAddress) 00204 { 00205 cerr << "* Loading campaigns and strategies..." << endl; 00206 std::shared_ptr<AsyncConnection> sourceRedis 00207 = make_shared<AsyncConnection>(sourceAddress); 00208 sourceRedis->test(); 00209 Campaigns campaigns; 00210 LoadCampaigns(sourceRedis, campaigns, acceptedDelta); 00211 00212 cerr << "* Converting to accounts..." << endl; 00213 Accounts accounts; 00214 ConvertCampaignsToAccounts(campaigns, accounts); 00215 accounts.checkInvariants(); 00216 00217 std::vector<AccountKey> allKeys = accounts.getAccountKeys(AccountKey(), 1); 00218 00219 #if 0 00220 cerr << "* Dumping account summaries..." << endl; 00221 00222 for (AccountKey & key: allKeys) { 00223 cerr << "key: " << key << ":" << endl; 00224 cerr << accounts.getAccountSummary(key) << endl; 00225 } 00226 #elif 0 00227 /* 00228 key: transat_48_82: 00229 toplevel b:4985552748USD/1M s:4824138480USD/1M i:0 a:4843285829USD/1M j:0 00230 transat_48_82_btpe_241 b:2048712718USD/1M s:2037451854USD/1M i:0 a:2037451854USD/1M j:0 00231 legacyImported b:2037451854USD/1M s:2037451854USD/1M i:0 a:0 j:0 00232 transat_48_82_elastic_price_240 b:803044669USD/1M s:803044585USD/1M i:0 a:803044585USD/1M j:0 00233 legacyImported b:803044585USD/1M s:803044585USD/1M i:0 a:0 j:0 00234 transat_48_82_probe2_263 b:351880852USD/1M s:350587957USD/1M i:0 a:350587957USD/1M j:0 00235 legacyImported b:350587957USD/1M s:350587957USD/1M i:0 a:0 j:0 00236 transat_48_82_probe_239 b:1639647590USD/1M s:1633054084USD/1M i:0 a:1633054084USD/1M j:0 00237 legacyImported b:1633054084USD/1M s:1633054084USD/1M i:0 a:0 j:0 00238 00239 */ 00240 ListOfStrings keys = {"transat_48_82", "transat_48_82_btpe_241", 00241 "legacyImported"}; 00242 AccountKey key; 00243 for (string & keyStr: keys) { 00244 key = key.childKey(keyStr); 00245 const Account & account = accounts.getAccount(key); 00246 cerr << "json dump of " << key.toString() << ":" << endl; 00247 cerr << account.toJson(); 00248 } 00249 #endif 00250 00251 cerr << "* Storing converted accounts to Redis..." << endl; 00252 std::shared_ptr<AsyncConnection> targetRedis 00253 = make_shared<AsyncConnection>(targetAddress); 00254 targetRedis->test(); 00255 StoreAccounts(targetRedis, accounts); 00256 cerr << "* Completed" << endl; 00257 }