RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/redis.cc
00001 /* redis.cc
00002    Jeremy Barnes, 14 November 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Redis functionality.
00006 */
00007 
00008 #include "soa/service/redis.h"
00009 #include "jml/utils/guard.h"
00010 #include <boost/thread.hpp>
00011 #include <poll.h>
00012 #include <unistd.h>
00013 #include <fcntl.h>
00014 #include "jml/arch/atomic_ops.h"
00015 #include "jml/arch/backtrace.h"
00016 #include "jml/arch/futex.h"
00017 #include "jml/utils/vector_utils.h"
00018 
00019 
00020 using namespace std;
00021 using namespace Datacratic;
00022 using namespace ML;
00023 
00024 
00025 
00026 
00027 namespace Redis {
00028 
00029 
00030 const Command PING("PING");
00031 const Command HDEL("HDEL");
00032 const Command HGET("HGET");
00033 const Command HGETALL("HGETALL");
00034 const Command HMGET("HMGET");
00035 const Command HMSET("HMSET");
00036 const Command WATCH("WATCH");
00037 const Command MULTI("MULTI");
00038 const Command EXEC("EXEC");
00039 const Command HSET("HSET");
00040 const Command HINCRBY("HINCRBY");
00041 const Command KEYS("KEYS");             
00042 const Command SET("SET");
00043 const Command MSET("MSET");
00044 const Command GET("GET");
00045 const Command MGET("MGET");
00046 const Command EXPIRE("EXPIRE");
00047 const Command RANDOMKEY("RANDOMKEY");
00048 const Command DEL("DEL");
00049 const Command SADD("SADD");
00050 const Command SMEMBERS("SMEMBERS");
00051 const Command TTL("TTL");
00052 
00053 /*****************************************************************************/
00054 /* REPLY                                                                     */
00055 /*****************************************************************************/
00056 
00057 
00058 ReplyType
00059 Reply::
00060 type() const
00061 {
00062     ExcAssert(r_);
00063     switch (r_->type) {
00064     case REDIS_REPLY_STATUS: return STATUS;
00065     case REDIS_REPLY_ERROR: return ERROR;
00066     case REDIS_REPLY_INTEGER: return INTEGER;
00067     case REDIS_REPLY_NIL: return NIL;
00068     case REDIS_REPLY_STRING: return STRING;
00069     case REDIS_REPLY_ARRAY: return ARRAY;
00070     default:
00071         throw ML::Exception("unknown Redis reply type %d", r_->type);
00072     };
00073 }
00074 
00075 std::string
00076 Reply::
00077 getString() const
00078 {
00079     ExcAssert(r_);
00080     return std::string(r_->str, r_->str + r_->len);
00081 }
00082 
00083 std::string
00084 Reply::
00085 asString() const
00086 {
00087     ExcAssert(r_);
00088     switch (r_->type) {
00089     case REDIS_REPLY_STATUS:
00090     case REDIS_REPLY_STRING:
00091     case REDIS_REPLY_ERROR: return getString();
00092     case REDIS_REPLY_INTEGER: return ML::format("%lli", r_->integer);
00093     case REDIS_REPLY_NIL: return "";
00094     case REDIS_REPLY_ARRAY: return asJson().toString();
00095     default:
00096         throw ML::Exception("unknown Redis reply type");
00097     };
00098 }
00099 
00100 long long
00101 Reply::
00102 asInt() const
00103 {
00104     ExcAssert(r_);
00105     ExcAssertEqual(r_->type, REDIS_REPLY_INTEGER);
00106     return r_->integer;
00107 }
00108 
00109 long long
00110 Reply::
00111 asInt(long long defaultIfNotInteger)
00112 {
00113     ExcAssert(r_);
00114     switch (r_->type) {
00115     case REDIS_REPLY_INTEGER: return r_->integer;
00116     case REDIS_REPLY_STRING: {
00117         std::string s = getString();
00118         char * end = 0;
00119         long long result = strtoll(s.c_str(), &end, 10);
00120         if (end != s.c_str() + s.length())
00121             return defaultIfNotInteger;
00122         return result;
00123     }
00124     case REDIS_REPLY_STATUS:
00125     case REDIS_REPLY_ERROR:
00126     case REDIS_REPLY_NIL:
00127     case REDIS_REPLY_ARRAY: return defaultIfNotInteger;
00128     default:
00129         throw ML::Exception("unknown Redis reply type");
00130     };
00131     
00132 }
00133 
00134 Json::Value
00135 Reply::
00136 asJson() const
00137 {
00138     ExcAssert(r_);
00139     Json::Value result;
00140         
00141     switch (r_->type) {
00142 
00143     case REDIS_REPLY_STATUS:
00144         result["status"] = getString();
00145         return result;
00146 
00147     case REDIS_REPLY_ERROR:
00148         result["error"] = getString();
00149         return result;
00150 
00151     case REDIS_REPLY_INTEGER:
00152         result = (Json::Value::UInt)r_->integer;
00153         return result;
00154 
00155     case REDIS_REPLY_NIL:
00156         return result;
00157 
00158     case REDIS_REPLY_STRING:
00159         result = getString();
00160         return result;
00161 
00162     case REDIS_REPLY_ARRAY:
00163         for (unsigned i = 0;  i < r_->elements;  ++i)
00164             result[i] = (*this)[i].asJson();
00165         return result;
00166                 
00167     default:
00168         throw ML::Exception("unknown Redis reply type ");
00169     };
00170             
00171 }
00172 
00173 Reply
00174 Reply::
00175 deepCopy() const
00176 {
00177     return Reply(doDeepCopy(r_.get()), true);
00178 }
00179 
00180 redisReply *
00181 Reply::
00182 doDeepCopy(redisReply * r)
00183 {
00184     redisReply * result = (redisReply *)malloc(sizeof(redisReply));
00185     memset(result, 0, sizeof(redisReply));
00186     result->type = r->type;
00187 
00188     try {
00189         switch (r->type) {
00190 
00191         case REDIS_REPLY_INTEGER:
00192         case REDIS_REPLY_NIL:
00193             result->integer = r->integer;
00194             break;
00195         case REDIS_REPLY_STATUS:
00196         case REDIS_REPLY_ERROR:
00197         case REDIS_REPLY_STRING: {
00198             // Copy the string
00199             char * str = (char *)malloc(r->len);
00200             result->str = str;
00201             result->len = r->len;
00202             std::copy(r->str, r->str + r->len, str);
00203             break;
00204         }
00205         case REDIS_REPLY_ARRAY: {
00206             // Copy the array
00207             redisReply ** arr
00208                 = (redisReply **)malloc(r->elements * sizeof(redisReply *));
00209             for (unsigned i = 0;  i < r->elements;  ++i)
00210                 arr[i] = 0;
00211             result->element = arr;
00212             result->elements = 0;
00213             for (unsigned i = 0;  i < r->elements;  ++i) {
00214                 result->element[i] = doDeepCopy(r->element[i]);
00215                 result->elements = i + 1;
00216             }
00217             break;
00218         }
00219             
00220         default:
00221             throw ML::Exception("unknown Redis reply type %d", r->type);
00222         };
00223     } catch (...) {
00224         freeReplyObject(result);
00225         throw;
00226     }
00227 
00228     return result;
00229 }
00230 
00231 std::ostream & operator << (std::ostream & stream, const Reply & reply)
00232 {
00233     return stream << reply.asString();
00234 }
00235 
00236 
00237 /*****************************************************************************/
00238 /* RESULTS                                                                   */
00239 /*****************************************************************************/
00240 
00241 const std::string
00242 Result::timeoutError("timeout");
00243 
00244 std::ostream & operator << (std::ostream & stream, const Result & result)
00245 {
00246     if (result) return stream << result.reply().asString();
00247     else return stream << result.error();
00248 }
00249 
00250 std::ostream & operator << (std::ostream & stream, const Results & results)
00251 {
00252     for (unsigned i = 0;  i < results.size();  ++i) {
00253         stream << "  " << i << ": " << results[i] << endl;
00254     }
00255     return stream;
00256 }
00257 
00258 
00259 /*****************************************************************************/
00260 /* COMMAND                                                                   */
00261 /*****************************************************************************/
00262 
// NOTE: this constructor is compiled out by the preprocessor.  Its body
// refers to names (cmdFormat, onSuccess, onFailure, vqueue) that are not
// declared in it, so it could not simply be re-enabled as is.
#if 0
Command::
Command()
{
    va_list ap;
    va_start(ap, cmdFormat);
    ML::call_guard([&] () { va_end(ap); });

    return vqueue(onSuccess, onFailure, Date::notADate(), OnTimeout(),
                  cmdFormat, ap);
}
#endif
00275 
00276 std::ostream & operator << (std::ostream & stream, const Command & command)
00277 {
00278     return stream << command.formatStr << command.args;
00279 }
00280 
00281 /*****************************************************************************/
00282 /* ADDRESS                                                                   */
00283 /*****************************************************************************/
00284 
00285 Address::
00286 Address()
00287 {
00288 }
00289 
00290 Address::
00291 Address(const std::string & uri)
00292     : uri_(uri)
00293 {
00294 }
00295 
00296 Address
00297 Address::
00298 tcp(const std::string & host, int port)
00299 {
00300     if (host.find(':') != string::npos)
00301         throw ML::Exception("invalid host has a colon");
00302     if (host.find('/') != string::npos)
00303         throw ML::Exception("invalid host has a slash");
00304 
00305     return Address(host + ":" + to_string(port));
00306 }
00307 
00308 Address
00309 Address::
00310 unix(const std::string & path)
00311 {
00312     if (path.find(':') != string::npos)
00313         throw ML::Exception("invalid path has a colon");
00314     return Address(path);
00315 }
00316 
00317 bool 
00318 Address::
00319 isUnix() const
00320 {
00321     return !uri_.empty()
00322         && uri_.find(':') == string::npos;
00323 }
00324 
00325 bool
00326 Address::
00327 isTcp() const
00328 {
00329     return !uri_.empty()
00330         && uri_.find(':') != string::npos;
00331     
00332 }
00333 
00334 std::string
00335 Address::
00336 unixPath() const
00337 {
00338     if (!isUnix())
00339         throw ML::Exception("address is not unix");
00340     return uri_;
00341 }
00342 
00343 std::string
00344 Address::
00345 tcpHost() const
00346 {
00347     if (!isTcp())
00348         throw ML::Exception("address is not tcp");
00349     auto pos = uri_.find(':');
00350     ExcAssertNotEqual(pos, string::npos);
00351     return string(uri_, 0, pos);
00352 }
00353 
00354 int
00355 Address::
00356 tcpPort() const
00357 {
00358     if (!isTcp())
00359         throw ML::Exception("address is not tcp");
00360     auto pos = uri_.find(':');
00361     ExcAssertNotEqual(pos, string::npos);
00362     return boost::lexical_cast<int>(string(uri_, pos + 1));
00363 }
00364 
00365 std::string
00366 Address::
00367 uri() const
00368 {
00369     return uri_;
00370 }
00371 
00372 
00373 /*****************************************************************************/
00374 /* ASYNC CONNECTION                                                          */
00375 /*****************************************************************************/
00376 
// Values stored in RequestData::state
enum {
    WAITING  = 0,   // set by queue(); neither replied nor timed out yet
    REPLIED  = 1,   // set by resultCallback()
    TIMEDOUT = 2    // set by expireTimeouts()
};
00382 
// Number of RequestData objects constructed and destroyed so far; these are
// incremented by the RequestData constructor and destructor respectively.
size_t requestDataCreated = 0;
size_t requestDataDestroyed = 0;
00385 
00386 struct AsyncConnection::RequestData
00387     : std::enable_shared_from_this<AsyncConnection::RequestData> {
00388     RequestData()
00389     {
00390         ML::atomic_inc(requestDataCreated);
00391     }
00392 
00393     ~RequestData()
00394     {
00395         ML::atomic_inc(requestDataDestroyed);
00396     }
00397 
00398     OnResult onResult;
00399     std::string command;
00400     Date timeout;
00401     AsyncConnection * connection;
00402     int64_t id;
00403     Requests::iterator requestIterator;
00404     Timeouts::iterator timeoutIterator;
00405     int state;
00406 };
00407 
// Number of EventLoop objects constructed and destroyed so far; these are
// incremented by the EventLoop constructor and destructor respectively.
size_t eventLoopsCreated = 0;
size_t eventLoopsDestroyed = 0;
00410 
00411 struct AsyncConnection::EventLoop {
00412 
00413     int wakeupfd[2];
00414     volatile bool finished;
00415     AsyncConnection * connection;
00416     std::shared_ptr<boost::thread> thread;
00417     pollfd fds[2];
00418     volatile int disconnected;
00419 
00420     EventLoop(AsyncConnection * connection)
00421         : finished(false), connection(connection), disconnected(1)
00422     {
00423         ML::atomic_inc(eventLoopsCreated);
00424         
00425         int res = pipe2(wakeupfd, O_NONBLOCK);
00426         if (res == -1)
00427             throw ML::Exception(errno, "pipe2");
00428 
00429         //cerr << "connection on fd " << connection->context_->c.fd << endl;
00430 
00431 
00432         fds[0].fd = wakeupfd[0];
00433         fds[0].events = POLLIN;
00434         fds[1].fd = connection->context_->c.fd;
00435         fds[1].events = 0;
00436 
00437         registerMe(connection->context_);
00438 
00439         thread.reset(new boost::thread(boost::bind(&EventLoop::run, this)));
00440 
00441 #if 0
00442         char buf[1];
00443         res = read(wakeupfd[0], buf, 1);
00444         if (res == -1)
00445             throw ML::Exception(errno, "read");
00446 #endif
00447     }
00448 
00449     ~EventLoop()
00450     {
00451         //cerr << "DESTROYING EVENT LOOP" << endl;
00452         ML::atomic_inc(eventLoopsDestroyed);
00453         shutdown();
00454     }
00455 
00456     void shutdown()
00457     {
00458         if (!thread) return;
00459 
00460         finished = true;
00461         wakeup();
00462         thread->join();
00463         thread.reset();
00464         ::close(wakeupfd[0]);
00465         ::close(wakeupfd[1]);
00466     }
00467 
00468     void wakeup()
00469     {
00470         int res = write(wakeupfd[1], "x", 1);
00471         if (res == -1)
00472             throw ML::Exception("error waking up fd %d: %s", wakeupfd[1],
00473                                 strerror(errno));
00474     }
00475     
00476     void registerMe(redisAsyncContext * context)
00477     {
00478         //cerr << "called registerMe" << endl;
00479         redisAsyncSetConnectCallback(context, onConnect);
00480         redisAsyncSetDisconnectCallback(context, onDisconnect);
00481 
00482         context->ev.data = context->data = this;
00483         context->ev.addRead = startReading;
00484         context->ev.delRead = stopReading;
00485         context->ev.addWrite = startWriting;
00486         context->ev.delWrite = stopWriting;
00487         context->ev.cleanup = cleanup;
00488     }
00489 
00490     void run()
00491     {
00492         //wakeup();
00493 
00494         //cerr << this << " starting run loop" << endl;
00495 
00496         while (!finished) {
00497             //sleep(1);
00498             Date now = Date::now();
00499 
00500             if (connection->earliestTimeout < now)
00501                 connection->expireTimeouts(now);
00502 
00503             double timeLeft = now.secondsUntil(connection->earliestTimeout);
00504 
00505             //cerr << "timeLeft = " << timeLeft << endl;
00506             //cerr << "fds[0].events = " << fds[0].events << endl;
00507             //cerr << "fds[1].events = " << fds[1].events << endl;
00508 
00509             int timeout = std::min(1000.0,
00510                                    std::max<double>(0, 1000 * timeLeft));
00511 
00512             if (connection->earliestTimeout == Date::positiveInfinity())
00513                 timeout = 1000000;
00514 
00515             //cerr << "looping; fd0 = " << fds[1].fd << " timeout = "
00516             //     << timeout << endl;
00517 
00518             int res = poll(fds, 2, timeout);
00519             if (res == -1 && errno != EINTR) {
00520                 cerr << "poll() error: " << strerror(errno) << endl;
00521             }
00522             if (res == 0) continue;  // just a timeout; loop around again
00523 
00524             //cerr << "poll() returned " << res << endl;
00525 
00526             if (fds[0].revents & POLLIN) {
00527                 //cerr << "got wakeup" << endl;
00528                 char buf[128];
00529                 int res = read(fds[0].fd, buf, 128);
00530                 if (res == -1)
00531                     throw ML::Exception(errno, "read from wakeup pipe");
00532                 //cerr << "woken up with " << res << " messages" << endl;
00533             }
00534             if ((fds[1].revents & POLLOUT)
00535                 && (fds[1].events & POLLOUT)) {
00536                 //cerr << "got write on " << fds[1].fd << endl;
00537                 boost::unique_lock<Lock> guard(connection->lock);
00538                 redisAsyncHandleWrite(connection->context_);
00539             }
00540             if ((fds[1].revents & POLLIN)
00541                 && (fds[1].events & POLLIN)) {
00542                 //cerr << "got read on " << fds[1].fd << endl;
00543                 boost::unique_lock<Lock> guard(connection->lock);
00544                 redisAsyncHandleRead(connection->context_);
00545             }
00546 
00547             // Now we don't have the lock anymore, do our callbacks
00548             while (!connection->replyQueue.empty()) {
00549                 try {
00550                     connection->replyQueue.front()();
00551                 } catch (...) {
00552                     cerr << "warning: redis callback threw" << endl;
00553                     //abort();
00554                 }
00555                 connection->replyQueue.pop_front();
00556             }
00557         }
00558 
00559         if (!disconnected) {
00560             // Disconnect
00561             //cerr << this << " calling redisAsyncDisconnect" << endl;
00562             redisAsyncDisconnect(connection->context_);
00563             //redisAsyncFree(connection->context_);
00564             //cerr << this << " done redisAsyncDisconnect" << endl;
00565         }
00566         
00567         // Wait until we get the callback
00568         while (!disconnected) {
00569             futex_wait(disconnected, 0);
00570         }
00571 
00572         //cerr << this << " now disconnected" << endl;
00573     }
00574 
00575     static void onConnect(const redisAsyncContext * context, int status)
00576     {
00577         EventLoop * eventLoop = reinterpret_cast<EventLoop *>(context->data);
00578 
00579         if (status == REDIS_OK)
00580             eventLoop->onConnect(status);
00581         else {
00582             /* This function will be called with an error status if the
00583                connection failed.  For us it's like a disconnection, so
00584                we call into the disconnect code.
00585             */
00586             cerr << "onConnect: code = " << status << " err = " << context->err
00587                  << " errstr = " << context->errstr << " errno = "
00588                  << strerror(errno) << endl;
00589             eventLoop->onDisconnect(status);
00590         }
00591     }
00592 
00593     void onConnect(int status)
00594     {
00595         //cerr << "connection on fd " << connection->context_->c.fd << endl;
00596         //cerr << "status = " << status << endl;
00597         fds[1].fd = connection->context_->c.fd;
00598         wakeup();
00599         disconnected = 0;
00600         futex_wake(disconnected);
00601     }
00602 
00603     static void onDisconnect(const redisAsyncContext * context, int status)
00604     {
00605         //cerr << "onDisconnect" << endl;
00606         EventLoop * eventLoop = reinterpret_cast<EventLoop *>(context->data);
00607         eventLoop->onDisconnect(status);
00608     }
00609 
00610     void onDisconnect(int status)
00611     {
00612         if (status != REDIS_OK) {
00613             cerr << "disconnection with status " << status << endl;
00614             cerr << "onConnect: code = " << status << " err = "
00615                  << connection->context_->err
00616                  << " errstr = "
00617                  << connection->context_->errstr << " errno = "
00618                  << strerror(errno) << endl;
00619         }
00620         disconnected = 1;
00621         futex_wake(disconnected);
00622     }
00623 
00624     static void startReading(void * privData)
00625     {
00626         EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData);
00627         eventLoop->startReading();
00628     }
00629 
00630     void startReading()
00631     {
00632         //cerr << "start reading" << endl;
00633         //if (fds[1].events & POLLIN) return;  // already reading
00634         fds[1].events |= POLLIN;
00635         wakeup();
00636     }
00637 
00638     static void stopReading(void * privData)
00639     {
00640         EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData);
00641         eventLoop->stopReading();
00642     }
00643 
00644     void stopReading()
00645     {
00646         //cerr << "stop reading" << endl;
00647         fds[1].events &= ~POLLIN;
00648     }
00649 
00650     static void startWriting(void * privData)
00651     {
00652         EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData);
00653         eventLoop->startWriting();
00654     }
00655 
00656     void startWriting()
00657     {
00658         //cerr << "start writing" << endl;
00659         //if (fds[1].events & POLLOUT) return;  // already reading
00660         fds[1].events |= POLLOUT;
00661         wakeup();
00662     }
00663 
00664     static void stopWriting(void * privData)
00665     {
00666         EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData);
00667         eventLoop->stopWriting();
00668     }
00669 
00670     void stopWriting()
00671     {
00672         //cerr << "stop writing" << endl;
00673         fds[1].events &= ~POLLOUT;
00674     }
00675 
00676     static void cleanup(void * privData)
00677     {
00678         EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData);
00679         eventLoop->cleanup();
00680     }
00681 
00682     void cleanup()
00683     {
00684         //cerr << this << " doing cleanup" << endl;
00685         //backtrace();
00686     }
00687 };
00688 
00689 
00690 AsyncConnection::
00691 AsyncConnection()
00692     : context_(0), idNum(0)
00693 {
00694 }
00695 
00696 AsyncConnection::
00697 AsyncConnection(const Address & address)
00698     : context_(0), idNum(0)
00699 {
00700     connect(address);
00701 }
00702 
00703 AsyncConnection::
00704 ~AsyncConnection()
00705 {
00706     close();
00707 }
00708 
00709 void
00710 AsyncConnection::
00711 connect(const Address & address)
00712 {
00713     //cerr << "connecting to redis " << address.uri() << endl;
00714 
00715     close();
00716 
00717     this->address = address;
00718 
00719     if (address.isTcp()) {
00720         context_ = redisAsyncConnect(address.tcpHost().c_str(),
00721                                      address.tcpPort());
00722     }
00723     else if (address.isUnix()) {
00724         context_ = redisAsyncConnectUnix(address.unixPath().c_str());
00725     }
00726     else throw ML::Exception("cannot connect to address that is neither tcp "
00727                              "or unix");
00728     checkError("connect");
00729 
00730     eventLoop.reset(new EventLoop(this));
00731 }
00732 
00733 void
00734 AsyncConnection::
00735 test()
00736 {
00737     int done = 0;
00738 
00739     string error;
00740     
00741     auto onResponse = [&] (const Redis::Result & result)
00742         {
00743             if (result) {
00744                 //cerr << "got reply " << result.reply() << endl;
00745             }
00746             else error = result.error();
00747             done = 1;
00748             futex_wake(done);
00749         };
00750 
00751     queue(PING, onResponse, 2.0);
00752 
00753     while (!done)
00754         futex_wait(done, 0);
00755     
00756     if (error != "")
00757         throw ML::Exception("couldn't connect to Redis: " + error);
00758 }
00759 
00760 
00761 
00762 void
00763 AsyncConnection::
00764 close()
00765 {
00766     if (!context_) return;
00767 
00768     if (eventLoop) {
00769         eventLoop->shutdown();
00770         eventLoop.reset();
00771     }
00772     
00773     context_ = 0;
00774 }
00775 
00776 void
00777 AsyncConnection::
00778 resultCallback(redisAsyncContext * context, void * reply, void * privData)
00779 {
00780     //cerr << "resultCallback with reply " << reply << " privData "
00781     //     << privData << endl;
00782     //cerr << "context->err = " << context->err << endl;
00783     //cerr << "context->errstr = " << context->errstr << endl;
00784     //cerr << "context->c.errstr = " << context->c.errstr << endl;
00785 
00786     ExcAssert(privData);
00787 
00788     // Get a shared pointer to our data so it doesn't go away
00789     RequestData * dataRawPtr
00790         = reinterpret_cast<RequestData *>(privData);
00791     std::shared_ptr<RequestData> data
00792         = dataRawPtr->shared_from_this();
00793 
00794     //cerr << "command " << data->command << endl;
00795     //cerr << "reply " << reply << endl;
00796 
00797     // Remove from data structures
00798     AsyncConnection * c = data->connection;
00799 
00800     {
00801         boost::unique_lock<Lock> guard(c->lock);
00802         
00803         if (data->requestIterator != c->requests.end()) {
00804             c->requests.erase(data->requestIterator);
00805             data->requestIterator = c->requests.end();
00806         }
00807         
00808         if (data->timeoutIterator != c->timeouts.end()) {
00809             c->timeouts.erase(data->timeoutIterator);
00810             data->timeoutIterator = c->timeouts.end();
00811             if (c->timeouts.empty())
00812                 c->earliestTimeout = Date::positiveInfinity();
00813             else c->earliestTimeout = c->timeouts.begin()->first;
00814         }
00815 
00816         if (data->state != WAITING) return;  // raced; timeout happened
00817         data->state = REPLIED;
00818     }
00819 
00820 
00821     Result result;
00822 
00823     if (reply) {
00824         Reply replyObj((redisReply *)reply, false /* take ownership */);
00825         //cerr << "reply = " << replyObj << endl;
00826         if (replyObj.type() == ERROR) {
00827             // Command error, return it
00828             result = Result(replyObj.asString());
00829         }
00830         else {
00831             // Result (success)
00832             // We have to take a deep copy since the reply object is owned
00833             // by the caller
00834             result = Result(replyObj.deepCopy());
00835         }
00836     }
00837     else {
00838         // Context encountered an error; return it
00839         result = Result(data->connection->context_->errstr);
00840     }
00841 
00842     // Queue up a reply object so it can be called without the lock held.  If
00843     // we call directly from here, then the lock has to be held and so deadlock
00844     // is possible.
00845     c->replyQueue.push_back(std::bind(data->onResult, result));
00846 }
00847 
00848 int64_t
00849 AsyncConnection::
00850 queue(const Command & command,
00851       const OnResult & onResult,
00852       Timeout timeout)
00853 {
00854     boost::unique_lock<Lock> guard(lock);
00855 
00856     ExcAssert(context_);
00857     ExcAssert(!context_->err);
00858     
00859     // Check basics
00860     if (timeout.expiry.isADate() && Date::now() >= timeout.expiry) {
00861         onResult(Result(Result::timeoutError));
00862         return -1;
00863     }
00864 
00865     int64_t id = idNum++;
00866     
00867     // Create data structure to be passed around
00868     std::shared_ptr<RequestData> data(new RequestData);
00869     data->onResult = onResult;
00870     data->timeout = timeout.expiry;
00871     data->command = command.formatStr;
00872     //data->timeout = Date::notADate();
00873     data->connection = this;
00874     data->id = id;
00875     data->requestIterator = requests.end();
00876     data->timeoutIterator = timeouts.end();
00877     data->state = WAITING;
00878 
00879     ExcAssertEqual(requests.count(id), 0);
00880 
00881     //cerr << "data = " << data << endl;
00882 
00883     auto it = requests.insert(make_pair(id, data)).first;
00884     data->requestIterator = it;
00885 
00886     // Does the servicing thread possibly need to be woken up?
00887     bool needWakeup = false;
00888     
00889     if (timeout.expiry.isADate()) {
00890         needWakeup = !timeouts.empty()
00891             && timeout.expiry < timeouts.begin()->first;
00892         data->timeoutIterator = timeouts.insert(make_pair(timeout.expiry, it));
00893     }
00894     
00895     vector<const char *> argv = command.argv();
00896     vector<size_t> argl = command.argl();
00897 
00898     int result = redisAsyncCommandArgv(context_, resultCallback, data.get(),
00899                                        command.argc(),
00900                                        &argv[0],
00901                                        &argl[0]);
00902     
00903     if (result != REDIS_OK) {
00904         //cerr << "result not OK" << endl;
00905         resultCallback(context_, 0, data.get());
00906         return -1;
00907     }
00908     
00909     if (needWakeup)
00910         eventLoop->wakeup();
00911     
00912     return id;
00913 }
00914 
00915 Result
00916 AsyncConnection::
00917 exec(const Command & command, Timeout timeout)
00918 {
00919     Result result;
00920     int done = 0;
00921 
00922     auto onResponse = [&] (const Redis::Result & redisResult)
00923         {
00924             result = redisResult.deepCopy();
00925             done = 1;
00926             futex_wake(done);
00927         };
00928 
00929     queue(command, onResponse, timeout);
00930 
00931     while (!done)
00932         futex_wait(done, 0);
00933  
00934     return result;
00935 }
00936 
00937 struct AsyncConnection::MultiAggregator
00938     : public Results {
00939 
00940     MultiAggregator(int size,
00941                     const OnResults & onResults)
00942         : numDone(0), onResults(onResults)
00943     {
00944         resize(size);
00945     }
00946     
00947     // Something succeeded
00948     void result(int i, const Result & result)
00949     {
00950         at(i) = result.deepCopy();
00951         finish();
00952     }
00953 
00954     void finish()
00955     {
00956         if (__sync_add_and_fetch(&numDone, 1) != size())
00957             return;
00958         onResults(*this);
00959     }
00960     
00961     int numDone;
00962     OnResults onResults;
00963 };
00964 
00965 void
00966 AsyncConnection::
00967 queueMulti(const std::vector<Command> & commands,
00968            const OnResults & onResults,
00969            Timeout timeout)
00970 {
00971     if (commands.empty())
00972         throw ML::Exception("can't call queueMulti with an empty list "
00973                             "of commands");
00974     
00975     auto results
00976         = std::make_shared<MultiAggregator>(commands.size(), onResults);
00977     
00978     // Make sure they all get executed as a block
00979     boost::unique_lock<Lock> guard(lock);
00980     
00981     // Now queue them one at a time
00982     for (unsigned i = 0;  i < commands.size();  ++i) {
00983         queue(commands[i],
00984               std::bind(&MultiAggregator::result, results, i,
00985                         std::placeholders::_1),
00986               timeout);
00987     }
00988 }
00989 
00990 Results
00991 AsyncConnection::
00992 execMulti(const std::vector<Command> & commands, Timeout timeout)
00993 {
00994     Results results;
00995     int done = 0;
00996 
00997     auto onResponse = [&] (const Redis::Results & redisResults)
00998         {
00999             results = redisResults;
01000             done = 1;
01001             futex_wake(done);
01002         };
01003 
01004     queueMulti(commands, onResponse, timeout);
01005 
01006     while (!done)
01007         futex_wait(done, 0);
01008  
01009     return results;
01010 }
01011 
01012 void
01013 AsyncConnection::
01014 cancel(int handle)
01015 {
01016     throw ML::Exception("AsyncConnection::cancel(): not done");
01017 }
01018 
01019 void
01020 AsyncConnection::
01021 expireTimeouts(Date now)
01022 {
01023     boost::unique_lock<Lock> guard(lock);
01024     
01025     auto it = timeouts.begin(), end = timeouts.end();
01026     for (;  it != end;  ++it) {
01027         if (it->first > now) break;
01028 
01029         auto resultIt = it->second;
01030         auto data = resultIt->second;
01031 
01032         data->state = TIMEDOUT;
01033         data->onResult(Result(Result::timeoutError));
01034         data->timeoutIterator = end;
01035 
01036         // Let it be cleaned up from hiredis once it's finished
01037     }
01038 
01039     timeouts.erase(timeouts.begin(), it);
01040 
01041     if (timeouts.empty())
01042         earliestTimeout = Date::positiveInfinity();
01043     else earliestTimeout = timeouts.begin()->first;
01044 }
01045 
01046 } // namespace Redis
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator