RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/testing/redis_persistence_test.cc
00001 /* master_banker_test.cc
00002    Wolfgang Sourdeau, 13 December 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004    
00005    Unit tests for RedisBankerPersistence class
00006 */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
00011 #include <boost/test/unit_test.hpp>
00012 #include <jml/arch/futex.h>
00013 #include "soa/service/redis.h"
00014 #include "soa/service/testing/redis_temporary_server.h"
00015 
00016 #include "rtbkit/core/banker/account.h"
00017 #include "rtbkit/core/banker/master_banker.h"
00018 
00019 using namespace std;
00020 
00021 using namespace Datacratic;
00022 using namespace RTBKIT;
00023 using namespace Redis;
00024 
00025 BOOST_AUTO_TEST_CASE( test_redis_persistence_loadall )
00026 {
00027     RedisTemporaryServer redis;
00028     std::shared_ptr<AsyncConnection> connection
00029         = std::make_shared<AsyncConnection>(redis);
00030     RedisBankerPersistence storage(connection);
00031     int done(false);
00032 
00033     /* test the behaviour of rp with an empty database */
00034     auto OnLoaded_EmptyRedis = [&] (std::shared_ptr<Accounts> accounts,
00035                                     BankerPersistence::PersistenceCallbackStatus status,
00036                                     const string & info) {
00037         BOOST_CHECK_EQUAL(status, BankerPersistence::SUCCESS);
00038         BOOST_CHECK_EQUAL(info, "");
00039 
00040         /* "banker:accounts" does not exist */
00041         vector<AccountKey> acctKeys = accounts->getAccountKeys();
00042         BOOST_CHECK_EQUAL(acctKeys.size(), 0);
00043         done = true;
00044         ML::futex_wake(done);
00045     };
00046     storage.loadAll("", OnLoaded_EmptyRedis);
00047     while (!done) {
00048         ML::futex_wait(done, false);
00049     }
00050 
00051     /* wrong value type for banker:accounts */
00052     connection->exec(SET("banker:accounts", "myvalue"));
00053     done = false;
00054     auto OnLoaded_BadBankerAccounts1
00055         = [&] (std::shared_ptr<Accounts> accounts,
00056                BankerPersistence::PersistenceCallbackStatus status,
00057                const string & info) {
00058         /* this could be a DATA_INCONSISTENCY error, but it is handled
00059            directly by the backend */
00060         BOOST_CHECK_EQUAL(status, BankerPersistence::BACKEND_ERROR);
00061         BOOST_CHECK(info.length() != 0); /* we ignore the actual message */
00062         done = true;
00063         ML::futex_wake(done);
00064     };
00065     storage.loadAll("", OnLoaded_BadBankerAccounts1);
00066     while (!done) {
00067         ML::futex_wait(done, false);
00068     }
00069     connection->exec(DEL("banker:accounts"));
00070 
00071     /* nil/void entries in banker:accounts */
00072     Command sadd(SADD("banker:accounts"));
00073     sadd.addArg("account1");
00074     connection->exec(sadd);
00075     done = false;
00076     auto OnLoaded_BadBankerAccounts2
00077         = [&] (std::shared_ptr<Accounts> accounts,
00078                BankerPersistence::PersistenceCallbackStatus status,
00079                const string & info) {
00080         BOOST_CHECK_EQUAL(status, BankerPersistence::DATA_INCONSISTENCY);
00081         BOOST_CHECK(info.length() != 0); /* we ignore the actual message */
00082         // cerr << "error = " + info << endl;
00083         done = true;
00084         ML::futex_wake(done);
00085     };
00086     storage.loadAll("", OnLoaded_BadBankerAccounts2);
00087     while (!done) {
00088         ML::futex_wait(done, false);
00089     }
00090 
00091     /* valid account1 */
00092     Account account1;
00093     account1.type = AT_BUDGET;
00094     account1.budgetIncreases = Amount(MicroUSD(123456));
00095     account1.budgetDecreases = Amount(MicroUSD(0));
00096     account1.recycledIn = Amount(MicroUSD(987654));
00097     account1.allocatedIn = Amount(MicroUSD(4567));
00098     account1.commitmentsRetired = Amount(MicroUSD(898989));
00099     account1.adjustmentsIn = Amount(MicroUSD(17171717));
00100     account1.recycledOut = Amount(MicroUSD(64949494));
00101     account1.allocatedOut = Amount(MicroUSD(8778777));
00102     account1.commitmentsMade = Amount(MicroUSD(10101010));
00103     account1.adjustmentsOut = Amount(MicroUSD(9999999));
00104     account1.spent = Amount(MicroUSD(10111213));
00105     account1.balance = ((account1.budgetIncreases + account1.recycledIn
00106                          + account1.allocatedIn + account1.commitmentsRetired
00107                          + account1.adjustmentsIn)
00108                         - (account1.spent + account1.recycledOut
00109                            + account1.allocatedOut + account1.commitmentsMade
00110                            + account1.adjustmentsOut));
00111 
00112     Json::Value account1Json(account1.toJson());
00113     connection->exec(SET("banker-account1", account1Json.toString()));
00114     connection->exec(SET("banker-account2", account1Json.toString()));
00115     done = false;
00116     auto OnLoaded_ValidAccount
00117         = [&] (std::shared_ptr<Accounts> accounts,
00118                BankerPersistence::PersistenceCallbackStatus status,
00119                const string & info) {
00120         BOOST_CHECK_EQUAL(status, BankerPersistence::SUCCESS);
00121         /* error message should be empty */
00122         BOOST_CHECK(info.length() == 0);
00123 
00124         /* only returned account = "account1" */
00125         std::vector<AccountKey> accountKeys = accounts->getAccountKeys();
00126         BOOST_CHECK_EQUAL(accountKeys.size(), 1);
00127         BOOST_CHECK_EQUAL(accountKeys[0].toString(), "account1");
00128 
00129         Account storedAccount = accounts->getAccount(accountKeys[0]);
00130         Json::Value storedAccountJson = storedAccount.toJson();
00131         BOOST_CHECK_EQUAL(account1Json, storedAccountJson);
00132         done = true;
00133         ML::futex_wake(done);
00134     };
00135     storage.loadAll("", OnLoaded_ValidAccount);
00136     while (!done) {
00137         ML::futex_wait(done, false);
00138     }
00139 }
00140 
00141 BOOST_AUTO_TEST_CASE( test_redis_persistence_saveall )
00142 {
00143     RedisTemporaryServer redis;
00144     std::shared_ptr<AsyncConnection> connection
00145         = std::make_shared<AsyncConnection>(redis);
00146     RedisBankerPersistence storage(connection);
00147     int done(false);
00148 
00149     /* generic callback for all subsequent saveAll invocations */
00150     BankerPersistence::PersistenceCallbackStatus lastStatus;
00151     string lastInfo;
00152     auto OnSavedCallback
00153         = [&] (BankerPersistence::PersistenceCallbackStatus status,
00154                const string & info) {
00155         lastStatus = status;
00156         lastInfo = info;
00157         done = true;
00158         ML::futex_wake(done);
00159     };
00160 
00161     /* basic account setup */
00162     Accounts accounts;
00163     AccountKey parentKey("parent"), childKey("parent:child");
00164     accounts.createAccount(parentKey, AT_BUDGET);
00165     accounts.createAccount(childKey, AT_SPEND);
00166     accounts.setBudget(parentKey, MicroUSD(123456));
00167     accounts.setBalance(childKey, MicroUSD(1234), AT_NONE);
00168 
00169     /* 1. we save an account that does not exist yet in the storage */
00170     storage.saveAll(accounts, OnSavedCallback);
00171     while (!done) {
00172         ML::futex_wait(done, false);
00173     }
00174 
00175     /* this operation should succeed */
00176     BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::SUCCESS);
00177     BOOST_CHECK(lastInfo.length() == 0);
00178 
00179     /* the new accounts should have been registered in the "banker:accounts"
00180        set */
00181     Redis::Result result = connection->exec(SMEMBERS("banker:accounts"), 5);
00182     BOOST_CHECK(result.ok());
00183     const Reply & keysReply = result.reply();
00184     BOOST_CHECK_EQUAL(keysReply.type(), ARRAY);
00185     BOOST_CHECK_EQUAL(keysReply.length(), 2);
00186     BOOST_CHECK((keysReply[0].asString() == "parent"
00187                  && keysReply[1].asString() == "parent:child")
00188                 || (keysReply[0].asString() == "parent:child"
00189                     && keysReply[1].asString() == "parent"));
00190 
00191     /* make sure that the correct data has been stored for "parent" */
00192     result = connection->exec(GET("banker-parent"), 5);
00193     BOOST_CHECK(result.ok());
00194     const Reply & parentReply = result.reply();
00195     BOOST_CHECK_EQUAL(parentReply.type(), STRING);
00196     Json::Value accountJson(accounts.getAccount(parentKey).toJson());
00197     accountJson["spent-tracking"] = Json::Value(Json::objectValue);
00198     Json::Value storageJson = Json::parse(parentReply.asString());
00199     BOOST_CHECK_EQUAL(accountJson, storageJson);
00200 
00201     /* make sure that the correct data has been stored for "parent:child" */
00202     result = connection->exec(GET("banker-parent:child"), 5);
00203     BOOST_CHECK(result.ok());
00204     const Reply & childReply = result.reply();
00205     BOOST_CHECK_EQUAL(childReply.type(), STRING);
00206     accountJson = accounts.getAccount(childKey).toJson();
00207     storageJson = Json::parse(childReply.asString());
00208     BOOST_CHECK_EQUAL(accountJson, storageJson);
00209 
00210     /* 2. we update an existing account and reperform the same tests */
00211     accounts.importSpend(childKey, MicroUSD(123));
00212     done = false;
00213     storage.saveAll(accounts, OnSavedCallback);
00214     while (!done) {
00215         ML::futex_wait(done, false);
00216     }
00217 
00218     /* this operation should succeed */
00219     BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::SUCCESS);
00220     BOOST_CHECK(lastInfo.length() == 0);
00221 
00222     /* the same accounts should be registered in the "banker:accounts" set */
00223     result = connection->exec(SMEMBERS("banker:accounts"), 5);
00224     BOOST_CHECK(result.ok());
00225     const Reply & retryKeysReply = result.reply();
00226     BOOST_CHECK_EQUAL(retryKeysReply.type(), ARRAY);
00227     BOOST_CHECK_EQUAL(retryKeysReply.length(), 2);
00228     BOOST_CHECK((retryKeysReply[0].asString() == "parent"
00229                  && retryKeysReply[1].asString() == "parent:child")
00230                 || (retryKeysReply[0].asString() == "parent:child"
00231                     && retryKeysReply[1].asString() == "parent"));
00232 
00233     /* make sure "parent:child" has been properly updated */
00234     result = connection->exec(GET("banker-parent:child"), 5);
00235     BOOST_CHECK(result.ok());
00236     const Reply & updatedChildReply = result.reply();
00237     BOOST_CHECK_EQUAL(updatedChildReply.type(), STRING);
00238     accountJson = accounts.getAccount(childKey).toJson();
00239     storageJson = Json::parse(updatedChildReply.asString());
00240     BOOST_CHECK_EQUAL(accountJson, storageJson);
00241 
00242     /* 3. we save another instance of the save accounts, like when two bankers
00243      * are running concurrently, and test the error reporting */
00244 
00245     /* we save the "future" copy */
00246     Accounts accounts2 = accounts;
00247     accounts2.importSpend(childKey, MicroUSD(123));
00248     done = false;
00249     accountJson = accounts.getAccount(childKey).toJson();
00250     storage.saveAll(accounts2, OnSavedCallback);
00251     while (!done) {
00252         ML::futex_wait(done, false);
00253     }
00254     /* this operation should succeed */
00255     BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::SUCCESS);
00256     BOOST_CHECK(lastInfo.length() == 0);
00257     /* make sure "parent:child" has been properly updated with the value from
00258      * accounts2 */
00259     result = connection->exec(GET("banker-parent:child"), 5);
00260     BOOST_CHECK(result.ok());
00261     const Reply & updatedChild2Reply = result.reply();
00262     BOOST_CHECK_EQUAL(updatedChild2Reply.type(), STRING);
00263     accountJson = accounts2.getAccount(childKey).toJson();
00264     storageJson = Json::parse(updatedChild2Reply.asString());
00265     BOOST_CHECK_EQUAL(accountJson, storageJson);
00266 
00267     /* we attempt to save a previous state */
00268     done = false;
00269     storage.saveAll(accounts, OnSavedCallback);
00270     while (!done) {
00271         ML::futex_wait(done, false);
00272     }
00273     /* this operation should fail */
00274     BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::DATA_INCONSISTENCY);
00275     /* account "parent:child" is reported as out of sync */
00276     BOOST_CHECK_EQUAL(lastInfo, "[\"parent:child\"]");
00277 
00278     /* 4. we ensure that accounts marked as out of sync are silently
00279      * ignored */
00280     Json::Value expectedStorageJson = storageJson;
00281 
00282     accounts2.importSpend(childKey, MicroUSD(12));
00283     accounts2.markAccountOutOfSync(childKey);
00284 
00285     done = false;
00286     storage.saveAll(accounts2, OnSavedCallback);
00287     while (!done) {
00288         ML::futex_wait(done, false);
00289     }
00290     /* this operation should succeed */
00291     BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::SUCCESS);
00292     BOOST_CHECK(lastInfo.length() == 0);
00293 
00294     result = connection->exec(GET("banker-parent:child"), 5);
00295     BOOST_CHECK(result.ok());
00296     const Reply &outOfSyncChildReply = result.reply();
00297     BOOST_CHECK_EQUAL(outOfSyncChildReply.type(), STRING);
00298     storageJson = Json::parse(outOfSyncChildReply.asString());
00299 
00300     /* the last expense of 12 mUSD must not be present in the stored account */
00301     BOOST_CHECK_EQUAL(expectedStorageJson, storageJson);
00302 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator