RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* redis.cc 00002 Jeremy Barnes, 14 November 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Redis functionality. 00006 */ 00007 00008 #include "soa/service/redis.h" 00009 #include "jml/utils/guard.h" 00010 #include <boost/thread.hpp> 00011 #include <poll.h> 00012 #include <unistd.h> 00013 #include <fcntl.h> 00014 #include "jml/arch/atomic_ops.h" 00015 #include "jml/arch/backtrace.h" 00016 #include "jml/arch/futex.h" 00017 #include "jml/utils/vector_utils.h" 00018 00019 00020 using namespace std; 00021 using namespace Datacratic; 00022 using namespace ML; 00023 00024 00025 00026 00027 namespace Redis { 00028 00029 00030 const Command PING("PING"); 00031 const Command HDEL("HDEL"); 00032 const Command HGET("HGET"); 00033 const Command HGETALL("HGETALL"); 00034 const Command HMGET("HMGET"); 00035 const Command HMSET("HMSET"); 00036 const Command WATCH("WATCH"); 00037 const Command MULTI("MULTI"); 00038 const Command EXEC("EXEC"); 00039 const Command HSET("HSET"); 00040 const Command HINCRBY("HINCRBY"); 00041 const Command KEYS("KEYS"); 00042 const Command SET("SET"); 00043 const Command MSET("MSET"); 00044 const Command GET("GET"); 00045 const Command MGET("MGET"); 00046 const Command EXPIRE("EXPIRE"); 00047 const Command RANDOMKEY("RANDOMKEY"); 00048 const Command DEL("DEL"); 00049 const Command SADD("SADD"); 00050 const Command SMEMBERS("SMEMBERS"); 00051 const Command TTL("TTL"); 00052 00053 /*****************************************************************************/ 00054 /* REPLY */ 00055 /*****************************************************************************/ 00056 00057 00058 ReplyType 00059 Reply:: 00060 type() const 00061 { 00062 ExcAssert(r_); 00063 switch (r_->type) { 00064 case REDIS_REPLY_STATUS: return STATUS; 00065 case REDIS_REPLY_ERROR: return ERROR; 00066 case REDIS_REPLY_INTEGER: return INTEGER; 00067 case REDIS_REPLY_NIL: return NIL; 00068 case REDIS_REPLY_STRING: return STRING; 00069 case REDIS_REPLY_ARRAY: return ARRAY; 00070 default: 00071 throw ML::Exception("unknown Redis reply type %d", r_->type); 00072 }; 00073 } 00074 00075 std::string 00076 Reply:: 00077 getString() const 00078 { 00079 ExcAssert(r_); 00080 return std::string(r_->str, r_->str + r_->len); 00081 } 00082 00083 std::string 00084 Reply:: 00085 asString() const 00086 { 00087 ExcAssert(r_); 00088 switch (r_->type) { 00089 case REDIS_REPLY_STATUS: 00090 case REDIS_REPLY_STRING: 00091 case REDIS_REPLY_ERROR: return getString(); 00092 case REDIS_REPLY_INTEGER: return ML::format("%lli", r_->integer); 00093 case REDIS_REPLY_NIL: return ""; 00094 case REDIS_REPLY_ARRAY: return asJson().toString(); 00095 default: 00096 throw ML::Exception("unknown Redis reply type"); 00097 }; 00098 } 00099 00100 long long 00101 Reply:: 00102 asInt() const 00103 { 00104 ExcAssert(r_); 00105 ExcAssertEqual(r_->type, REDIS_REPLY_INTEGER); 00106 return r_->integer; 00107 } 00108 00109 long long 00110 Reply:: 00111 asInt(long long defaultIfNotInteger) 00112 { 00113 ExcAssert(r_); 00114 switch (r_->type) { 00115 case REDIS_REPLY_INTEGER: return r_->integer; 00116 case REDIS_REPLY_STRING: { 00117 std::string s = getString(); 00118 char * end = 0; 00119 long long result = strtoll(s.c_str(), &end, 10); 00120 if (end != s.c_str() + s.length()) 00121 return defaultIfNotInteger; 00122 return result; 00123 } 00124 case REDIS_REPLY_STATUS: 00125 case REDIS_REPLY_ERROR: 00126 case REDIS_REPLY_NIL: 00127 case REDIS_REPLY_ARRAY: return defaultIfNotInteger; 00128 default: 00129 throw ML::Exception("unknown Redis reply type"); 00130 }; 00131 00132 } 00133 00134 Json::Value 00135 Reply:: 00136 asJson() const 00137 { 00138 ExcAssert(r_); 00139 Json::Value result; 00140 00141 switch (r_->type) { 00142 00143 case REDIS_REPLY_STATUS: 00144 result["status"] = getString(); 00145 return result; 00146 00147 case REDIS_REPLY_ERROR: 00148 result["error"] = getString(); 00149 return result; 00150 00151 case REDIS_REPLY_INTEGER: 00152 result = (Json::Value::UInt)r_->integer; 00153 return result; 00154 00155 case REDIS_REPLY_NIL: 00156 return result; 00157 00158 case REDIS_REPLY_STRING: 00159 result = getString(); 00160 return result; 00161 00162 case REDIS_REPLY_ARRAY: 00163 for (unsigned i = 0; i < r_->elements; ++i) 00164 result[i] = (*this)[i].asJson(); 00165 return result; 00166 00167 default: 00168 throw ML::Exception("unknown Redis reply type "); 00169 }; 00170 00171 } 00172 00173 Reply 00174 Reply:: 00175 deepCopy() const 00176 { 00177 return Reply(doDeepCopy(r_.get()), true); 00178 } 00179 00180 redisReply * 00181 Reply:: 00182 doDeepCopy(redisReply * r) 00183 { 00184 redisReply * result = (redisReply *)malloc(sizeof(redisReply)); 00185 memset(result, 0, sizeof(redisReply)); 00186 result->type = r->type; 00187 00188 try { 00189 switch (r->type) { 00190 00191 case REDIS_REPLY_INTEGER: 00192 case REDIS_REPLY_NIL: 00193 result->integer = r->integer; 00194 break; 00195 case REDIS_REPLY_STATUS: 00196 case REDIS_REPLY_ERROR: 00197 case REDIS_REPLY_STRING: { 00198 // Copy the string 00199 char * str = (char *)malloc(r->len); 00200 result->str = str; 00201 result->len = r->len; 00202 std::copy(r->str, r->str + r->len, str); 00203 break; 00204 } 00205 case REDIS_REPLY_ARRAY: { 00206 // Copy the array 00207 redisReply ** arr 00208 = (redisReply **)malloc(r->elements * sizeof(redisReply *)); 00209 for (unsigned i = 0; i < r->elements; ++i) 00210 arr[i] = 0; 00211 result->element = arr; 00212 result->elements = 0; 00213 for (unsigned i = 0; i < r->elements; ++i) { 00214 result->element[i] = doDeepCopy(r->element[i]); 00215 result->elements = i + 1; 00216 } 00217 break; 00218 } 00219 00220 default: 00221 throw ML::Exception("unknown Redis reply type %d", r->type); 00222 }; 00223 } catch (...) { 00224 freeReplyObject(result); 00225 throw; 00226 } 00227 00228 return result; 00229 } 00230 00231 std::ostream & operator << (std::ostream & stream, const Reply & reply) 00232 { 00233 return stream << reply.asString(); 00234 } 00235 00236 00237 /*****************************************************************************/ 00238 /* RESULTS */ 00239 /*****************************************************************************/ 00240 00241 const std::string 00242 Result::timeoutError("timeout"); 00243 00244 std::ostream & operator << (std::ostream & stream, const Result & result) 00245 { 00246 if (result) return stream << result.reply().asString(); 00247 else return stream << result.error(); 00248 } 00249 00250 std::ostream & operator << (std::ostream & stream, const Results & results) 00251 { 00252 for (unsigned i = 0; i < results.size(); ++i) { 00253 stream << " " << i << ": " << results[i] << endl; 00254 } 00255 return stream; 00256 } 00257 00258 00259 /*****************************************************************************/ 00260 /* COMMAND */ 00261 /*****************************************************************************/ 00262 00263 #if 0 00264 Command:: 00265 Command() 00266 { 00267 va_list ap; 00268 va_start(ap, cmdFormat); 00269 ML::call_guard([&] () { va_end(ap); }); 00270 00271 return vqueue(onSuccess, onFailure, Date::notADate(), OnTimeout(), 00272 cmdFormat, ap); 00273 } 00274 #endif 00275 00276 std::ostream & operator << (std::ostream & stream, const Command & command) 00277 { 00278 return stream << command.formatStr << command.args; 00279 } 00280 00281 /*****************************************************************************/ 00282 /* ADDRESS */ 00283 /*****************************************************************************/ 00284 00285 Address:: 00286 Address() 00287 { 00288 } 00289 00290 Address:: 00291 Address(const std::string & uri) 00292 : uri_(uri) 00293 { 00294 } 00295 00296 Address 00297 Address:: 00298 tcp(const std::string & host, int port) 00299 { 00300 if (host.find(':') != string::npos) 00301 throw ML::Exception("invalid host has a colon"); 00302 if (host.find('/') != string::npos) 00303 throw ML::Exception("invalid host has a slash"); 00304 00305 return Address(host + ":" + to_string(port)); 00306 } 00307 00308 Address 00309 Address:: 00310 unix(const std::string & path) 00311 { 00312 if (path.find(':') != string::npos) 00313 throw ML::Exception("invalid path has a colon"); 00314 return Address(path); 00315 } 00316 00317 bool 00318 Address:: 00319 isUnix() const 00320 { 00321 return !uri_.empty() 00322 && uri_.find(':') == string::npos; 00323 } 00324 00325 bool 00326 Address:: 00327 isTcp() const 00328 { 00329 return !uri_.empty() 00330 && uri_.find(':') != string::npos; 00331 00332 } 00333 00334 std::string 00335 Address:: 00336 unixPath() const 00337 { 00338 if (!isUnix()) 00339 throw ML::Exception("address is not unix"); 00340 return uri_; 00341 } 00342 00343 std::string 00344 Address:: 00345 tcpHost() const 00346 { 00347 if (!isTcp()) 00348 throw ML::Exception("address is not tcp"); 00349 auto pos = uri_.find(':'); 00350 ExcAssertNotEqual(pos, string::npos); 00351 return string(uri_, 0, pos); 00352 } 00353 00354 int 00355 Address:: 00356 tcpPort() const 00357 { 00358 if (!isTcp()) 00359 throw ML::Exception("address is not tcp"); 00360 auto pos = uri_.find(':'); 00361 ExcAssertNotEqual(pos, string::npos); 00362 return boost::lexical_cast<int>(string(uri_, pos + 1)); 00363 } 00364 00365 std::string 00366 Address:: 00367 uri() const 00368 { 00369 return uri_; 00370 } 00371 00372 00373 /*****************************************************************************/ 00374 /* ASYNC CONNECTION */ 00375 /*****************************************************************************/ 00376 00377 enum { 00378 WAITING = 0, 00379 REPLIED = 1, 00380 TIMEDOUT = 2 00381 }; 00382 00383 size_t requestDataCreated = 0; 00384 size_t requestDataDestroyed = 0; 00385 00386 struct AsyncConnection::RequestData 00387 : std::enable_shared_from_this<AsyncConnection::RequestData> { 00388 RequestData() 00389 { 00390 ML::atomic_inc(requestDataCreated); 00391 } 00392 00393 ~RequestData() 00394 { 00395 ML::atomic_inc(requestDataDestroyed); 00396 } 00397 00398 OnResult onResult; 00399 std::string command; 00400 Date timeout; 00401 AsyncConnection * connection; 00402 int64_t id; 00403 Requests::iterator requestIterator; 00404 Timeouts::iterator timeoutIterator; 00405 int state; 00406 }; 00407 00408 size_t eventLoopsCreated = 0; 00409 size_t eventLoopsDestroyed = 0; 00410 00411 struct AsyncConnection::EventLoop { 00412 00413 int wakeupfd[2]; 00414 volatile bool finished; 00415 AsyncConnection * connection; 00416 std::shared_ptr<boost::thread> thread; 00417 pollfd fds[2]; 00418 volatile int disconnected; 00419 00420 EventLoop(AsyncConnection * connection) 00421 : finished(false), connection(connection), disconnected(1) 00422 { 00423 ML::atomic_inc(eventLoopsCreated); 00424 00425 int res = pipe2(wakeupfd, O_NONBLOCK); 00426 if (res == -1) 00427 throw ML::Exception(errno, "pipe2"); 00428 00429 //cerr << "connection on fd " << connection->context_->c.fd << endl; 00430 00431 00432 fds[0].fd = wakeupfd[0]; 00433 fds[0].events = POLLIN; 00434 fds[1].fd = connection->context_->c.fd; 00435 fds[1].events = 0; 00436 00437 registerMe(connection->context_); 00438 00439 thread.reset(new boost::thread(boost::bind(&EventLoop::run, this))); 00440 00441 #if 0 00442 char buf[1]; 00443 res = read(wakeupfd[0], buf, 1); 00444 if (res == -1) 00445 throw ML::Exception(errno, "read"); 00446 #endif 00447 } 00448 00449 ~EventLoop() 00450 { 00451 //cerr << "DESTROYING EVENT LOOP" << endl; 00452 ML::atomic_inc(eventLoopsDestroyed); 00453 shutdown(); 00454 } 00455 00456 void shutdown() 00457 { 00458 if (!thread) return; 00459 00460 finished = true; 00461 wakeup(); 00462 thread->join(); 00463 thread.reset(); 00464 ::close(wakeupfd[0]); 00465 ::close(wakeupfd[1]); 00466 } 00467 00468 void wakeup() 00469 { 00470 int res = write(wakeupfd[1], "x", 1); 00471 if (res == -1) 00472 throw ML::Exception("error waking up fd %d: %s", wakeupfd[1], 00473 strerror(errno)); 00474 } 00475 00476 void registerMe(redisAsyncContext * context) 00477 { 00478 //cerr << "called registerMe" << endl; 00479 redisAsyncSetConnectCallback(context, onConnect); 00480 redisAsyncSetDisconnectCallback(context, onDisconnect); 00481 00482 context->ev.data = context->data = this; 00483 context->ev.addRead = startReading; 00484 context->ev.delRead = stopReading; 00485 context->ev.addWrite = startWriting; 00486 context->ev.delWrite = stopWriting; 00487 context->ev.cleanup = cleanup; 00488 } 00489 00490 void run() 00491 { 00492 //wakeup(); 00493 00494 //cerr << this << " starting run loop" << endl; 00495 00496 while (!finished) { 00497 //sleep(1); 00498 Date now = Date::now(); 00499 00500 if (connection->earliestTimeout < now) 00501 connection->expireTimeouts(now); 00502 00503 double timeLeft = now.secondsUntil(connection->earliestTimeout); 00504 00505 //cerr << "timeLeft = " << timeLeft << endl; 00506 //cerr << "fds[0].events = " << fds[0].events << endl; 00507 //cerr << "fds[1].events = " << fds[1].events << endl; 00508 00509 int timeout = std::min(1000.0, 00510 std::max<double>(0, 1000 * timeLeft)); 00511 00512 if (connection->earliestTimeout == Date::positiveInfinity()) 00513 timeout = 1000000; 00514 00515 //cerr << "looping; fd0 = " << fds[1].fd << " timeout = " 00516 // << timeout << endl; 00517 00518 int res = poll(fds, 2, timeout); 00519 if (res == -1 && errno != EINTR) { 00520 cerr << "poll() error: " << strerror(errno) << endl; 00521 } 00522 if (res == 0) continue; // just a timeout; loop around again 00523 00524 //cerr << "poll() returned " << res << endl; 00525 00526 if (fds[0].revents & POLLIN) { 00527 //cerr << "got wakeup" << endl; 00528 char buf[128]; 00529 int res = read(fds[0].fd, buf, 128); 00530 if (res == -1) 00531 throw ML::Exception(errno, "read from wakeup pipe"); 00532 //cerr << "woken up with " << res << " messages" << endl; 00533 } 00534 if ((fds[1].revents & POLLOUT) 00535 && (fds[1].events & POLLOUT)) { 00536 //cerr << "got write on " << fds[1].fd << endl; 00537 boost::unique_lock<Lock> guard(connection->lock); 00538 redisAsyncHandleWrite(connection->context_); 00539 } 00540 if ((fds[1].revents & POLLIN) 00541 && (fds[1].events & POLLIN)) { 00542 //cerr << "got read on " << fds[1].fd << endl; 00543 boost::unique_lock<Lock> guard(connection->lock); 00544 redisAsyncHandleRead(connection->context_); 00545 } 00546 00547 // Now we don't have the lock anymore, do our callbacks 00548 while (!connection->replyQueue.empty()) { 00549 try { 00550 connection->replyQueue.front()(); 00551 } catch (...) { 00552 cerr << "warning: redis callback threw" << endl; 00553 //abort(); 00554 } 00555 connection->replyQueue.pop_front(); 00556 } 00557 } 00558 00559 if (!disconnected) { 00560 // Disconnect 00561 //cerr << this << " calling redisAsyncDisconnect" << endl; 00562 redisAsyncDisconnect(connection->context_); 00563 //redisAsyncFree(connection->context_); 00564 //cerr << this << " done redisAsyncDisconnect" << endl; 00565 } 00566 00567 // Wait until we get the callback 00568 while (!disconnected) { 00569 futex_wait(disconnected, 0); 00570 } 00571 00572 //cerr << this << " now disconnected" << endl; 00573 } 00574 00575 static void onConnect(const redisAsyncContext * context, int status) 00576 { 00577 EventLoop * eventLoop = reinterpret_cast<EventLoop *>(context->data); 00578 00579 if (status == REDIS_OK) 00580 eventLoop->onConnect(status); 00581 else { 00582 /* This function will be called with an error status if the 00583 connection failed. For us it's like a disconnection, so 00584 we call into the disconnect code. 00585 */ 00586 cerr << "onConnect: code = " << status << " err = " << context->err 00587 << " errstr = " << context->errstr << " errno = " 00588 << strerror(errno) << endl; 00589 eventLoop->onDisconnect(status); 00590 } 00591 } 00592 00593 void onConnect(int status) 00594 { 00595 //cerr << "connection on fd " << connection->context_->c.fd << endl; 00596 //cerr << "status = " << status << endl; 00597 fds[1].fd = connection->context_->c.fd; 00598 wakeup(); 00599 disconnected = 0; 00600 futex_wake(disconnected); 00601 } 00602 00603 static void onDisconnect(const redisAsyncContext * context, int status) 00604 { 00605 //cerr << "onDisconnect" << endl; 00606 EventLoop * eventLoop = reinterpret_cast<EventLoop *>(context->data); 00607 eventLoop->onDisconnect(status); 00608 } 00609 00610 void onDisconnect(int status) 00611 { 00612 if (status != REDIS_OK) { 00613 cerr << "disconnection with status " << status << endl; 00614 cerr << "onConnect: code = " << status << " err = " 00615 << connection->context_->err 00616 << " errstr = " 00617 << connection->context_->errstr << " errno = " 00618 << strerror(errno) << endl; 00619 } 00620 disconnected = 1; 00621 futex_wake(disconnected); 00622 } 00623 00624 static void startReading(void * privData) 00625 { 00626 EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData); 00627 eventLoop->startReading(); 00628 } 00629 00630 void startReading() 00631 { 00632 //cerr << "start reading" << endl; 00633 //if (fds[1].events & POLLIN) return; // already reading 00634 fds[1].events |= POLLIN; 00635 wakeup(); 00636 } 00637 00638 static void stopReading(void * privData) 00639 { 00640 EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData); 00641 eventLoop->stopReading(); 00642 } 00643 00644 void stopReading() 00645 { 00646 //cerr << "stop reading" << endl; 00647 fds[1].events &= ~POLLIN; 00648 } 00649 00650 static void startWriting(void * privData) 00651 { 00652 EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData); 00653 eventLoop->startWriting(); 00654 } 00655 00656 void startWriting() 00657 { 00658 //cerr << "start writing" << endl; 00659 //if (fds[1].events & POLLOUT) return; // already reading 00660 fds[1].events |= POLLOUT; 00661 wakeup(); 00662 } 00663 00664 static void stopWriting(void * privData) 00665 { 00666 EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData); 00667 eventLoop->stopWriting(); 00668 } 00669 00670 void stopWriting() 00671 { 00672 //cerr << "stop writing" << endl; 00673 fds[1].events &= ~POLLOUT; 00674 } 00675 00676 static void cleanup(void * privData) 00677 { 00678 EventLoop * eventLoop = reinterpret_cast<EventLoop *>(privData); 00679 eventLoop->cleanup(); 00680 } 00681 00682 void cleanup() 00683 { 00684 //cerr << this << " doing cleanup" << endl; 00685 //backtrace(); 00686 } 00687 }; 00688 00689 00690 AsyncConnection:: 00691 AsyncConnection() 00692 : context_(0), idNum(0) 00693 { 00694 } 00695 00696 AsyncConnection:: 00697 AsyncConnection(const Address & address) 00698 : context_(0), idNum(0) 00699 { 00700 connect(address); 00701 } 00702 00703 AsyncConnection:: 00704 ~AsyncConnection() 00705 { 00706 close(); 00707 } 00708 00709 void 00710 AsyncConnection:: 00711 connect(const Address & address) 00712 { 00713 //cerr << "connecting to redis " << address.uri() << endl; 00714 00715 close(); 00716 00717 this->address = address; 00718 00719 if (address.isTcp()) { 00720 context_ = redisAsyncConnect(address.tcpHost().c_str(), 00721 address.tcpPort()); 00722 } 00723 else if (address.isUnix()) { 00724 context_ = redisAsyncConnectUnix(address.unixPath().c_str()); 00725 } 00726 else throw ML::Exception("cannot connect to address that is neither tcp " 00727 "or unix"); 00728 checkError("connect"); 00729 00730 eventLoop.reset(new EventLoop(this)); 00731 } 00732 00733 void 00734 AsyncConnection:: 00735 test() 00736 { 00737 int done = 0; 00738 00739 string error; 00740 00741 auto onResponse = [&] (const Redis::Result & result) 00742 { 00743 if (result) { 00744 //cerr << "got reply " << result.reply() << endl; 00745 } 00746 else error = result.error(); 00747 done = 1; 00748 futex_wake(done); 00749 }; 00750 00751 queue(PING, onResponse, 2.0); 00752 00753 while (!done) 00754 futex_wait(done, 0); 00755 00756 if (error != "") 00757 throw ML::Exception("couldn't connect to Redis: " + error); 00758 } 00759 00760 00761 00762 void 00763 AsyncConnection:: 00764 close() 00765 { 00766 if (!context_) return; 00767 00768 if (eventLoop) { 00769 eventLoop->shutdown(); 00770 eventLoop.reset(); 00771 } 00772 00773 context_ = 0; 00774 } 00775 00776 void 00777 AsyncConnection:: 00778 resultCallback(redisAsyncContext * context, void * reply, void * privData) 00779 { 00780 //cerr << "resultCallback with reply " << reply << " privData " 00781 // << privData << endl; 00782 //cerr << "context->err = " << context->err << endl; 00783 //cerr << "context->errstr = " << context->errstr << endl; 00784 //cerr << "context->c.errstr = " << context->c.errstr << endl; 00785 00786 ExcAssert(privData); 00787 00788 // Get a shared pointer to our data so it doesn't go away 00789 RequestData * dataRawPtr 00790 = reinterpret_cast<RequestData *>(privData); 00791 std::shared_ptr<RequestData> data 00792 = dataRawPtr->shared_from_this(); 00793 00794 //cerr << "command " << data->command << endl; 00795 //cerr << "reply " << reply << endl; 00796 00797 // Remove from data structures 00798 AsyncConnection * c = data->connection; 00799 00800 { 00801 boost::unique_lock<Lock> guard(c->lock); 00802 00803 if (data->requestIterator != c->requests.end()) { 00804 c->requests.erase(data->requestIterator); 00805 data->requestIterator = c->requests.end(); 00806 } 00807 00808 if (data->timeoutIterator != c->timeouts.end()) { 00809 c->timeouts.erase(data->timeoutIterator); 00810 data->timeoutIterator = c->timeouts.end(); 00811 if (c->timeouts.empty()) 00812 c->earliestTimeout = Date::positiveInfinity(); 00813 else c->earliestTimeout = c->timeouts.begin()->first; 00814 } 00815 00816 if (data->state != WAITING) return; // raced; timeout happened 00817 data->state = REPLIED; 00818 } 00819 00820 00821 Result result; 00822 00823 if (reply) { 00824 Reply replyObj((redisReply *)reply, false /* take ownership */); 00825 //cerr << "reply = " << replyObj << endl; 00826 if (replyObj.type() == ERROR) { 00827 // Command error, return it 00828 result = Result(replyObj.asString()); 00829 } 00830 else { 00831 // Result (success) 00832 // We have to take a deep copy since the reply object is owned 00833 // by the caller 00834 result = Result(replyObj.deepCopy()); 00835 } 00836 } 00837 else { 00838 // Context encountered an error; return it 00839 result = Result(data->connection->context_->errstr); 00840 } 00841 00842 // Queue up a reply object so it can be called without the lock held. If 00843 // we call directly from here, then the lock has to be held and so deadlock 00844 // is possible. 00845 c->replyQueue.push_back(std::bind(data->onResult, result)); 00846 } 00847 00848 int64_t 00849 AsyncConnection:: 00850 queue(const Command & command, 00851 const OnResult & onResult, 00852 Timeout timeout) 00853 { 00854 boost::unique_lock<Lock> guard(lock); 00855 00856 ExcAssert(context_); 00857 ExcAssert(!context_->err); 00858 00859 // Check basics 00860 if (timeout.expiry.isADate() && Date::now() >= timeout.expiry) { 00861 onResult(Result(Result::timeoutError)); 00862 return -1; 00863 } 00864 00865 int64_t id = idNum++; 00866 00867 // Create data structure to be passed around 00868 std::shared_ptr<RequestData> data(new RequestData); 00869 data->onResult = onResult; 00870 data->timeout = timeout.expiry; 00871 data->command = command.formatStr; 00872 //data->timeout = Date::notADate(); 00873 data->connection = this; 00874 data->id = id; 00875 data->requestIterator = requests.end(); 00876 data->timeoutIterator = timeouts.end(); 00877 data->state = WAITING; 00878 00879 ExcAssertEqual(requests.count(id), 0); 00880 00881 //cerr << "data = " << data << endl; 00882 00883 auto it = requests.insert(make_pair(id, data)).first; 00884 data->requestIterator = it; 00885 00886 // Does the servicing thread possibly need to be woken up? 00887 bool needWakeup = false; 00888 00889 if (timeout.expiry.isADate()) { 00890 needWakeup = !timeouts.empty() 00891 && timeout.expiry < timeouts.begin()->first; 00892 data->timeoutIterator = timeouts.insert(make_pair(timeout.expiry, it)); 00893 } 00894 00895 vector<const char *> argv = command.argv(); 00896 vector<size_t> argl = command.argl(); 00897 00898 int result = redisAsyncCommandArgv(context_, resultCallback, data.get(), 00899 command.argc(), 00900 &argv[0], 00901 &argl[0]); 00902 00903 if (result != REDIS_OK) { 00904 //cerr << "result not OK" << endl; 00905 resultCallback(context_, 0, data.get()); 00906 return -1; 00907 } 00908 00909 if (needWakeup) 00910 eventLoop->wakeup(); 00911 00912 return id; 00913 } 00914 00915 Result 00916 AsyncConnection:: 00917 exec(const Command & command, Timeout timeout) 00918 { 00919 Result result; 00920 int done = 0; 00921 00922 auto onResponse = [&] (const Redis::Result & redisResult) 00923 { 00924 result = redisResult.deepCopy(); 00925 done = 1; 00926 futex_wake(done); 00927 }; 00928 00929 queue(command, onResponse, timeout); 00930 00931 while (!done) 00932 futex_wait(done, 0); 00933 00934 return result; 00935 } 00936 00937 struct AsyncConnection::MultiAggregator 00938 : public Results { 00939 00940 MultiAggregator(int size, 00941 const OnResults & onResults) 00942 : numDone(0), onResults(onResults) 00943 { 00944 resize(size); 00945 } 00946 00947 // Something succeeded 00948 void result(int i, const Result & result) 00949 { 00950 at(i) = result.deepCopy(); 00951 finish(); 00952 } 00953 00954 void finish() 00955 { 00956 if (__sync_add_and_fetch(&numDone, 1) != size()) 00957 return; 00958 onResults(*this); 00959 } 00960 00961 int numDone; 00962 OnResults onResults; 00963 }; 00964 00965 void 00966 AsyncConnection:: 00967 queueMulti(const std::vector<Command> & commands, 00968 const OnResults & onResults, 00969 Timeout timeout) 00970 { 00971 if (commands.empty()) 00972 throw ML::Exception("can't call queueMulti with an empty list " 00973 "of commands"); 00974 00975 auto results 00976 = std::make_shared<MultiAggregator>(commands.size(), onResults); 00977 00978 // Make sure they all get executed as a block 00979 boost::unique_lock<Lock> guard(lock); 00980 00981 // Now queue them one at a time 00982 for (unsigned i = 0; i < commands.size(); ++i) { 00983 queue(commands[i], 00984 std::bind(&MultiAggregator::result, results, i, 00985 std::placeholders::_1), 00986 timeout); 00987 } 00988 } 00989 00990 Results 00991 AsyncConnection:: 00992 execMulti(const std::vector<Command> & commands, Timeout timeout) 00993 { 00994 Results results; 00995 int done = 0; 00996 00997 auto onResponse = [&] (const Redis::Results & redisResults) 00998 { 00999 results = redisResults; 01000 done = 1; 01001 futex_wake(done); 01002 }; 01003 01004 queueMulti(commands, onResponse, timeout); 01005 01006 while (!done) 01007 futex_wait(done, 0); 01008 01009 return results; 01010 } 01011 01012 void 01013 AsyncConnection:: 01014 cancel(int handle) 01015 { 01016 throw ML::Exception("AsyncConnection::cancel(): not done"); 01017 } 01018 01019 void 01020 AsyncConnection:: 01021 expireTimeouts(Date now) 01022 { 01023 boost::unique_lock<Lock> guard(lock); 01024 01025 auto it = timeouts.begin(), end = timeouts.end(); 01026 for (; it != end; ++it) { 01027 if (it->first > now) break; 01028 01029 auto resultIt = it->second; 01030 auto data = resultIt->second; 01031 01032 data->state = TIMEDOUT; 01033 data->onResult(Result(Result::timeoutError)); 01034 data->timeoutIterator = end; 01035 01036 // Let it be cleaned up from hiredis once it's finished 01037 } 01038 01039 timeouts.erase(timeouts.begin(), it); 01040 01041 if (timeouts.empty()) 01042 earliestTimeout = Date::positiveInfinity(); 01043 else earliestTimeout = timeouts.begin()->first; 01044 } 01045 01046 } // namespace Redis