RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/migration/redis_rollback.cc
00001 /* redis_rollback.cc
00002    Wolfgang Sourdeau, 7 January 2013
00003    Copyright (c) 2013 Datacratic.  All rights reserved.
00004    
00005    Redis migration rollback class
00006  */
00007 
00008 #include <jml/arch/futex.h>
00009 #include "soa/service/redis.h"
00010 
00011 #include "rtbkit/core/banker/account.h"
00012 #include "rtbkit/core/banker/master_banker.h"
00013 
00014 #include "redis_old_types.h"
00015 
00016 #include "redis_rollback.h"
00017 
00018 using namespace std;
00019 using namespace Redis;
00020 
00021 using namespace Datacratic;
00022 using namespace RTBKIT;
00023 
00024 namespace {
00025 
00026 long long int
00027 CurrencyPoolToLongLong(const CurrencyPool & pool)
00028 {
00029     long long int result(0);
00030 
00031     for (const Amount & amount: pool.currencyAmounts) {
00032         if (amount.currencyCode != CurrencyCode::CC_USD) {
00033             throw ML::Exception("unhandled currency code: "
00034                                 + Amount::getCurrencyStr(amount.currencyCode));
00035         }
00036         result += amount.value;
00037     }
00038 
00039     return result;
00040 }
00041 
00042 void
00043 ConvertAccountsToCampaigns(const Accounts & accounts, Campaigns & campaigns)
00044 {
00045     /* conversion steps:
00046 
00047        campaign:
00048          transferred = summary.allocated
00049          available =   summary.budget - transferred
00050 
00051        strategy:
00052          transferred = summary.budget
00053          spent =       summary.spent
00054          available =   transferred - spent
00055     */
00056 
00057     std::vector<AccountKey> keys = accounts.getAccountKeys();
00058     if (keys.size() > 0) {
00059 #if 0 /* code to produce artificial expenses and see how they are converted
00060          back (requires non-const accounts) */
00061         for (AccountKey & key: accounts.getAccountKeys()) {
00062             if (key.size() == 2) {
00063                 accounts.setBalance(key, accounts.getAvailable(key) + MicroUSD(1234), AT_NONE);
00064                 AccountKey childKey = key.childKey("legacyImported");
00065                 cerr << "child key: " << childKey << endl;
00066                 if ((accounts.getAvailable(key) - MicroUSD(123)).isNonNegative()) {
00067                     accounts.setBalance(childKey, MicroUSD(123), AT_NONE);
00068                     accounts.addSpend(childKey, MicroUSD(12));
00069                 }
00070             }
00071         }
00072 #endif
00073 
00074         cerr << keys.size() << " accounts found (including subaccounts)" << endl;
00075         for (AccountKey & key: accounts.getAccountKeys()) {
00076             const string & campaignKey = key[0];
00077             const AccountSummary & summary = accounts.getAccountSummary(key);
00078             Campaign & campaign = campaigns[campaignKey];
00079             switch (key.size()) {
00080             case 1: { /* campaign */
00081                 campaign.key_ = campaignKey;
00082                 throw ML::Exception("this code is now obsolete due to missing"
00083                                     " members in AccountSummary");
00084 #if 0
00085                 campaign.transferred_
00086                     = CurrencyPoolToLongLong(summary.allocated);
00087                 campaign.available_
00088                     = CurrencyPoolToLongLong(summary.budget) - campaign.transferred_;
00089 #endif
00090                 cerr << "- campaign 'campaigns:" + campaignKey + "' recreated"
00091                      << endl;
00092                 break;
00093             }
00094             case 2: { /* strategy (budget) */
00095                 const string & strategyKey = key[1];
00096                 Strategy strategy(strategyKey, campaignKey);
00097                 strategy.valid_ = true; /* we trust the consistency of
00098                                            accounts */
00099                 strategy.transferred_ = CurrencyPoolToLongLong(summary.budget);
00100                 strategy.spent_ = CurrencyPoolToLongLong(summary.spent);
00101                 strategy.available_ = strategy.transferred_ - strategy.spent_;
00102                 campaign.strategies.push_back(strategy);
00103                 cerr << ("- strategy 'campaigns:"
00104                          + campaignKey + ":" + strategyKey
00105                          + "' recreated")
00106                      << endl;
00107                 break;
00108             }
00109             default:
00110                 break;
00111             }
00112         }
00113     }
00114     else {
00115         cerr << "No account to convert." << endl;
00116     }
00117 }
00118 
00119 void
00120 StoreCampaigns(Redis::AsyncConnection & redis,
00121                Campaigns & campaigns)
00122 {
00123     for (auto & it: campaigns) {
00124         const Campaign & campaign = it.second;
00125         if (campaign.validateAll(0)) {
00126             campaign.save(redis);
00127             for (const Strategy & strategy: campaign.strategies) {
00128                 strategy.save(redis);
00129             }
00130         }
00131     }
00132 }
00133 
00134 }
00135 
00136 namespace RTBKIT {
00137 
00138 void
00139 RedisRollback::
00140 perform(const Redis::Address & sourceAddress,
00141         const Redis::Address & targetAddress)
00142 {
00143     auto sourceRedis = make_shared<AsyncConnection>(sourceAddress);
00144     sourceRedis->test();
00145 
00146     std::shared_ptr<Accounts> accounts;
00147     int done(false);
00148     auto callback = [&] (std::shared_ptr<Accounts> newAccounts,
00149                          BankerPersistence::PersistenceCallbackStatus status,
00150                          const std::string & info) {
00151         if (status != BankerPersistence::PersistenceCallbackStatus::SUCCESS) {
00152             throw ML::Exception("error during account loading: " + info);
00153         }
00154         accounts = newAccounts;
00155         accounts->checkInvariants();
00156         done = true;
00157         ML::futex_wake(done);
00158     };
00159     cerr << "* Loading accounts..." << endl;
00160     RedisBankerPersistence storage(sourceRedis);
00161     storage.loadAll("", callback);
00162     while (!done) {
00163         ML::futex_wait(done, false);
00164     }
00165 
00166     cerr << "* Converting to campaigns/strategies..." << endl;
00167     Campaigns campaigns;
00168     ConvertAccountsToCampaigns(*accounts, campaigns);
00169 
00170     cerr << "* Storing converted accounts to Redis..." << endl;
00171     auto targetRedis = make_shared<AsyncConnection>(targetAddress);
00172     targetRedis->test();
00173     StoreCampaigns(*targetRedis, campaigns);
00174     cerr << "* Completed" << endl;
00175 }
00176 
00177 } // namespace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator