RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/migration/redis_migration.cc
00001 /* redis_migration.cc
00002    Wolfgang Sourdeau, 17 December 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004    
00005    Redis migration class from campaign:strategy schema to the new accounts
00006    schema
00007  */
00008 
00009 #include <jml/arch/futex.h>
00010 #include "soa/service/redis.h"
00011 
00012 #include "rtbkit/core/banker/account.h"
00013 #include "rtbkit/core/banker/master_banker.h"
00014 
00015 #include "redis_old_types.h"
00016 #include "redis_utils.h"
00017 
00018 #include "redis_migration.h"
00019 
00020 using namespace std;
00021 using namespace Redis;
00022 
00023 using namespace Datacratic;
00024 using namespace RTBKIT;
00025 
00026 typedef vector<string> ListOfStrings;
00027 
00028 /* REDISMIGRATION */
00029 
00030 namespace {
00031 
00032 /* load all keys with "campaigns:" */
00033 ListOfStrings
00034 FetchKeys(AsyncConnection & redis)
00035 {
00036     ListOfStrings keys;
00037 
00038     /* KEYS campaigns:* */
00039     Result keysResult = redis.exec(KEYS(CampaignsPrefix + "*"));
00040     if (!keysResult.ok())
00041         throw ML::Exception("redis error: " + keysResult.error());
00042 
00043     const Reply & keysReply = keysResult.reply();
00044     ExcAssert(keysReply.type() == ARRAY);
00045 
00046     int keysCount = keysReply.length();
00047     for (int i = 0; i < keysCount; i++) {
00048         keys.push_back(keysReply[i]);
00049     }
00050 
00051     return keys;
00052 }
00053 
00054 /* from all keys loaded above, build and load basic Campaign and Strategy
00055  * instances */
00056 void
00057 LoadCampaignsAndStrategies(AsyncConnection & redis, ListOfStrings & keys,
00058                            Campaigns & campaigns, Strategies & strategies,
00059                            int acceptedDelta)
00060 {
00061     for (string & fullKey: keys) {
00062         ListOfStrings parts = ML::split(fullKey, ':');
00063         string campaignKey = parts[1];
00064         switch (parts.size()) {
00065         case 2: {
00066             Campaign newCampaign(campaignKey);
00067             newCampaign.load(redis);
00068             campaigns.insert({campaignKey, newCampaign});
00069             break;
00070         }
00071         case 3: {
00072             string & strategyKey = parts[2];
00073             Strategy newStrategy(strategyKey, campaignKey);
00074             newStrategy.load(redis, acceptedDelta);
00075             strategies.insert({strategyKey, newStrategy});
00076             break;
00077         }
00078         default:
00079             cerr << "! element with key '" << fullKey << "' ignored" << endl;
00080         }
00081     }
00082 }
00083 
00084 /* load full campaigns (including strategies) from redis storage */
00085 void
00086 LoadCampaigns(shared_ptr<AsyncConnection> & redis, Campaigns & campaigns,
00087               int acceptedDelta)
00088 {
00089     ListOfStrings keys(FetchKeys(*redis));
00090 
00091     if (keys.size() == 0) {
00092         cerr << "No campaigns key found." << endl;
00093         return;
00094     }
00095 
00096     Strategies strategies;
00097     int valids(0), total(0);
00098     LoadCampaignsAndStrategies(*redis, keys, campaigns, strategies,
00099                                acceptedDelta);
00100     for (auto & pair: strategies) {
00101         pair.second.assignToParent(campaigns);
00102     } 
00103     for (auto & pair: campaigns) {
00104         if (pair.second.validateAll(acceptedDelta)) {
00105             valids++;
00106         }
00107         total++;
00108     }
00109     cerr << valids << " valid campaigns found on " << total << endl;
00110 }
00111 
00112 void
00113 ConvertCampaignsToAccounts(Campaigns & campaigns, Accounts & accounts)
00114 {
00115     for (auto & cPair: campaigns) {
00116         const Campaign & campaign = cPair.second;
00117         AccountKey parentKey(campaign.key_);
00118         if (campaign.validateAll(0)) {
00119             /* campaign = parent account (budget),
00120                strategy = child account (strategy budget),
00121                strategy-spent = grandchild account (strategy spent) */
00122             accounts.createBudgetAccount(parentKey);
00123 
00124             /* compute initial budget */
00125             MicroUSD budget(campaign.available_ + campaign.transferred_);
00126             accounts.setBudget(parentKey, CurrencyPool(budget));
00127             // parentAccount.checkInvariants();
00128             
00129             for (const Strategy & strategy: campaign.strategies) {
00130                 AccountKey budgetKey = parentKey.childKey(strategy.key_);
00131                 Account budgetAccount = accounts.createBudgetAccount(budgetKey);
00132                 MicroUSD childTransferred(strategy.transferred_);
00133 
00134                 accounts.setBalance(budgetKey, CurrencyPool(childTransferred), AT_NONE);
00135 
00136                 // cerr << "budgetAccount: " << budgetAccount.toJson().toString()
00137                 //      << endl;
00138                 AccountKey spendKey = budgetKey.childKey("legacyImported");
00139                 Account spendAccount = accounts.createSpendAccount(spendKey);
00140                 MicroUSD childSpent(strategy.spent_);
00141                 accounts.setBalance(spendKey, childSpent, AT_NONE);
00142                 accounts.importSpend(spendKey, childSpent);
00143 
00144                 // cerr << "spendAccount: " << spendAccount.toJson().toString()
00145                 //      << endl;
00146             }
00147             // cerr << "parentAccount: " << parentAccount.toJson().toString()
00148             //      << endl;
00149             // throw ML::Exception("breakpoint");
00150             cerr << "- conversion of " << parentKey << " completed" << endl;
00151         }
00152         else {
00153             cerr << "! conversion of " << parentKey << " skipped (invalid)" << endl;
00154         }
00155     }
00156 }
00157 
00158 void StoreAccounts(shared_ptr<Redis::AsyncConnection> & redis,
00159                    Accounts & accounts) {
00160     RedisBankerPersistence storage(redis);
00161     volatile int done(0);
00162     BankerPersistence::OnSavedCallback callback
00163         = [&](BankerPersistence::PersistenceCallbackStatus status,
00164               const std::string & info) {
00165         switch (status) {
00166         case BankerPersistence::SUCCESS: {
00167             cerr << "- accounts successfully saved" << endl;
00168             break;
00169         }
00170         case BankerPersistence::DATA_INCONSISTENCY: {
00171             Json::Value accountKeys = Json::parse(info);
00172             ExcAssert(accountKeys.type() == Json::arrayValue);
00173             for (Json::Value jsonKey: accountKeys) {
00174                 ExcAssert(jsonKey.type() == Json::stringValue);
00175                 string keyStr = jsonKey.asString();
00176                 accounts.markAccountOutOfSync(AccountKey(keyStr));
00177                 cerr << "! account '" << keyStr << "' is out of sync" << endl;
00178             }
00179             break;
00180         }
00181         case BankerPersistence::BACKEND_ERROR: {
00182             /* the backend is unavailable */
00183             cerr << "! a redis error occurred: " + info << endl;
00184         }
00185         default: {
00186             throw ML::Exception("erroneous status code");
00187         }
00188         }
00189         done = 1;
00190         ML::futex_wake(done);
00191     };
00192     storage.saveAll(accounts, callback);
00193     while (done == 0) {
00194         ML::futex_wait(done, 0);
00195     }
00196 }
00197 
00198 }
00199 
00200 void
00201 RedisMigration::
00202 perform(const Redis::Address & sourceAddress, int acceptedDelta,
00203         const Redis::Address & targetAddress)
00204 {
00205     cerr << "* Loading campaigns and strategies..." << endl;
00206     std::shared_ptr<AsyncConnection> sourceRedis
00207         = make_shared<AsyncConnection>(sourceAddress);
00208     sourceRedis->test();
00209     Campaigns campaigns;
00210     LoadCampaigns(sourceRedis, campaigns, acceptedDelta);
00211 
00212     cerr << "* Converting to accounts..." << endl;
00213     Accounts accounts;
00214     ConvertCampaignsToAccounts(campaigns, accounts);
00215     accounts.checkInvariants();
00216 
00217     std::vector<AccountKey> allKeys = accounts.getAccountKeys(AccountKey(), 1);
00218 
00219 #if 0
00220     cerr << "* Dumping account summaries..." << endl;
00221 
00222     for (AccountKey & key: allKeys) {
00223         cerr << "key: " << key << ":" << endl;
00224         cerr << accounts.getAccountSummary(key) << endl;
00225     }
00226 #elif 0
00227 /*
00228 key: transat_48_82:
00229 toplevel b:4985552748USD/1M s:4824138480USD/1M i:0 a:4843285829USD/1M j:0
00230   transat_48_82_btpe_241 b:2048712718USD/1M s:2037451854USD/1M i:0 a:2037451854USD/1M j:0
00231     legacyImported b:2037451854USD/1M s:2037451854USD/1M i:0 a:0 j:0
00232   transat_48_82_elastic_price_240 b:803044669USD/1M s:803044585USD/1M i:0 a:803044585USD/1M j:0
00233     legacyImported b:803044585USD/1M s:803044585USD/1M i:0 a:0 j:0
00234   transat_48_82_probe2_263 b:351880852USD/1M s:350587957USD/1M i:0 a:350587957USD/1M j:0
00235     legacyImported b:350587957USD/1M s:350587957USD/1M i:0 a:0 j:0
00236   transat_48_82_probe_239 b:1639647590USD/1M s:1633054084USD/1M i:0 a:1633054084USD/1M j:0
00237     legacyImported b:1633054084USD/1M s:1633054084USD/1M i:0 a:0 j:0
00238 
00239  */
00240     ListOfStrings keys = {"transat_48_82", "transat_48_82_btpe_241",
00241                           "legacyImported"};
00242     AccountKey key;
00243     for (string & keyStr: keys) {
00244         key = key.childKey(keyStr);
00245         const Account & account = accounts.getAccount(key);
00246         cerr << "json dump of " << key.toString() << ":" << endl;
00247         cerr << account.toJson();
00248     }
00249 #endif
00250 
00251     cerr << "* Storing converted accounts to Redis..." << endl;
00252     std::shared_ptr<AsyncConnection> targetRedis
00253         = make_shared<AsyncConnection>(targetAddress);
00254     targetRedis->test();
00255     StoreAccounts(targetRedis, accounts);
00256     cerr << "* Completed" << endl;
00257 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator