RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
core/banker/testing/banker_behaviour_test.cc
00001 /* banker_behaviour_test.cc
00002    Wolfgang Sourdeau, 20 December 2012
00003    Copyright (c) 2012 Datacratic Inc.  All rights reserved.
00004 
00005    Functional tests of the master and slave bankers
00006 */
00007 
00008 #define BOOST_TEST_MAIN
00009 #define BOOST_TEST_DYN_LINK
00010 
00011 #include <unordered_set>
00012 
00013 #include "boost/test/unit_test.hpp"
00014 #include "jml/arch/format.h"
00015 
00016 #include "soa/service/service_base.h"
00017 #include "soa/service/testing/redis_temporary_server.h"
00018 #include "soa/service/testing/zookeeper_temporary_server.h"
00019 
00020 #include "rtbkit/core/banker/master_banker.h"
00021 #include "rtbkit/core/banker/slave_banker.h"
00022 
00023 #include "banker_temporary_server.h"
00024 
00025 using namespace std;
00026 
00027 using namespace Datacratic;
00028 using namespace RTBKIT;
00029 using namespace Redis;
00030 
00031 namespace {
00032 
00033 void
00034 RedisReplyFillSetOfStrings(const Redis::Reply & reply,
00035                            unordered_set<string> & aSet) {
00036     ExcAssert(reply.type() == Redis::ARRAY);
00037     for (int i = 0; i < reply.length(); i++) {
00038         aSet.insert(reply[i].asString());
00039     }
00040 }
00041 
00042 }
00043 
00044 #if 0
00045 /* make sure that requests from the slave banker are properly deferred when
00046  * the master banker is not available */
00047 BOOST_AUTO_TEST_CASE( test_banker_deferred )
00048 {
00049     ZooKeeper::TemporaryServer zookeeper;
00050     zookeeper.start();
00051 
00052     auto proxies = std::make_shared<ServiceProxies>();
00053     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00054 
00055     /* spawn slave */
00056     SlaveBanker slave(proxies->zmqContext);
00057     slave.init(proxies->config, "slave");
00058     slave.start();
00059 
00060     /* invoke addSpendAccount but wait 3 seconds before spawning master */
00061     int done(false);
00062     auto onSpendAdded = [&] (std::exception_ptr exc, ShadowAccount &&shadowAccount) {
00063         cerr << "(slave) onSpendAdded..." << endl;
00064         done = true;
00065         ML::futex_wake(done);
00066     };
00067     slave.addSpendAccount({"hello", "world"}, CurrencyPool(), onSpendAdded);
00068     ML::sleep(1);
00069 
00070     /* spawn master */
00071     RedisTemporaryServer redis;
00072     BankerTemporaryServer master(redis, ML::format("localhost:%d", zookeeper.getPort()));
00073 
00074     /* wait for the addSpendAccount op to complete */
00075     while (!done) {
00076         ML::futex_wait(done, false);
00077     }
00078 
00079     /* shut down the master banker now to force a write to redis */
00080     master.shutdown();
00081 
00082     /* check that "hello:world" has been saved */
00083     auto connection = std::make_shared<AsyncConnection>(redis);
00084     Redis::Result result = connection->exec(SMEMBERS("banker:accounts"), 5);
00085     BOOST_CHECK(result.ok());
00086     const Reply & keysReply = result.reply();
00087     unordered_set<string> keys;
00088     RedisReplyFillSetOfStrings(keysReply, keys);
00089     BOOST_CHECK_EQUAL(keys.count("hello:world"), 1);
00090 }
00091 #endif
00092 
00093 /* make sure that the slave banker does create a spend account, even when a
00094  * "legacyImported" account exists */
00095 BOOST_AUTO_TEST_CASE( test_banker_slave_banker_accounts )
00096 {
00097     ZooKeeper::TemporaryServer zookeeper;
00098     zookeeper.start();
00099 
00100     auto proxies = std::make_shared<ServiceProxies>();
00101     proxies->useZookeeper(ML::format("localhost:%d", zookeeper.getPort()));
00102 
00103     /* spawn master */
00104     RedisTemporaryServer redis;
00105 
00106     /* create some initial accounts */
00107     {
00108         MasterBanker master(proxies, "initBanker");
00109         auto storage = make_shared<RedisBankerPersistence>(redis);
00110         master.init(storage);
00111         master.monitorProviderClient.inhibit_ = true;
00112         master.start();
00113 
00114         master.accounts.createAccount({"top"}, AT_BUDGET);
00115         master.accounts.setBudget({"top"}, MicroUSD(15729914170));
00116         master.accounts.setBalance({"top", "sub"}, MicroUSD(11005485799),
00117                                    AT_BUDGET);
00118         master.accounts.setBalance({"top", "sub", "legacyImported"},
00119                                    MicroUSD(10991979974),
00120                                    AT_SPEND);
00121         master.accounts.importSpend({"top", "sub", "legacyImported"},
00122                                     MicroUSD(10991979974));
00123         /* save the initial accounts state (the async save is guaranteed to
00124          * be over when "master" is destroyed) */
00125         master.saveState();
00126     }
00127 
00128     /* instantiate a slave and add a spend account from it */
00129     {
00130         /* spawn a master banker server process */
00131         BankerTemporaryServer master(redis, ML::format("localhost:%d", zookeeper.getPort()));
00132         ML::sleep(2);
00133 
00134         /* spawn slave */
00135         SlaveBanker slave(proxies->zmqContext);
00136         slave.init(proxies->config, "slaveBanker");
00137         slave.start();
00138 
00139         slave.addSpendAccountSync({"top", "sub"});
00140         
00141         ML::sleep(2);
00142     }
00143 
00144     /* ensure that "top:sub:slaveBanker" has been created. If not, an
00145      * exception will be thrown. */
00146     {
00147         MasterBanker master(proxies, "initBanker2");
00148         auto storage = make_shared<RedisBankerPersistence>(redis);
00149         master.init(storage);
00150         master.monitorProviderClient.inhibit_ = true;
00151         master.start();
00152         auto account
00153             = master.accounts.getAccount({"top", "sub", "slaveBanker"});
00154         cerr << "account: " << account;
00155     }
00156 }
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator