RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* redis_async_test.cc 00002 Jeremy Barnes, 1 December 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Test for our Redis class. 00006 */ 00007 00008 00009 #define BOOST_TEST_MAIN 00010 #define BOOST_TEST_DYN_LINK 00011 00012 #include "soa/service/redis.h" 00013 #include "soa/service/testing/redis_temporary_server.h" 00014 #include <boost/test/unit_test.hpp> 00015 #include "jml/utils/smart_ptr_utils.h" 00016 #include "jml/utils/string_functions.h" 00017 #include "jml/arch/atomic_ops.h" 00018 #include <boost/thread.hpp> 00019 #include <boost/thread/barrier.hpp> 00020 #include <boost/function.hpp> 00021 #include <iostream> 00022 #include "jml/arch/atomic_ops.h" 00023 #include "jml/arch/timers.h" 00024 #include <linux/futex.h> 00025 #include <unistd.h> 00026 #include <sys/syscall.h> 00027 #include "jml/arch/futex.h" 00028 00029 using namespace std; 00030 using namespace Datacratic; 00031 using namespace ML; 00032 00033 namespace Redis { 00034 00035 extern size_t requestDataCreated; 00036 extern size_t requestDataDestroyed; 00037 extern size_t eventLoopsCreated; 00038 extern size_t eventLoopsDestroyed; 00039 00040 } // namespace Redis 00041 00042 using namespace Redis; 00043 00044 00045 BOOST_AUTO_TEST_CASE( test_redis_async ) 00046 { 00047 RedisTemporaryServer redis; 00048 00049 Redis::AsyncConnection connection(redis); 00050 00051 boost::mutex m; 00052 m.lock(); 00053 00054 auto onResult = [&] (const Redis::Result & result) 00055 { 00056 if (result) { 00057 auto reply = result.reply(); 00058 BOOST_CHECK_EQUAL(reply.type(), Redis::STATUS); 00059 BOOST_CHECK_EQUAL(reply.asString(), "OK"); 00060 m.unlock(); 00061 } 00062 else { 00063 BOOST_CHECK(false); 00064 cerr << "got error " << result.error() << endl; 00065 m.unlock(); 00066 } 00067 }; 00068 00069 connection.queue(SET("hello", "world"), onResult); 00070 00071 m.lock(); 00072 00073 BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0); 00074 00075 auto onResult2 = [&] (const Redis::Result & result) 00076 { 00077 if (result) { 00078 auto reply = result.reply(); 00079 BOOST_CHECK_EQUAL(reply.type(), Redis::STRING); 00080 BOOST_CHECK_EQUAL(reply.asString(), "world"); 00081 cerr << "got reply " << reply << endl; 00082 m.unlock(); 00083 } 00084 else { 00085 BOOST_CHECK(false); 00086 cerr << "got error " << result.error() << endl; 00087 m.unlock(); 00088 } 00089 }; 00090 00091 connection.queue(GET("hello"), onResult2); 00092 00093 m.lock(); 00094 00095 BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0); 00096 BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed); 00097 00098 bool hadTimeout = false; 00099 00100 auto onResult3 = [&] (const Redis::Result & result) 00101 { 00102 if (result) { 00103 m.unlock(); 00104 } 00105 else { 00106 if (result.error() == "timeout") 00107 hadTimeout = true; 00108 m.unlock(); 00109 } 00110 }; 00111 00112 connection.queue(GET("hello"), onResult3, 0.0); 00113 00114 m.lock(); 00115 BOOST_CHECK(hadTimeout); 00116 BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0); 00117 BOOST_CHECK_EQUAL(connection.numTimeoutsPending(), 0); 00118 BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed); 00119 #if 0 00120 hadTimeout = false; 00121 00122 connection.queue(onReply3, onError, 0.000001, onTimeout3, "GET hello"); 00123 00124 m.lock(); 00125 ML::sleep(0.1); 00126 00127 BOOST_CHECK(hadTimeout); 00128 BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0); 00129 BOOST_CHECK_EQUAL(connection.numTimeoutsPending(), 0); 00130 BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed); 00131 #endif 00132 hadTimeout = false; 00133 00134 connection.queue(GET("hello"), onResult2, 10.0); 00135 00136 m.lock(); 00137 00138 BOOST_CHECK(!hadTimeout); 00139 BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0); 00140 BOOST_CHECK_EQUAL(connection.numTimeoutsPending(), 0); 00141 BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed); 00142 unsigned numReplies = 0; 00143 00144 auto onResult4 = [&] (const Redis::Results & results) 00145 { 00146 BOOST_CHECK_EQUAL(results.size(), 5); 00147 if (!results) { 00148 cerr << "got reply " << results << endl; 00149 BOOST_CHECK(false); 00150 m.unlock(); 00151 } 00152 else { 00153 numReplies++; 00154 cerr << "Got replies from multi command" << results << endl; 00155 m.unlock(); 00156 } 00157 }; 00158 00159 vector<Redis::Command> commands = { 00160 MULTI, 00161 SET("lazy", "fox"), 00162 SET("jumped", "over"), 00163 SET("quick", "brown"), 00164 EXEC 00165 }; 00166 00167 connection.queueMulti(commands, onResult4); 00168 m.lock(); 00169 BOOST_CHECK_EQUAL(numReplies, 1) ; 00170 00171 commands = { 00172 GET("lazy"), 00173 GET("jumped"), 00174 GET("quick") 00175 }; 00176 00177 auto onResult5 = [&] (const Redis::Results & results) 00178 { 00179 BOOST_CHECK_EQUAL(results.size(), 3); 00180 if (!results) { 00181 cerr << "got reply " << results << endl; 00182 BOOST_CHECK(false); 00183 m.unlock(); 00184 } 00185 else { 00186 cerr << "Got replies from multi command " << results << endl; 00187 cerr << results.reply(0) << endl; 00188 cerr << results.reply(1) << endl; 00189 cerr << results.reply(2) << endl; 00190 BOOST_CHECK_EQUAL(results.reply(0).asString(), "fox"); 00191 BOOST_CHECK_EQUAL(results.reply(1).asString(), "over"); 00192 BOOST_CHECK_EQUAL(results.reply(2).asString(), "brown"); 00193 m.unlock(); 00194 } 00195 }; 00196 00197 connection.queueMulti(commands, onResult5); 00198 m.lock(); 00199 } 00200 00201 #if 1 00202 BOOST_AUTO_TEST_CASE( test_redis_mt ) 00203 { 00204 using namespace Redis; 00205 00206 volatile bool finished = false; 00207 int nthreads = 4; 00208 00209 RedisTemporaryServer redis; 00210 Redis::AsyncConnection connection(redis); 00211 00212 uint64_t numErrors = 0; 00213 uint64_t numRequests = 0; 00214 00215 auto doRedisThread = [&] (int threadNum) 00216 { 00217 cerr << "doRedisThread" << endl; 00218 00219 volatile int pending = 0; 00220 int wait = 0; 00221 00222 auto finishedRequest = [&] () 00223 { 00224 if (__sync_add_and_fetch(&pending, -1) == 100) 00225 futex_wake(wait); 00226 }; 00227 00228 auto onError = [&] (const std::string & error) 00229 { 00230 cerr << "error: " << error << endl; 00231 ML::atomic_inc(numErrors); 00232 finishedRequest(); 00233 }; 00234 00235 auto onTimeout = [&] () 00236 { 00237 cerr << "got timeout" << endl; 00238 finishedRequest(); 00239 }; 00240 00241 while (!finished) { 00242 //cerr << "doing request" << endl; 00243 int rand = random() % 100000; 00244 00245 string key = ML::format("testkey%d", rand); 00246 Redis::AsyncConnection * redisPtr = &connection; 00247 00248 auto onReply2 = [=] (const Redis::Result & result) 00249 { 00250 if (!result) 00251 onError(result.error()); 00252 else 00253 finishedRequest(); 00254 }; 00255 00256 auto onReply1 = [=] (const Redis::Result & result) 00257 { 00258 if (!result) 00259 onError(result.error()); 00260 else 00261 redisPtr->queue(GET(key), onReply2, 5.0); 00262 }; 00263 00264 ML::atomic_inc(numRequests); 00265 if (__sync_fetch_and_add(&pending, 1) == 2000) 00266 futex_wait(wait, 0); 00267 00268 connection.queue(SET(key, rand), onReply1, 5.0); 00269 } 00270 00271 while (pending != 0) ; 00272 }; 00273 00274 boost::thread_group tg; 00275 00276 for (unsigned i = 0; i < nthreads; ++i) 00277 tg.create_thread(boost::bind<void>(doRedisThread, i)); 00278 00279 ML::sleep(1.0); 00280 finished = true; 00281 00282 tg.join_all(); 00283 00284 cerr << "numRequests = " << numRequests << endl; 00285 00286 BOOST_CHECK_EQUAL(connection.numRequestsPending(), 0); 00287 BOOST_CHECK_EQUAL(connection.numTimeoutsPending(), 0); 00288 BOOST_CHECK_EQUAL(requestDataCreated, requestDataDestroyed); 00289 00290 } 00291 #endif