RTBKit 0.9

Open-source framework to create real-time ad bidding systems.
00001 /* redis.h -*- C++ -*- 00002 Jeremy Barnes, 14 November 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Connection to redis. 00006 */ 00007 00008 #ifndef __redis__redis_h__ 00009 #define __redis__redis_h__ 00010 00011 #include <hiredis/hiredis.h> 00012 #include <hiredis/async.h> 00013 #include <string> 00014 #include "jml/arch/exception.h" 00015 #include "jml/utils/exc_assert.h" 00016 #include "jml/utils/string_functions.h" 00017 #include "jml/utils/unnamed_bool.h" 00018 #include "jml/utils/ring_buffer.h" 00019 #include "soa/jsoncpp/json.h" 00020 #include "soa/types/date.h" 00021 #include <boost/shared_ptr.hpp> 00022 #include <boost/function.hpp> 00023 #include <boost/thread/thread.hpp> 00024 #include <boost/thread/recursive_mutex.hpp> 00025 #include <deque> 00026 00027 00028 namespace Redis { 00029 00030 00031 using Datacratic::Date; 00032 00033 00034 enum ReplyType { 00035 STATUS, 00036 ERROR, 00037 INTEGER, 00038 NIL, 00039 STRING, 00040 ARRAY 00041 }; 00042 00043 00044 /*****************************************************************************/ 00045 /* REPLY */ 00046 /*****************************************************************************/ 00047 00048 struct Reply { 00049 00050 static void doDelete(redisReply * reply) 00051 { 00052 freeReplyObject(reply); 00053 } 00054 00055 static void noDelete(redisReply * reply) 00056 { 00057 } 00058 00059 Reply() 00060 { 00061 } 00062 00063 Reply(redisReply * reply, bool needDelete) 00064 : r_(reply, needDelete ? 
doDelete : noDelete) 00065 { 00066 } 00067 00068 bool initialized() const { return !!r_; } 00069 00071 Reply deepCopy() const; 00072 00073 static redisReply * doDeepCopy(redisReply * r); 00074 00075 operator std::string () const 00076 { 00077 return asString(); 00078 } 00079 00080 operator long long int () const 00081 { 00082 return asInt(); 00083 } 00084 00085 operator Json::Value () const 00086 { 00087 return asJson(); 00088 } 00089 00090 ReplyType type() const; 00091 00092 std::string getString() const; 00093 00094 std::string asString() const; 00095 00096 long long asInt() const; 00097 00098 long long asInt(long long defaultIfNotInteger); 00099 00100 Json::Value asJson() const; 00101 00102 Reply operator [] (size_t index) const 00103 { 00104 ExcAssert(r_); 00105 ExcAssertLess(index, length()); 00106 return Reply(r_->element[index], false); 00107 } 00108 00109 ssize_t length() const 00110 { 00111 ExcAssert(r_); 00112 ExcAssertEqual(r_->type, REDIS_REPLY_ARRAY); 00113 return r_->elements; 00114 } 00115 00116 private: 00117 std::shared_ptr<redisReply> r_; 00118 bool needDelete_; 00119 }; 00120 00121 std::ostream & operator << (std::ostream & stream, const Reply & reply); 00122 00123 00127 struct Result { 00128 Result() 00129 { 00130 } 00131 00132 Result(const std::string & error) 00133 : error_(error) 00134 { 00135 } 00136 00137 Result(const Reply & reply) 00138 : reply_(reply) 00139 { 00140 } 00141 00142 Result deepCopy() const 00143 { 00144 if (ok()) 00145 return Result(reply_.deepCopy()); 00146 else return Result(error_); 00147 } 00148 00149 Reply reply_; 00150 std::string error_; 00151 00155 static const std::string timeoutError; 00156 00157 bool ok() const { return error_.empty(); } 00158 00159 bool timedOut() const { return error_ == timeoutError; } 00160 00161 JML_IMPLEMENT_OPERATOR_BOOL(ok()); 00162 00163 const Reply & reply() const 00164 { 00165 if (!error_.empty()) 00166 throw ML::Exception("attempt to read reply from Redis result with" 00167 "error " + 
error_); 00168 ExcAssert(reply_.initialized()); 00169 return reply_; 00170 } 00171 00172 const std::string & error() const 00173 { 00174 return error_; 00175 } 00176 }; 00177 00178 std::ostream & operator << (std::ostream & stream, const Result & result); 00179 00180 00182 struct Results : public std::vector<Result> { 00183 bool ok() const 00184 { 00185 for (auto & r: *this) 00186 if (!r) 00187 return false; 00188 return true; 00189 } 00190 00191 JML_IMPLEMENT_OPERATOR_BOOL(ok()); 00192 00193 const Reply & reply(int index) const 00194 { 00195 return at(index).reply(); 00196 } 00197 00198 std::string error() const 00199 { 00200 for (auto & r: *this) 00201 if (!r) 00202 return r.error(); 00203 return ""; 00204 } 00205 00206 bool timedOut() const 00207 { 00208 for (auto & r: *this) 00209 if (r.timedOut()) 00210 return true; 00211 return false; 00212 } 00213 }; 00214 00215 std::ostream & operator << (std::ostream & stream, const Results & results); 00216 00217 00218 /*****************************************************************************/ 00219 /* COMMAND */ 00220 /*****************************************************************************/ 00221 00222 struct Command { 00223 Command() 00224 { 00225 } 00226 00227 //explicit Command(const char * args, ...); 00228 00229 Command(const std::string & command) 00230 : formatStr(command) 00231 { 00232 } 00233 00234 Command(const std::string & cmd, 00235 const std::initializer_list<std::string> & args) 00236 : formatStr(cmd) 00237 { 00238 for (auto arg: args) 00239 addArg(arg); 00240 } 00241 00242 std::string formatStr; 00243 std::vector<std::string> args; 00244 00245 //std::string formatted() const; 00246 00247 void addArg(const std::string & arg) 00248 { 00249 args.push_back(arg); 00250 } 00251 00252 void addArg(int64_t arg) 00253 { 00254 args.push_back(std::to_string(arg)); 00255 } 00256 00257 template<typename Arg, typename... Args> 00258 Command operator () (const Arg & head, Args&&... 
tail) const 00259 { 00260 Command result = *this; 00261 result.addArg(head); 00262 return result(std::forward<Args>(tail)...); 00263 } 00264 00265 Command operator () () const 00266 { 00267 return *this; 00268 } 00269 00270 int argc() const 00271 { 00272 return args.size() + 1; 00273 } 00274 00275 std::vector<const char *> argv() const 00276 { 00277 std::vector<const char *> result; 00278 result.push_back(formatStr.c_str()); 00279 for (const std::string & arg: args) 00280 result.push_back(arg.c_str()); 00281 return result; 00282 } 00283 00284 std::vector<size_t> argl() const 00285 { 00286 std::vector<size_t> result; 00287 result.push_back(formatStr.length()); 00288 for (const std::string & arg: args) 00289 result.push_back(arg.length()); 00290 return result; 00291 } 00292 }; 00293 00294 std::ostream & operator << (std::ostream & stream, const Command & command); 00295 00296 // Commands ready to be constructed 00297 00298 extern const Command PING; 00299 extern const Command HDEL; 00300 extern const Command HGET; 00301 extern const Command HGETALL; 00302 extern const Command HMGET; 00303 extern const Command HMSET; 00304 extern const Command WATCH; 00305 extern const Command MULTI; 00306 extern const Command EXEC; 00307 extern const Command HSET; 00308 extern const Command HINCRBY; 00309 extern const Command KEYS; 00310 extern const Command SET; 00311 extern const Command MSET; 00312 extern const Command GET; 00313 extern const Command MGET; 00314 extern const Command EXPIRE; 00315 extern const Command RANDOMKEY; 00316 extern const Command DEL; 00317 extern const Command SADD; 00318 extern const Command SMEMBERS; 00319 extern const Command TTL; 00320 00321 00322 /*****************************************************************************/ 00323 /* ADDRESS */ 00324 /*****************************************************************************/ 00325 00328 struct Address { 00329 Address(); 00330 Address(const std::string & uri); 00331 00332 static Address tcp(const 
std::string & host, int port); 00333 static Address unix(const std::string & path); 00334 00335 bool isUnix() const; 00336 bool isTcp() const; 00337 00338 std::string unixPath() const; 00339 std::string tcpHost() const; 00340 int tcpPort() const; 00341 00342 std::string uri() const; 00343 00344 std::string uri_; 00345 }; 00346 00347 00348 /*****************************************************************************/ 00349 /* ASYNC CONNECTION */ 00350 /*****************************************************************************/ 00351 00354 struct AsyncConnection { 00355 00356 AsyncConnection(); 00357 00358 AsyncConnection(const Address & address); 00359 00360 ~AsyncConnection(); 00361 00362 void connect(const Address & address); 00363 00368 void test(); 00369 00370 void close(); 00371 00372 // Struct to specify a timeout, either absolute or relative 00373 struct Timeout { 00374 Timeout() // no timeout 00375 : expiry(Date::positiveInfinity()) 00376 { 00377 } 00378 00379 Timeout(double relativeTime) 00380 : expiry(Date::now().plusSeconds(relativeTime)) 00381 { 00382 } 00383 00384 Timeout(Date absoluteTime) 00385 : expiry(absoluteTime) 00386 { 00387 } 00388 00389 Date expiry; 00390 }; 00391 00392 typedef boost::function<void (const Result &)> OnResult; 00393 typedef boost::function<void (const Results &)> OnResults; 00394 00398 int64_t queue(const Command & command, 00399 const OnResult & onResult = OnResult(), 00400 Timeout timeout = Timeout()); 00401 00403 Result exec(const Command & command, Timeout timeout = Timeout()); 00404 00406 void queueMulti(const std::vector<Command> & commands, 00407 const OnResults & onResults = OnResults(), 00408 Timeout timeout = Timeout()); 00409 00411 Results execMulti(const std::vector<Command> & command, 00412 Timeout timeout = Timeout()); 00413 00415 void cancel(int handle); 00416 00417 size_t numRequestsPending() const 00418 { 00419 return requests.size(); 00420 } 00421 00422 size_t numTimeoutsPending() const 00423 { 00424 
return timeouts.size(); 00425 } 00426 00427 private: 00428 std::deque<std::function<void ()> > replyQueue; 00429 00430 static void resultCallback(redisAsyncContext * context, void *, void *); 00431 00432 struct RequestData; 00433 00434 typedef boost::recursive_mutex Lock; 00435 Lock lock; 00436 00437 typedef std::map<uint64_t, std::shared_ptr<RequestData> > Requests; 00438 Requests requests; 00439 00440 typedef std::multimap<Datacratic::Date, Requests::iterator> Timeouts; 00441 Timeouts timeouts; 00442 00446 void expireTimeouts(Datacratic::Date now); 00447 00448 Datacratic::Date earliestTimeout; 00449 00450 void checkError(const char * command) 00451 { 00452 if (!context_) 00453 throw ML::Exception("no connection to Redis"); 00454 00455 if (context_->err) { 00456 throw ML::Exception("Redis command %s returned error %s", 00457 command, context_->errstr); 00458 } 00459 } 00460 00461 Address address; 00462 redisAsyncContext * context_; 00463 int64_t idNum; 00464 00465 struct EventLoop; 00466 std::shared_ptr<EventLoop> eventLoop; 00467 00468 struct MultiAggregator; 00469 }; 00470 00471 } // namespace Datacratic 00472 00473 #endif /* __redis__redis_h__ */