RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* master_banker_test.cc 00002 Wolfgang Sourdeau, 13 December 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Unit tests for RedisBankerPersistence class 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <boost/test/unit_test.hpp> 00012 #include <jml/arch/futex.h> 00013 #include "soa/service/redis.h" 00014 #include "soa/service/testing/redis_temporary_server.h" 00015 00016 #include "rtbkit/core/banker/account.h" 00017 #include "rtbkit/core/banker/master_banker.h" 00018 00019 using namespace std; 00020 00021 using namespace Datacratic; 00022 using namespace RTBKIT; 00023 using namespace Redis; 00024 00025 BOOST_AUTO_TEST_CASE( test_redis_persistence_loadall ) 00026 { 00027 RedisTemporaryServer redis; 00028 std::shared_ptr<AsyncConnection> connection 00029 = std::make_shared<AsyncConnection>(redis); 00030 RedisBankerPersistence storage(connection); 00031 int done(false); 00032 00033 /* test the behaviour of rp with an empty database */ 00034 auto OnLoaded_EmptyRedis = [&] (std::shared_ptr<Accounts> accounts, 00035 BankerPersistence::PersistenceCallbackStatus status, 00036 const string & info) { 00037 BOOST_CHECK_EQUAL(status, BankerPersistence::SUCCESS); 00038 BOOST_CHECK_EQUAL(info, ""); 00039 00040 /* "banker:accounts" does not exist */ 00041 vector<AccountKey> acctKeys = accounts->getAccountKeys(); 00042 BOOST_CHECK_EQUAL(acctKeys.size(), 0); 00043 done = true; 00044 ML::futex_wake(done); 00045 }; 00046 storage.loadAll("", OnLoaded_EmptyRedis); 00047 while (!done) { 00048 ML::futex_wait(done, false); 00049 } 00050 00051 /* wrong value type for banker:accounts */ 00052 connection->exec(SET("banker:accounts", "myvalue")); 00053 done = false; 00054 auto OnLoaded_BadBankerAccounts1 00055 = [&] (std::shared_ptr<Accounts> accounts, 00056 BankerPersistence::PersistenceCallbackStatus status, 00057 const string & info) { 00058 /* this could be a DATA_INCONSISTENCY error, but it is handled 00059 directly by the backend */ 00060 BOOST_CHECK_EQUAL(status, BankerPersistence::BACKEND_ERROR); 00061 BOOST_CHECK(info.length() != 0); /* we ignore the actual message */ 00062 done = true; 00063 ML::futex_wake(done); 00064 }; 00065 storage.loadAll("", OnLoaded_BadBankerAccounts1); 00066 while (!done) { 00067 ML::futex_wait(done, false); 00068 } 00069 connection->exec(DEL("banker:accounts")); 00070 00071 /* nil/void entries in banker:accounts */ 00072 Command sadd(SADD("banker:accounts")); 00073 sadd.addArg("account1"); 00074 connection->exec(sadd); 00075 done = false; 00076 auto OnLoaded_BadBankerAccounts2 00077 = [&] (std::shared_ptr<Accounts> accounts, 00078 BankerPersistence::PersistenceCallbackStatus status, 00079 const string & info) { 00080 BOOST_CHECK_EQUAL(status, BankerPersistence::DATA_INCONSISTENCY); 00081 BOOST_CHECK(info.length() != 0); /* we ignore the actual message */ 00082 // cerr << "error = " + info << endl; 00083 done = true; 00084 ML::futex_wake(done); 00085 }; 00086 storage.loadAll("", OnLoaded_BadBankerAccounts2); 00087 while (!done) { 00088 ML::futex_wait(done, false); 00089 } 00090 00091 /* valid account1 */ 00092 Account account1; 00093 account1.type = AT_BUDGET; 00094 account1.budgetIncreases = Amount(MicroUSD(123456)); 00095 account1.budgetDecreases = Amount(MicroUSD(0)); 00096 account1.recycledIn = Amount(MicroUSD(987654)); 00097 account1.allocatedIn = Amount(MicroUSD(4567)); 00098 account1.commitmentsRetired = Amount(MicroUSD(898989)); 00099 account1.adjustmentsIn = Amount(MicroUSD(17171717)); 00100 account1.recycledOut = Amount(MicroUSD(64949494)); 00101 account1.allocatedOut = Amount(MicroUSD(8778777)); 00102 account1.commitmentsMade = Amount(MicroUSD(10101010)); 00103 account1.adjustmentsOut = Amount(MicroUSD(9999999)); 00104 account1.spent = Amount(MicroUSD(10111213)); 00105 account1.balance = ((account1.budgetIncreases + account1.recycledIn 00106 + account1.allocatedIn + account1.commitmentsRetired 00107 + account1.adjustmentsIn) 00108 - (account1.spent + account1.recycledOut 00109 + account1.allocatedOut + account1.commitmentsMade 00110 + account1.adjustmentsOut)); 00111 00112 Json::Value account1Json(account1.toJson()); 00113 connection->exec(SET("banker-account1", account1Json.toString())); 00114 connection->exec(SET("banker-account2", account1Json.toString())); 00115 done = false; 00116 auto OnLoaded_ValidAccount 00117 = [&] (std::shared_ptr<Accounts> accounts, 00118 BankerPersistence::PersistenceCallbackStatus status, 00119 const string & info) { 00120 BOOST_CHECK_EQUAL(status, BankerPersistence::SUCCESS); 00121 /* error message should be empty */ 00122 BOOST_CHECK(info.length() == 0); 00123 00124 /* only returned account = "account1" */ 00125 std::vector<AccountKey> accountKeys = accounts->getAccountKeys(); 00126 BOOST_CHECK_EQUAL(accountKeys.size(), 1); 00127 BOOST_CHECK_EQUAL(accountKeys[0].toString(), "account1"); 00128 00129 Account storedAccount = accounts->getAccount(accountKeys[0]); 00130 Json::Value storedAccountJson = storedAccount.toJson(); 00131 BOOST_CHECK_EQUAL(account1Json, storedAccountJson); 00132 done = true; 00133 ML::futex_wake(done); 00134 }; 00135 storage.loadAll("", OnLoaded_ValidAccount); 00136 while (!done) { 00137 ML::futex_wait(done, false); 00138 } 00139 } 00140 00141 BOOST_AUTO_TEST_CASE( test_redis_persistence_saveall ) 00142 { 00143 RedisTemporaryServer redis; 00144 std::shared_ptr<AsyncConnection> connection 00145 = std::make_shared<AsyncConnection>(redis); 00146 RedisBankerPersistence storage(connection); 00147 int done(false); 00148 00149 /* generic callback for all subsequent saveAll invocations */ 00150 BankerPersistence::PersistenceCallbackStatus lastStatus; 00151 string lastInfo; 00152 auto OnSavedCallback 00153 = [&] (BankerPersistence::PersistenceCallbackStatus status, 00154 const string & info) { 00155 lastStatus = status; 00156 lastInfo = info; 00157 done = true; 00158 ML::futex_wake(done); 00159 }; 00160 00161 /* basic account setup */ 00162 Accounts accounts; 00163 AccountKey parentKey("parent"), childKey("parent:child"); 00164 accounts.createAccount(parentKey, AT_BUDGET); 00165 accounts.createAccount(childKey, AT_SPEND); 00166 accounts.setBudget(parentKey, MicroUSD(123456)); 00167 accounts.setBalance(childKey, MicroUSD(1234), AT_NONE); 00168 00169 /* 1. we save an account that does not exist yet in the storage */ 00170 storage.saveAll(accounts, OnSavedCallback); 00171 while (!done) { 00172 ML::futex_wait(done, false); 00173 } 00174 00175 /* this operation should succeed */ 00176 BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::SUCCESS); 00177 BOOST_CHECK(lastInfo.length() == 0); 00178 00179 /* the new accounts should have been registered in the "banker:accounts" 00180 set */ 00181 Redis::Result result = connection->exec(SMEMBERS("banker:accounts"), 5); 00182 BOOST_CHECK(result.ok()); 00183 const Reply & keysReply = result.reply(); 00184 BOOST_CHECK_EQUAL(keysReply.type(), ARRAY); 00185 BOOST_CHECK_EQUAL(keysReply.length(), 2); 00186 BOOST_CHECK((keysReply[0].asString() == "parent" 00187 && keysReply[1].asString() == "parent:child") 00188 || (keysReply[0].asString() == "parent:child" 00189 && keysReply[1].asString() == "parent")); 00190 00191 /* make sure that the correct data has been stored for "parent" */ 00192 result = connection->exec(GET("banker-parent"), 5); 00193 BOOST_CHECK(result.ok()); 00194 const Reply & parentReply = result.reply(); 00195 BOOST_CHECK_EQUAL(parentReply.type(), STRING); 00196 Json::Value accountJson(accounts.getAccount(parentKey).toJson()); 00197 accountJson["spent-tracking"] = Json::Value(Json::objectValue); 00198 Json::Value storageJson = Json::parse(parentReply.asString()); 00199 BOOST_CHECK_EQUAL(accountJson, storageJson); 00200 00201 /* make sure that the correct data has been stored for "parent:child" */ 00202 result = connection->exec(GET("banker-parent:child"), 5); 00203 BOOST_CHECK(result.ok()); 00204 const Reply & childReply = result.reply(); 00205 BOOST_CHECK_EQUAL(childReply.type(), STRING); 00206 accountJson = accounts.getAccount(childKey).toJson(); 00207 storageJson = Json::parse(childReply.asString()); 00208 BOOST_CHECK_EQUAL(accountJson, storageJson); 00209 00210 /* 2. we update an existing account and reperform the same tests */ 00211 accounts.importSpend(childKey, MicroUSD(123)); 00212 done = false; 00213 storage.saveAll(accounts, OnSavedCallback); 00214 while (!done) { 00215 ML::futex_wait(done, false); 00216 } 00217 00218 /* this operation should succeed */ 00219 BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::SUCCESS); 00220 BOOST_CHECK(lastInfo.length() == 0); 00221 00222 /* the same accounts should be registered in the "banker:accounts" set */ 00223 result = connection->exec(SMEMBERS("banker:accounts"), 5); 00224 BOOST_CHECK(result.ok()); 00225 const Reply & retryKeysReply = result.reply(); 00226 BOOST_CHECK_EQUAL(retryKeysReply.type(), ARRAY); 00227 BOOST_CHECK_EQUAL(retryKeysReply.length(), 2); 00228 BOOST_CHECK((retryKeysReply[0].asString() == "parent" 00229 && retryKeysReply[1].asString() == "parent:child") 00230 || (retryKeysReply[0].asString() == "parent:child" 00231 && retryKeysReply[1].asString() == "parent")); 00232 00233 /* make sure "parent:child" has been properly updated */ 00234 result = connection->exec(GET("banker-parent:child"), 5); 00235 BOOST_CHECK(result.ok()); 00236 const Reply & updatedChildReply = result.reply(); 00237 BOOST_CHECK_EQUAL(updatedChildReply.type(), STRING); 00238 accountJson = accounts.getAccount(childKey).toJson(); 00239 storageJson = Json::parse(updatedChildReply.asString()); 00240 BOOST_CHECK_EQUAL(accountJson, storageJson); 00241 00242 /* 3. we save another instance of the save accounts, like when two bankers 00243 * are running concurrently, and test the error reporting */ 00244 00245 /* we save the "future" copy */ 00246 Accounts accounts2 = accounts; 00247 accounts2.importSpend(childKey, MicroUSD(123)); 00248 done = false; 00249 accountJson = accounts.getAccount(childKey).toJson(); 00250 storage.saveAll(accounts2, OnSavedCallback); 00251 while (!done) { 00252 ML::futex_wait(done, false); 00253 } 00254 /* this operation should succeed */ 00255 BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::SUCCESS); 00256 BOOST_CHECK(lastInfo.length() == 0); 00257 /* make sure "parent:child" has been properly updated with the value from 00258 * accounts2 */ 00259 result = connection->exec(GET("banker-parent:child"), 5); 00260 BOOST_CHECK(result.ok()); 00261 const Reply & updatedChild2Reply = result.reply(); 00262 BOOST_CHECK_EQUAL(updatedChild2Reply.type(), STRING); 00263 accountJson = accounts2.getAccount(childKey).toJson(); 00264 storageJson = Json::parse(updatedChild2Reply.asString()); 00265 BOOST_CHECK_EQUAL(accountJson, storageJson); 00266 00267 /* we attempt to save a previous state */ 00268 done = false; 00269 storage.saveAll(accounts, OnSavedCallback); 00270 while (!done) { 00271 ML::futex_wait(done, false); 00272 } 00273 /* this operation should fail */ 00274 BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::DATA_INCONSISTENCY); 00275 /* account "parent:child" is reported as out of sync */ 00276 BOOST_CHECK_EQUAL(lastInfo, "[\"parent:child\"]"); 00277 00278 /* 4. we ensure that accounts marked as out of sync are silently 00279 * ignored */ 00280 Json::Value expectedStorageJson = storageJson; 00281 00282 accounts2.importSpend(childKey, MicroUSD(12)); 00283 accounts2.markAccountOutOfSync(childKey); 00284 00285 done = false; 00286 storage.saveAll(accounts2, OnSavedCallback); 00287 while (!done) { 00288 ML::futex_wait(done, false); 00289 } 00290 /* this operation should succeed */ 00291 BOOST_CHECK_EQUAL(lastStatus, BankerPersistence::SUCCESS); 00292 BOOST_CHECK(lastInfo.length() == 0); 00293 00294 result = connection->exec(GET("banker-parent:child"), 5); 00295 BOOST_CHECK(result.ok()); 00296 const Reply &outOfSyncChildReply = result.reply(); 00297 BOOST_CHECK_EQUAL(outOfSyncChildReply.type(), STRING); 00298 storageJson = Json::parse(outOfSyncChildReply.asString()); 00299 00300 /* the last expense of 12 mUSD must not be present in the stored account */ 00301 BOOST_CHECK_EQUAL(expectedStorageJson, storageJson); 00302 }