RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/zmq_utils.h
00001 /* zmq_utils.h                                                     -*- C++ -*-
00002    Jeremy Barnes, 15 March 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    Utilities for zmq.
00006 */
00007 
00008 #pragma once
00009 
00010 #include <unistd.h>
00011 #include <string>
00012 #include <iostream>
00013 #include <cstdio>
00014 #include <memory>
00015 #include <boost/utility.hpp>
00016 #include "soa/service/zmq.hpp"
00017 #include "soa/jsoncpp/value.h"
00018 #include "soa/types/date.h"
00019 #include "soa/service/port_range_service.h"
00020 #include "jml/arch/format.h"
00021 #include "jml/arch/exception.h"
00022 #include "jml/compiler/compiler.h"
00023 
00024 #if 0
00025 #define BLOCK_FLAG 0
00026 #else
00027 #define BLOCK_FLAG ZMQ_DONTWAIT
00028 #endif
00029 
00030 namespace Datacratic {
00031 
00032 inline void setIdentity(zmq::socket_t & sock, const std::string & identity)
00033 {
00034     sock.setsockopt(ZMQ_IDENTITY, (void *)identity.c_str(), identity.size());
00035 }
00036 
00037 inline void subscribeChannel(zmq::socket_t & sock, const std::string & channel)
00038 {
00039     sock.setsockopt(ZMQ_SUBSCRIBE, channel.c_str(), channel.length());
00040 }
00041 
00042 inline void unsubscribeChannel(zmq::socket_t & sock, const std::string & channel)
00043 {
00044     sock.setsockopt(ZMQ_UNSUBSCRIBE, channel.c_str(), channel.length());
00045 }
00046 
00047 inline void setHwm(zmq::socket_t & sock, int hwm)
00048 {
00049     sock.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
00050     sock.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
00051 }
00052 
00053 inline void throwSocketError(const char *data)
00054 {
00055     throw ML::Exception(errno,
00056                         "unhandled error (" + std::to_string(errno) + ")",
00057                         data);
00058 }
00059 
00061 inline std::pair<bool, bool>
00062 getEvents(zmq::socket_t & sock)
00063 {
00064     int events = 0;
00065     size_t events_size = sizeof(events);
00066     sock.getsockopt(ZMQ_EVENTS, &events, &events_size);
00067     return std::make_pair(events & ZMQ_POLLIN, events & ZMQ_POLLOUT);
00068 }
00069 
00070 inline std::string recvMesg(zmq::socket_t & sock)
00071 {
00072      zmq::message_t message;
00073      while (!sock.recv(&message, 0)) ;
00074 
00075      return std::string((const char *)message.data(),
00076                         ((const char *)message.data()) + message.size());
00077 }
00078 
00079 inline std::pair<std::string, bool>
00080 recvMesgNonBlocking(zmq::socket_t & sock)
00081 {
00082     zmq::message_t message;
00083     bool got = sock.recv(&message, ZMQ_NOBLOCK);
00084     if (!got)
00085         return std::make_pair("", false);
00086     return std::make_pair
00087         (std::string((const char *)message.data(),
00088                      ((const char *)message.data()) + message.size()),
00089          true);
00090 }
00091 
00092 inline void recvMesg(zmq::socket_t & sock, char & val)
00093 {
00094     zmq::message_t message;
00095     while (!sock.recv(&message, 0)) ;
00096     if (message.size() != 1)
00097         throw ML::Exception("invalid char message size");
00098     val = *(const char *)message.data();
00099 }
00100 
00101 inline std::vector<std::string> recvAll(zmq::socket_t & sock)
00102 {
00103     std::vector<std::string> result;
00104 
00105     int64_t more = 1;
00106     size_t more_size = sizeof (more);
00107 
00108     while (more) {
00109         result.push_back(recvMesg(sock));
00110         sock.getsockopt(ZMQ_RCVMORE, &more, &more_size);
00111     }
00112 
00113     return result;
00114 }
00115 
00116 inline std::vector<std::string>
00117 recvAllNonBlocking(zmq::socket_t & sock)
00118 {
00119     std::vector<std::string> result;
00120 
00121     std::string msg0;
00122     bool got;
00123     std::tie(msg0, got) = recvMesgNonBlocking(sock);
00124     if (got) {
00125         result.push_back(msg0);
00126 
00127         for (;;) {
00128             int64_t more = 1;
00129             size_t more_size = sizeof (more);
00130             sock.getsockopt(ZMQ_RCVMORE, &more, &more_size);
00131             //using namespace std;
00132             //cerr << "result.size() " << result.size()
00133             //     << " more " << more << endl;
00134             if (!more) break;
00135             result.push_back(recvMesg(sock));
00136         }
00137     }
00138     
00139     return result;
00140 }
00141 
00142 inline zmq::message_t encodeMessage(const std::string & message)
00143 {
00144     return message;
00145 }
00146 
00147 inline zmq::message_t encodeMessage(const char * msg)
00148 {
00149     size_t sz = strlen(msg);
00150     zmq::message_t zmsg(sz);
00151     std::copy(msg, msg + sz, (char *)zmsg.data());
00152     return zmsg;
00153 }
00154 
00155 inline zmq::message_t encodeMessage(const Date & date)
00156 {
00157     return ML::format("%.5f", date.secondsSinceEpoch());
00158 }
00159 
00160 inline zmq::message_t encodeMessage(int i)
00161 {
00162     return ML::format("%d", i);
00163 }
00164 
00165 inline zmq::message_t encodeMessage(char c)
00166 {
00167     return ML::format("%c", c);
00168 }
00169 
00170 inline std::string chomp(const std::string & s)
00171 {
00172     const char * start = s.c_str();
00173     const char * end = start + s.length();
00174     
00175     while (end > start && end[-1] == '\n') --end;
00176     
00177     if (end == start + s.length()) return s;
00178     return std::string(start, end);
00179 }
00180 
00181 inline zmq::message_t encodeMessage(const Json::Value & j)
00182 {
00183     return chomp(j.toString());
00184 }
00185 
00186 inline bool sendMesg(zmq::socket_t & sock,
00187                      const std::string & msg,
00188                      int options = 0)
00189 {
00190     zmq::message_t msg1(msg.size());
00191     std::copy(msg.begin(), msg.end(), (char *)msg1.data());
00192     return sock.send(msg1, options);
00193 }
00194     
00195 inline bool sendMesg(zmq::socket_t & sock,
00196                      const char * msg,
00197                      int options = 0)
00198 {
00199     size_t sz = strlen(msg);
00200     zmq::message_t msg1(sz);
00201     std::copy(msg, msg + sz, (char *)msg1.data());
00202     return sock.send(msg1, options);
00203 }
00204     
00205 inline bool sendMesg(zmq::socket_t & sock,
00206                      const void * msg,
00207                      size_t sz,
00208                      int options = 0)
00209 {
00210     zmq::message_t msg1(sz);
00211     memcpy(msg1.data(), msg, sz);
00212     return sock.send(msg1, options);
00213 }
00214 
00215 template<typename T>
00216 inline bool sendMesg(zmq::socket_t & sock,
00217                      const T & obj,
00218                      int options = 0)
00219 {
00220     return sock.send(encodeMessage(obj), options);
00221 }
00222 
00223 template<typename T>
00224 bool sendMesg(zmq::socket_t & socket, const std::shared_ptr<T> & val,
00225               int flags = 0)
00226 {
00227     return sendMesg(socket, sharedPtrToMessage(val), flags);
00228 }
00229 
00230 inline void sendAll(zmq::socket_t & sock,
00231                     const std::vector<std::string> & message,
00232                     int lastFlags = 0)
00233 {
00234     if (message.empty())
00235         throw ML::Exception("can't send an empty message vector");
00236 
00237     for (unsigned i = 0;  i < message.size() - 1;  ++i)
00238         if (!sendMesg(sock, message[i], ZMQ_SNDMORE | BLOCK_FLAG)) {
00239             throwSocketError(__FUNCTION__);
00240         }
00241     if (!sendMesg(sock, message.back(), lastFlags | BLOCK_FLAG)) {
00242         throwSocketError(__FUNCTION__);
00243     }
00244 }
00245 
00246 inline void sendAll(zmq::socket_t & sock,
00247                     const std::initializer_list<std::string> & message,
00248                     int lastFlags = 0)
00249 {
00250     sendAll(sock, std::vector<std::string>(message));
00251 }
00252 
00253 #if 0
00254 template<typename T>
00255 inline void sendAll(zmq::socket_t & socket,
00256                     const std::vector<T> & vals,
00257                     int lastFlags)
00258 {
00259     if (vals.empty()) {
00260         throw ML::Exception("can't send empty vector");
00261     }
00262     for (int i = 0;  i < vals.size() - 1;  ++i)
00263         sendMesg(socket, vals[i], ZMQ_SNDMORE | BLOCK_FLAG);
00264     sendMesg(socket, vals.back(), lastFlags | BLOCK_FLAG);
00265 }
00266 #endif
00267 
00268 template<typename Arg1>
00269 void sendMessage(zmq::socket_t & socket,
00270                  const Arg1 & arg1)
00271 {
00272     if (!sendMesg(socket, arg1, 0)) {
00273         throwSocketError(__FUNCTION__);
00274     }
00275 }
00276 
00277 inline void sendMessage(zmq::socket_t & socket,
00278                         const std::vector<std::string> & args)
00279 {
00280     sendAll(socket, args, 0);
00281 }
00282 
00283 template<typename Arg1, typename... Args>
00284 void sendMessage(zmq::socket_t & socket,
00285                  const Arg1 & arg1,
00286                  Args... args)
00287 {
00288     if (!sendMesg(socket, arg1, ZMQ_SNDMORE | BLOCK_FLAG)) {
00289         throwSocketError(__FUNCTION__);
00290     }
00291     sendMessage(socket, args...);
00292 }
00293 
00294 /* non-throwing versions, where EAGAIN cases would return false */
00295 template<typename Arg1>
00296 bool trySendMessage(zmq::socket_t & socket, const Arg1 & arg1)
00297 {
00298     if (!sendMesg(socket, arg1, 0)) {
00299         if (errno == EAGAIN)
00300             return false;
00301         else
00302             throwSocketError(__FUNCTION__);
00303     }
00304 
00305     return true;
00306 }
00307 
00308 template<typename Arg1, typename... Args>
00309 bool trySendMessage(zmq::socket_t & socket, const Arg1 & arg1, Args... args)
00310 {
00311     if (!sendMesg(socket, arg1, ZMQ_SNDMORE | BLOCK_FLAG)) {
00312         if (errno == EAGAIN)
00313             return false;
00314         else
00315             throwSocketError(__FUNCTION__);
00316     }
00317     return trySendMessage(socket, args...);
00318 }
00319 
00320 inline bool trySendAll(zmq::socket_t & sock,
00321                        const std::vector<std::string> & message,
00322                        int lastFlags = 0)
00323 {
00324     if (message.empty())
00325         throw ML::Exception("can't send an empty message vector");
00326 
00327     for (unsigned i = 0; i < message.size() - 1;  ++i) {
00328         if (!sendMesg(sock, message[i], ZMQ_SNDMORE | BLOCK_FLAG)) {
00329             if (errno == EAGAIN)
00330                 return false;
00331             else
00332                 throwSocketError(__FUNCTION__);
00333         }
00334     }
00335 
00336     if (!sendMesg(sock, message.back(), lastFlags | BLOCK_FLAG)) {
00337         if (errno == EAGAIN)
00338             return false;
00339         else
00340             throwSocketError(__FUNCTION__);
00341     }
00342 
00343     return true;
00344 }
00345 
00346 inline bool trySendAll(zmq::socket_t & sock,
00347                         const std::initializer_list<std::string> & message,
00348                         int lastFlags = 0)
00349 {
00350     return trySendAll(sock, std::vector<std::string>(message), lastFlags);
00351 }
00352 
00353 /* We take a copy of the shared pointer in a heap-allocated object that
00354    makes sure that it continues to have a reference.  The control
00355    connection then takes control of the pointer.  This allows us to
00356    effectively transfer a shared pointer over a zeromq socket.
00357 */
00358 template<typename T>
00359 std::string sharedPtrToMessage(std::shared_ptr<T> ptr)
00360 {
00361     static const int mypid = getpid();
00362 
00363     std::auto_ptr<std::shared_ptr<T> > ptrToTransfer
00364         (new std::shared_ptr<T>(ptr));
00365     
00366     std::string ptrMsg
00367         = ML::format("%p:%d:%p",
00368                      ptrToTransfer.get(), mypid,
00369                      typeid(T).name());
00370     
00371     ptrToTransfer.release();
00372     
00373     return ptrMsg;
00374 }
00375 
00376 template<typename T>
00377 std::shared_ptr<T>
00378 sharedPtrFromMessage(const std::string & message)
00379 {
00380     static const int mypid = getpid();
00381 
00382     std::shared_ptr<T> * ptr;
00383     int pid;
00384     const char * name;
00385 
00386     int res = std::sscanf(message.c_str(), "%p:%d:%p", &ptr, &pid, &name);
00387     if (res != 3)
00388         throw ML::Exception("failure reconstituting auction");
00389     if (pid != mypid)
00390         throw ML::Exception("message comes from different pid");
00391     if (name != typeid(T).name())
00392         throw ML::Exception("wrong name for type info: %s vs %s",
00393                             name, typeid(T).name());
00394 
00395     std::auto_ptr<std::shared_ptr<T> > ptrHolder(ptr);
00396     return *ptr;
00397 }
00398 
00399 template<typename T>
00400 void recvMesg(zmq::socket_t & sock, std::shared_ptr<T> & val)
00401 {
00402     return sharedPtrFromMessage<T>(recvMesg(sock));
00403 }
00404 
00405 inline void close(zmq::socket_t & sock)
00406 {
00407     zmq_close(sock);
00408 }
00409 
00410 inline int
00411 bindAndReturnOpenTcpPort(zmq::socket_t & socket, PortRange const & portRange, const std::string & host) {
00412     std::string uri;
00413     int port = portRange.bindPort([&](int port) {
00414         uri = ML::format("tcp://%s:%d", host.c_str(), port);
00415         return zmq_bind(socket, uri.c_str()) == 0;
00416     });
00417 
00418     if(port == -1)
00419         throw ML::Exception("no open TCP port '%s': %s %s",
00420                             uri.c_str(),
00421                             zmq_strerror(zmq_errno()),
00422                             strerror(errno));
00423     return port;
00424 }
00425 
00426 inline std::string
00427 bindToOpenTcpPort(zmq::socket_t & socket, PortRange const & portRange, const std::string & host) {
00428     int port = bindAndReturnOpenTcpPort(socket, portRange, host);
00429     return ML::format("tcp://%s:%d", host.c_str(), port);
00430 }
00431 
00433 std::string printZmqEvent(int event);
00434 
00438 bool zmqEventIsError(int event);
00439 
00440 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator