RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/redis.h
00001 /* redis.h                                                         -*- C++ -*-
00002    Jeremy Barnes, 14 November 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Connection to redis.
00006 */
00007 
00008 #ifndef __redis__redis_h__
00009 #define __redis__redis_h__
00010 
00011 #include <hiredis/hiredis.h>
00012 #include <hiredis/async.h>
00013 #include <string>
00014 #include "jml/arch/exception.h"
00015 #include "jml/utils/exc_assert.h"
00016 #include "jml/utils/string_functions.h"
00017 #include "jml/utils/unnamed_bool.h"
00018 #include "jml/utils/ring_buffer.h"
00019 #include "soa/jsoncpp/json.h"
00020 #include "soa/types/date.h"
00021 #include <boost/shared_ptr.hpp>
00022 #include <boost/function.hpp>
00023 #include <boost/thread/thread.hpp>
00024 #include <boost/thread/recursive_mutex.hpp>
00025 #include <deque>
00026 
00027 
00028 namespace Redis {
00029 
00030 
00031 using Datacratic::Date;
00032 
00033 
00034 enum ReplyType {
00035     STATUS,
00036     ERROR,
00037     INTEGER,
00038     NIL,
00039     STRING,
00040     ARRAY
00041 };
00042 
00043 
00044 /*****************************************************************************/
00045 /* REPLY                                                                     */
00046 /*****************************************************************************/
00047 
00048 struct Reply {
00049 
00050     static void doDelete(redisReply * reply)
00051     {
00052         freeReplyObject(reply);
00053     }
00054     
00055     static void noDelete(redisReply * reply)
00056     {
00057     }
00058 
00059     Reply()
00060     {
00061     }
00062 
00063     Reply(redisReply * reply, bool needDelete)
00064         : r_(reply, needDelete ? doDelete : noDelete)
00065     {
00066     }
00067 
00068     bool initialized() const { return !!r_; }
00069 
00071     Reply deepCopy() const;
00072 
00073     static redisReply * doDeepCopy(redisReply * r);
00074 
00075     operator std::string () const
00076     {
00077         return asString();
00078     }
00079 
00080     operator long long int () const
00081     {
00082         return asInt();
00083     }
00084 
00085     operator Json::Value () const
00086     {
00087         return asJson();
00088     }
00089 
00090     ReplyType type() const;
00091 
00092     std::string getString() const;
00093 
00094     std::string asString() const;
00095         
00096     long long asInt() const;
00097 
00098     long long asInt(long long defaultIfNotInteger);
00099 
00100     Json::Value asJson() const;
00101 
00102     Reply operator [] (size_t index) const
00103     {
00104         ExcAssert(r_);
00105         ExcAssertLess(index, length());
00106         return Reply(r_->element[index], false);
00107     }
00108 
00109     ssize_t length() const
00110     {
00111         ExcAssert(r_);
00112         ExcAssertEqual(r_->type, REDIS_REPLY_ARRAY);
00113         return r_->elements;
00114     }
00115 
00116 private:
00117     std::shared_ptr<redisReply> r_;
00118     bool needDelete_;
00119 };
00120 
00121 std::ostream & operator << (std::ostream & stream, const Reply & reply);
00122 
00123 
00127 struct Result {
00128     Result()
00129     {
00130     }
00131 
00132     Result(const std::string & error)
00133         : error_(error)
00134     {
00135     }
00136 
00137     Result(const Reply & reply)
00138         : reply_(reply)
00139     {
00140     }
00141 
00142     Result deepCopy() const
00143     {
00144         if (ok())
00145             return Result(reply_.deepCopy());
00146         else return Result(error_);
00147     }
00148 
00149     Reply reply_;
00150     std::string error_;
00151 
00155     static const std::string timeoutError;
00156     
00157     bool ok() const { return error_.empty(); }
00158     
00159     bool timedOut() const { return error_ == timeoutError; }
00160 
00161     JML_IMPLEMENT_OPERATOR_BOOL(ok());
00162 
00163     const Reply & reply() const
00164     {
00165         if (!error_.empty())
00166             throw ML::Exception("attempt to read reply from Redis result with"
00167                                 "error " + error_);
00168         ExcAssert(reply_.initialized());
00169         return reply_;
00170     }
00171 
00172     const std::string & error() const
00173     {
00174         return error_;
00175     }
00176 };
00177 
00178 std::ostream & operator << (std::ostream & stream, const Result & result);
00179 
00180 
00182 struct Results : public std::vector<Result> {
00183     bool ok() const
00184     {
00185         for (auto & r: *this)
00186             if (!r)
00187                 return false;
00188         return true;
00189     }
00190 
00191     JML_IMPLEMENT_OPERATOR_BOOL(ok());
00192 
00193     const Reply & reply(int index) const
00194     {
00195         return at(index).reply();
00196     }
00197 
00198     std::string error() const
00199     {
00200         for (auto & r: *this)
00201             if (!r)
00202                 return r.error();
00203         return "";
00204     }
00205 
00206     bool timedOut() const
00207     {
00208         for (auto & r: *this)
00209             if (r.timedOut())
00210                 return true;
00211         return false;
00212     }
00213 };
00214 
00215 std::ostream & operator << (std::ostream & stream, const Results & results);
00216 
00217 
00218 /*****************************************************************************/
00219 /* COMMAND                                                                   */
00220 /*****************************************************************************/
00221 
00222 struct Command {
00223     Command()
00224     {
00225     }
00226 
00227     //explicit Command(const char * args, ...);
00228 
00229     Command(const std::string & command)
00230         : formatStr(command)
00231     {
00232     }
00233 
00234     Command(const std::string & cmd,
00235             const std::initializer_list<std::string> & args)
00236         : formatStr(cmd)
00237     {
00238         for (auto arg: args)
00239             addArg(arg);
00240     }
00241     
00242     std::string formatStr;
00243     std::vector<std::string> args;
00244 
00245     //std::string formatted() const;
00246 
00247     void addArg(const std::string & arg)
00248     {
00249         args.push_back(arg);
00250     }
00251 
00252     void addArg(int64_t arg)
00253     {
00254         args.push_back(std::to_string(arg));
00255     }
00256 
00257     template<typename Arg, typename... Args>
00258     Command operator () (const Arg & head, Args&&... tail) const
00259     {
00260         Command result = *this;
00261         result.addArg(head);
00262         return result(std::forward<Args>(tail)...);
00263     }
00264     
00265     Command operator () () const
00266     {
00267         return *this;
00268     }
00269     
00270     int argc() const
00271     {
00272         return args.size() + 1;
00273     }
00274     
00275     std::vector<const char *> argv() const
00276     {
00277         std::vector<const char *> result;
00278         result.push_back(formatStr.c_str());
00279         for (const std::string & arg: args)
00280             result.push_back(arg.c_str());
00281         return result;
00282     }
00283 
00284     std::vector<size_t> argl() const
00285     {
00286         std::vector<size_t> result;
00287         result.push_back(formatStr.length());
00288         for (const std::string & arg: args)
00289             result.push_back(arg.length());
00290         return result;
00291     }
00292 };
00293 
00294 std::ostream & operator << (std::ostream & stream, const Command & command);
00295 
00296 // Commands ready to be constructed
00297 
00298 extern const Command PING;
00299 extern const Command HDEL;
00300 extern const Command HGET;
00301 extern const Command HGETALL;
00302 extern const Command HMGET;
00303 extern const Command HMSET;
00304 extern const Command WATCH;
00305 extern const Command MULTI;
00306 extern const Command EXEC;
00307 extern const Command HSET;
00308 extern const Command HINCRBY;
00309 extern const Command KEYS;                           
00310 extern const Command SET;
00311 extern const Command MSET;
00312 extern const Command GET;
00313 extern const Command MGET;
00314 extern const Command EXPIRE;
00315 extern const Command RANDOMKEY;
00316 extern const Command DEL;
00317 extern const Command SADD;
00318 extern const Command SMEMBERS;
00319 extern const Command TTL;
00320 
00321 
00322 /*****************************************************************************/
00323 /* ADDRESS                                                                   */
00324 /*****************************************************************************/
00325 
00328 struct Address {
00329     Address();
00330     Address(const std::string & uri);
00331 
00332     static Address tcp(const std::string & host, int port);
00333     static Address unix(const std::string & path);
00334 
00335     bool isUnix() const;
00336     bool isTcp() const;
00337 
00338     std::string unixPath() const;
00339     std::string tcpHost() const;
00340     int tcpPort() const;
00341 
00342     std::string uri() const;
00343 
00344     std::string uri_;
00345 };
00346 
00347 
00348 /*****************************************************************************/
00349 /* ASYNC CONNECTION                                                          */
00350 /*****************************************************************************/
00351 
00354 struct AsyncConnection {
00355     
00356     AsyncConnection();
00357     
00358     AsyncConnection(const Address & address);
00359 
00360     ~AsyncConnection();
00361 
00362     void connect(const Address & address);
00363 
00368     void test();
00369 
00370     void close();
00371 
00372     // Struct to specify a timeout, either absolute or relative
00373     struct Timeout {
00374         Timeout() // no timeout
00375             : expiry(Date::positiveInfinity())
00376         {
00377         }
00378 
00379         Timeout(double relativeTime)
00380             : expiry(Date::now().plusSeconds(relativeTime))
00381         {
00382         }
00383 
00384         Timeout(Date absoluteTime)
00385             : expiry(absoluteTime)
00386         {
00387         }
00388 
00389         Date expiry;
00390     };
00391 
00392     typedef boost::function<void (const Result &)> OnResult;
00393     typedef boost::function<void (const Results &)> OnResults;
00394 
00398     int64_t queue(const Command & command,
00399                   const OnResult & onResult = OnResult(),
00400                   Timeout timeout = Timeout());
00401 
00403     Result exec(const Command & command, Timeout timeout = Timeout());
00404 
00406     void queueMulti(const std::vector<Command> & commands,
00407                     const OnResults & onResults = OnResults(),
00408                     Timeout timeout = Timeout());
00409     
00411     Results execMulti(const std::vector<Command> & command,
00412                       Timeout timeout = Timeout());
00413     
00415     void cancel(int handle);
00416     
00417     size_t numRequestsPending() const
00418     {
00419         return requests.size();
00420     }
00421 
00422     size_t numTimeoutsPending() const
00423     {
00424         return timeouts.size();
00425     }
00426     
00427 private:
00428     std::deque<std::function<void ()> > replyQueue;
00429     
00430     static void resultCallback(redisAsyncContext * context, void *, void *);
00431 
00432     struct RequestData;
00433 
00434     typedef boost::recursive_mutex Lock;
00435     Lock lock;
00436 
00437     typedef std::map<uint64_t, std::shared_ptr<RequestData> > Requests;
00438     Requests requests;
00439     
00440     typedef std::multimap<Datacratic::Date, Requests::iterator> Timeouts;
00441     Timeouts timeouts;
00442 
00446     void expireTimeouts(Datacratic::Date now);
00447 
00448     Datacratic::Date earliestTimeout;
00449 
00450     void checkError(const char * command)
00451     {
00452         if (!context_)
00453             throw ML::Exception("no connection to Redis");
00454 
00455         if (context_->err) {
00456             throw ML::Exception("Redis command %s returned error %s",
00457                                 command, context_->errstr);
00458         }
00459     }
00460 
00461     Address address;
00462     redisAsyncContext * context_;
00463     int64_t idNum;
00464 
00465     struct EventLoop;
00466     std::shared_ptr<EventLoop> eventLoop;
00467 
00468     struct MultiAggregator;
00469 };
00470 
00471 } // namespace Datacratic
00472 
00473 #endif /* __redis__redis_h__ */
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator