RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* banker_behaviour_test.cc 00002 Wolfgang Sourdeau, 20 December 2012 00003 Copyright (c) 2012 Datacratic Inc. All rights reserved. 00004 00005 Functional tests of the master and slave bankers 00006 */ 00007 00008 #define BOOST_TEST_MAIN 00009 #define BOOST_TEST_DYN_LINK 00010 00011 #include <unordered_set> 00012 00013 #include "boost/test/unit_test.hpp" 00014 #include "jml/arch/format.h" 00015 00016 #include "soa/service/service_base.h" 00017 #include "soa/service/testing/redis_temporary_server.h" 00018 #include "soa/service/testing/zookeeper_temporary_server.h" 00019 00020 #include "rtbkit/core/banker/master_banker.h" 00021 #include "rtbkit/core/banker/slave_banker.h" 00022 00023 #include "banker_temporary_server.h" 00024 00025 using namespace std; 00026 00027 using namespace Datacratic; 00028 using namespace RTBKIT; 00029 using namespace Redis; 00030 00031 namespace { 00032 00033 void 00034 RedisReplyFillSetOfStrings(const Redis::Reply & reply, 00035 unordered_set<string> & aSet) { 00036 ExcAssert(reply.type() == Redis::ARRAY); 00037 for (int i = 0; i < reply.length(); i++) { 00038 aSet.insert(reply[i].asString()); 00039 } 00040 } 00041 00042 } 00043 00044 #if 0 00045 /* make sure that requests from the slave banker are properly deferred when 00046 * the master banker is not available */ 00047 BOOST_AUTO_TEST_CASE( test_banker_deferred ) 00048 { 00049 ZooKeeper::TemporaryServer zookeeper; 00050 zookeeper.start(); 00051 00052 auto proxies = std::make_shared<ServiceProxies>(); 00053 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00054 00055 /* spawn slave */ 00056 SlaveBanker slave(proxies->zmqContext); 00057 slave.init(proxies->config, "slave"); 00058 slave.start(); 00059 00060 /* invoke addSpendAccount but wait 3 seconds before spawning master */ 00061 int done(false); 00062 auto onSpendAdded = [&] (std::exception_ptr exc, ShadowAccount &&shadowAccount) { 00063 cerr << "(slave) onSpendAdded..." << endl; 00064 done = true; 00065 ML::futex_wake(done); 00066 }; 00067 slave.addSpendAccount({"hello", "world"}, CurrencyPool(), onSpendAdded); 00068 ML::sleep(1); 00069 00070 /* spawn master */ 00071 RedisTemporaryServer redis; 00072 BankerTemporaryServer master(redis, ML::format("localhost:%d", zookeeper.getPort())); 00073 00074 /* wait for the addSpendAccount op to complete */ 00075 while (!done) { 00076 ML::futex_wait(done, false); 00077 } 00078 00079 /* shut down the master banker now to force a write to redis */ 00080 master.shutdown(); 00081 00082 /* check that "hello:world" has been saved */ 00083 auto connection = std::make_shared<AsyncConnection>(redis); 00084 Redis::Result result = connection->exec(SMEMBERS("banker:accounts"), 5); 00085 BOOST_CHECK(result.ok()); 00086 const Reply & keysReply = result.reply(); 00087 unordered_set<string> keys; 00088 RedisReplyFillSetOfStrings(keysReply, keys); 00089 BOOST_CHECK_EQUAL(keys.count("hello:world"), 1); 00090 } 00091 #endif 00092 00093 /* make sure that the slave banker does create a spend account, even when a 00094 * "legacyImported" account exists */ 00095 BOOST_AUTO_TEST_CASE( test_banker_slave_banker_accounts ) 00096 { 00097 ZooKeeper::TemporaryServer zookeeper; 00098 zookeeper.start(); 00099 00100 auto proxies = std::make_shared<ServiceProxies>(); 00101 proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort())); 00102 00103 /* spawn master */ 00104 RedisTemporaryServer redis; 00105 00106 /* create some initial accounts */ 00107 { 00108 MasterBanker master(proxies, "initBanker"); 00109 auto storage = make_shared<RedisBankerPersistence>(redis); 00110 master.init(storage); 00111 master.monitorProviderClient.inhibit_ = true; 00112 master.start(); 00113 00114 master.accounts.createAccount({"top"}, AT_BUDGET); 00115 master.accounts.setBudget({"top"}, MicroUSD(15729914170)); 00116 master.accounts.setBalance({"top", "sub"}, MicroUSD(11005485799), 00117 AT_BUDGET); 00118 master.accounts.setBalance({"top", "sub", "legacyImported"}, 00119 MicroUSD(10991979974), 00120 AT_SPEND); 00121 master.accounts.importSpend({"top", "sub", "legacyImported"}, 00122 MicroUSD(10991979974)); 00123 /* save the initial accounts state (the async save is guaranteed to 00124 * be over when "master" is destroyed) */ 00125 master.saveState(); 00126 } 00127 00128 /* instantiate a slave and add a spend account from it */ 00129 { 00130 /* spawn a master banker server process */ 00131 BankerTemporaryServer master(redis, ML::format("localhost:%d", zookeeper.getPort())); 00132 ML::sleep(2); 00133 00134 /* spawn slave */ 00135 SlaveBanker slave(proxies->zmqContext); 00136 slave.init(proxies->config, "slaveBanker"); 00137 slave.start(); 00138 00139 slave.addSpendAccountSync({"top", "sub"}); 00140 00141 ML::sleep(2); 00142 } 00143 00144 /* ensure that "top:sub:slaveBanker" has been created. If not, an 00145 * exception will be thrown. */ 00146 { 00147 MasterBanker master(proxies, "initBanker2"); 00148 auto storage = make_shared<RedisBankerPersistence>(redis); 00149 master.init(storage); 00150 master.monitorProviderClient.inhibit_ = true; 00151 master.start(); 00152 auto account 00153 = master.accounts.getAccount({"top", "sub", "slaveBanker"}); 00154 cerr << "account: " << account; 00155 } 00156 }