RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/testing/redis_async_test.cc
00001 /* redis_async_test.cc
00002    Jeremy Barnes, 1 December 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Test for our Redis class.
00006 */
00007 
00008 
00009 #define BOOST_TEST_MAIN
00010 #define BOOST_TEST_DYN_LINK
00011 
00012 #include "soa/service/redis.h"
00013 #include "soa/service/testing/redis_temporary_server.h"
00014 #include <boost/test/unit_test.hpp>
00015 #include "jml/utils/smart_ptr_utils.h"
00016 #include "jml/utils/string_functions.h"
00017 #include "jml/arch/atomic_ops.h"
00018 #include <boost/thread.hpp>
00019 #include <boost/thread/barrier.hpp>
00020 #include <boost/function.hpp>
00021 #include <iostream>
00022 #include "jml/arch/atomic_ops.h"
00023 #include "jml/arch/timers.h"
00024 #include <linux/futex.h>
00025 #include <unistd.h>
00026 #include <sys/syscall.h>
00027 #include "jml/arch/futex.h"
00028 
00029 using namespace std;
00030 using namespace Datacratic;
00031 using namespace ML;
00032 
00033 namespace Redis {
      // Counters declared extern here, i.e. defined outside this file.
      // The tests in this file compare requestDataCreated against
      // requestDataDestroyed once their requests have completed; the
      // eventLoops* counters are declared but not read in this file.
00034 
00035 extern size_t requestDataCreated;
00036 extern size_t requestDataDestroyed;
00037 extern size_t eventLoopsCreated;
00038 extern size_t eventLoopsDestroyed;
00039 
00040 } // namespace Redis
00041 
00042 using namespace Redis;
00043 
00044 
00045 BOOST_AUTO_TEST_CASE( test_redis_async )
00046 {
          // Drives a Redis::AsyncConnection against a RedisTemporaryServer and
          // checks, in order: a SET, a GET, a GET queued with a 0.0 timeout,
          // a GET queued with a 10.0 timeout, a MULTI/EXEC batch and a batch
          // of three GETs submitted through queueMulti().
00047     RedisTemporaryServer redis;
00048 
00049     Redis::AsyncConnection connection(redis);
00050     
          // m is used as a completion signal rather than to guard data: it is
          // locked here, every callback unlocks it, and the test blocks in the
          // next m.lock() until that callback has run.
          // NOTE(review): if callbacks run on a thread other than this one
          // (not visible from this file), they unlock a mutex they do not
          // own -- confirm that is acceptable for boost::mutex here.
00051     boost::mutex m;
00052     m.lock();
00053 
          // Callback for the SET: expects a STATUS reply whose text is "OK".
          // Any error fails the test; both paths release m.
00054     auto onResult = [&] (const Redis::Result & result)
00055         {
00056             if (result) {
00057                 auto reply = result.reply();
00058                 BOOST_CHECK_EQUAL(reply.type(), Redis::STATUS);
00059                 BOOST_CHECK_EQUAL(reply.asString(), "OK");
00060                 m.unlock();
00061             }
00062             else {
00063                 BOOST_CHECK(false);
00064                 cerr << "got error " << result.error() << endl;
00065                 m.unlock();
00066             }
00067         };
00068 
00069     connection.queue(SET("hello", "world"), onResult);
00070     
          // Blocks until onResult has released m.
00071     m.lock();
00072 
00073     BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0);
00074 
          // Callback for a GET of "hello": expects a STRING reply "world".
00075     auto onResult2 = [&] (const Redis::Result & result)
00076         {
00077             if (result) {
00078                 auto reply = result.reply();
00079                 BOOST_CHECK_EQUAL(reply.type(), Redis::STRING);
00080                 BOOST_CHECK_EQUAL(reply.asString(), "world");
00081                 cerr << "got reply " << reply << endl;
00082                 m.unlock();
00083             }
00084             else {
00085                 BOOST_CHECK(false);
00086                 cerr << "got error " << result.error() << endl;
00087                 m.unlock();
00088             }
00089         };
00090 
00091     connection.queue(GET("hello"), onResult2);
00092     
00093     m.lock();
00094 
00095     BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0);
00096     BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed);
00097 
00098     bool hadTimeout = false;
00099 
          // Callback for the zero-timeout request: records whether the error
          // text was exactly "timeout"; releases m on success and on error.
00100     auto onResult3 = [&] (const Redis::Result & result)
00101         {
00102             if (result) {
00103                 m.unlock();
00104             }
00105             else {
00106                 if (result.error() == "timeout")
00107                     hadTimeout = true;
00108                 m.unlock();
00109             }
00110         };
00111 
          // Third argument is 0.0; the checks below expect this request to
          // come back as a "timeout" error with nothing left pending.
00112     connection.queue(GET("hello"), onResult3, 0.0);
00113     
00114     m.lock();
00115     BOOST_CHECK(hadTimeout);
00116     BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0);
00117     BOOST_CHECK_EQUAL(connection.numTimeoutsPending(), 0);
00118     BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed);
          // Disabled section: it refers to onReply3, onError and onTimeout3,
          // none of which are defined in this test, so it would not compile
          // if re-enabled as is.
00119 #if 0
00120     hadTimeout = false;
00121 
00122     connection.queue(onReply3, onError, 0.000001, onTimeout3, "GET hello");
00123 
00124     m.lock();
00125     ML::sleep(0.1);
00126 
00127     BOOST_CHECK(hadTimeout);
00128     BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0);
00129     BOOST_CHECK_EQUAL(connection.numTimeoutsPending(), 0);
00130     BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed);
00131 #endif
00132     hadTimeout = false;
00133     
          // Same GET with a 10.0 timeout, reusing onResult2 (which fails the
          // test on any error).
          // NOTE(review): onResult2 never writes hadTimeout, so the
          // !hadTimeout check below only re-confirms the reset above.
00134     connection.queue(GET("hello"), onResult2, 10.0);
00135 
00136     m.lock();
00137     
00138     BOOST_CHECK(!hadTimeout);
00139     BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0);
00140     BOOST_CHECK_EQUAL(connection.numTimeoutsPending(), 0);
00141     BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed);
00142     unsigned numReplies = 0;
00143 
          // Callback for the MULTI/EXEC batch: expects five results, matching
          // the five commands queued below, and counts successful invocations.
00144     auto onResult4 = [&] (const Redis::Results & results)
00145         {
00146             BOOST_CHECK_EQUAL(results.size(), 5);
00147             if (!results) {
00148                 cerr << "got reply " << results << endl;
00149                 BOOST_CHECK(false);
00150                 m.unlock();
00151             }
00152             else {
00153                 numReplies++;
00154                 cerr << "Got replies from multi command" << results << endl;
00155                 m.unlock();
00156             }
00157         };
00158 
00159     vector<Redis::Command> commands = {
00160         MULTI,
00161         SET("lazy", "fox"),
00162         SET("jumped", "over"),
00163         SET("quick", "brown"),
00164         EXEC
00165     };
00166 
00167     connection.queueMulti(commands, onResult4);
00168     m.lock();
          // The callback must have been invoked exactly once for the batch.
00169     BOOST_CHECK_EQUAL(numReplies, 1) ;
00170 
00171     commands = {
00172         GET("lazy"),
00173         GET("jumped"),
00174         GET("quick")
00175     };
00176 
          // Callback for the three GETs: expects the values written by the
          // MULTI/EXEC batch above, in the order the GETs were queued.
00177     auto onResult5 = [&] (const Redis::Results & results)
00178         {
00179             BOOST_CHECK_EQUAL(results.size(), 3);
00180             if (!results) {
00181                 cerr << "got reply " << results << endl;
00182                 BOOST_CHECK(false);
00183                 m.unlock();
00184             }
00185             else {
00186                 cerr << "Got replies from multi command " << results << endl;
00187                 cerr << results.reply(0) << endl;
00188                 cerr << results.reply(1) << endl;
00189                 cerr << results.reply(2) << endl;
00190                 BOOST_CHECK_EQUAL(results.reply(0).asString(), "fox");
00191                 BOOST_CHECK_EQUAL(results.reply(1).asString(), "over");
00192                 BOOST_CHECK_EQUAL(results.reply(2).asString(), "brown");
00193                 m.unlock();
00194             }
00195         };
00196 
00197     connection.queueMulti(commands, onResult5);
00198     m.lock();
          // Release m before it goes out of scope: the m.lock() above leaves
          // it held by this thread, and a mutex must not be destroyed while
          // it is still locked.
          m.unlock();
00199 }
00200 
00201 #if 1
00202 BOOST_AUTO_TEST_CASE( test_redis_mt )
00203 {
          // Stress test: nthreads worker threads share one AsyncConnection and
          // each issues SET followed by GET on random keys until the main
          // thread sets `finished` (after ML::sleep(1.0)). The test then
          // checks that nothing is left pending on the connection.
00204     using namespace Redis;
00205 
          // NOTE(review): `finished` is a plain volatile flag written by the
          // main thread and read by the workers; volatile is not a
          // synchronization primitive -- consider an atomic flag.
00206     volatile bool finished = false;
00207     int nthreads = 4;
00208 
00209     RedisTemporaryServer redis;
00210     Redis::AsyncConnection connection(redis);
00211 
          // Shared counters, incremented with ML::atomic_inc from the workers
          // and callbacks. numErrors is counted but not asserted on below.
00212     uint64_t numErrors = 0;
00213     uint64_t numRequests = 0;
00214 
          // Body of each worker thread. threadNum is not used in the body.
00215     auto doRedisThread = [&] (int threadNum)
00216         {
00217             cerr << "doRedisThread" << endl;
00218 
                  // pending: requests queued by this thread whose SET/GET
                  // chain has not finished yet. wait: the word passed to
                  // futex_wait/futex_wake; it is never written, so it stays 0.
00219             volatile int pending = 0;
00220             int wait = 0;
00221 
                  // Marks one request chain as finished; when the count drops
                  // to exactly 100 it calls futex_wake for a producer that
                  // may be blocked in futex_wait below.
00222             auto finishedRequest = [&] ()
00223             {
00224                 if (__sync_add_and_fetch(&pending, -1) == 100)
00225                     futex_wake(wait);
00226             };
00227             
                  // Logs and counts an error, then finishes the request.
00228             auto onError = [&] (const std::string & error)
00229             {
00230                 cerr << "error: " << error << endl;
00231                 ML::atomic_inc(numErrors);
00232                 finishedRequest();
00233             };
00234             
                  // NOTE(review): onTimeout is defined but not referenced
                  // anywhere in this test.
00235             auto onTimeout = [&] ()
00236             {
00237                 cerr << "got timeout" << endl;
00238                 finishedRequest();
00239             };
00240             
00241             while (!finished) {
00242                 //cerr << "doing request" << endl;
00243                 int rand = random() % 100000;
00244 
00245                 string key = ML::format("testkey%d", rand);
00246                 Redis::AsyncConnection * redisPtr = &connection;
00247 
                      // Second stage: result of the GET; ends the chain.
00248                 auto onReply2 = [=] (const Redis::Result & result)
00249                 {
00250                     if (!result)
00251                         onError(result.error());
00252                     else
00253                         finishedRequest();
00254                 };
00255 
                      // First stage: result of the SET; on success queues the
                      // GET for the same key with a 5.0 timeout.
00256                 auto onReply1 = [=] (const Redis::Result & result)
00257                 {
00258                     if (!result)
00259                         onError(result.error());
00260                     else
00261                         redisPtr->queue(GET(key), onReply2, 5.0);
00262                 };
00263 
00264                 ML::atomic_inc(numRequests);
                      // Throttle: if exactly 2000 requests were already
                      // outstanding before this one, call futex_wait until
                      // finishedRequest issues its futex_wake.
00265                 if (__sync_fetch_and_add(&pending, 1) == 2000)
00266                     futex_wait(wait, 0);
00267                 
00268                 connection.queue(SET(key, rand), onReply1, 5.0);
00269             }
00270 
                  // Spin until every outstanding callback has run; the
                  // callbacks reference this thread's locals (pending, wait),
                  // so the thread must not return before they are done.
00271             while (pending != 0) ;
00272         };
00273 
00274     boost::thread_group tg;
00275         
          // The index is int so it matches nthreads and doRedisThread's int
          // parameter; this avoids a signed/unsigned comparison in the loop
          // condition.
00276     for (int i = 0;  i < nthreads;  ++i)
00277         tg.create_thread(boost::bind<void>(doRedisThread, i));
00278 
00279     ML::sleep(1.0);
00280     finished = true;
00281     
00282     tg.join_all();
00283 
00284     cerr << "numRequests = " << numRequests << endl;
00285 
00286     BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0);
00287     BOOST_CHECK_EQUAL(connection.numTimeoutsPending(), 0);
00288     BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed);
00289 
00290 }
00291 #endif
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator