![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* zmq_utils.h -*- C++ -*- 00002 Jeremy Barnes, 15 March 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 Utilities for zmq. 00006 */ 00007 00008 #pragma once 00009 00010 #include <unistd.h> 00011 #include <string> 00012 #include <iostream> 00013 #include <cstdio> 00014 #include <memory> 00015 #include <boost/utility.hpp> 00016 #include "soa/service/zmq.hpp" 00017 #include "soa/jsoncpp/value.h" 00018 #include "soa/types/date.h" 00019 #include "soa/service/port_range_service.h" 00020 #include "jml/arch/format.h" 00021 #include "jml/arch/exception.h" 00022 #include "jml/compiler/compiler.h" 00023 00024 #if 0 00025 #define BLOCK_FLAG 0 00026 #else 00027 #define BLOCK_FLAG ZMQ_DONTWAIT 00028 #endif 00029 00030 namespace Datacratic { 00031 00032 inline void setIdentity(zmq::socket_t & sock, const std::string & identity) 00033 { 00034 sock.setsockopt(ZMQ_IDENTITY, (void *)identity.c_str(), identity.size()); 00035 } 00036 00037 inline void subscribeChannel(zmq::socket_t & sock, const std::string & channel) 00038 { 00039 sock.setsockopt(ZMQ_SUBSCRIBE, channel.c_str(), channel.length()); 00040 } 00041 00042 inline void unsubscribeChannel(zmq::socket_t & sock, const std::string & channel) 00043 { 00044 sock.setsockopt(ZMQ_UNSUBSCRIBE, channel.c_str(), channel.length()); 00045 } 00046 00047 inline void setHwm(zmq::socket_t & sock, int hwm) 00048 { 00049 sock.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); 00050 sock.setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); 00051 } 00052 00053 inline void throwSocketError(const char *data) 00054 { 00055 throw ML::Exception(errno, 00056 "unhandled error (" + std::to_string(errno) + ")", 00057 data); 00058 } 00059 00061 inline std::pair<bool, bool> 00062 getEvents(zmq::socket_t & sock) 00063 { 00064 int events = 0; 00065 size_t events_size = sizeof(events); 00066 sock.getsockopt(ZMQ_EVENTS, &events, &events_size); 00067 return std::make_pair(events & ZMQ_POLLIN, events & ZMQ_POLLOUT); 00068 } 00069 00070 inline std::string recvMesg(zmq::socket_t & sock) 00071 { 00072 zmq::message_t message; 00073 while (!sock.recv(&message, 0)) ; 00074 00075 return std::string((const char *)message.data(), 00076 ((const char *)message.data()) + message.size()); 00077 } 00078 00079 inline std::pair<std::string, bool> 00080 recvMesgNonBlocking(zmq::socket_t & sock) 00081 { 00082 zmq::message_t message; 00083 bool got = sock.recv(&message, ZMQ_NOBLOCK); 00084 if (!got) 00085 return std::make_pair("", false); 00086 return std::make_pair 00087 (std::string((const char *)message.data(), 00088 ((const char *)message.data()) + message.size()), 00089 true); 00090 } 00091 00092 inline void recvMesg(zmq::socket_t & sock, char & val) 00093 { 00094 zmq::message_t message; 00095 while (!sock.recv(&message, 0)) ; 00096 if (message.size() != 1) 00097 throw ML::Exception("invalid char message size"); 00098 val = *(const char *)message.data(); 00099 } 00100 00101 inline std::vector<std::string> recvAll(zmq::socket_t & sock) 00102 { 00103 std::vector<std::string> result; 00104 00105 int64_t more = 1; 00106 size_t more_size = sizeof (more); 00107 00108 while (more) { 00109 result.push_back(recvMesg(sock)); 00110 sock.getsockopt(ZMQ_RCVMORE, &more, &more_size); 00111 } 00112 00113 return result; 00114 } 00115 00116 inline std::vector<std::string> 00117 recvAllNonBlocking(zmq::socket_t & sock) 00118 { 00119 std::vector<std::string> result; 00120 00121 std::string msg0; 00122 bool got; 00123 std::tie(msg0, got) = recvMesgNonBlocking(sock); 00124 if (got) { 00125 result.push_back(msg0); 00126 00127 for (;;) { 00128 int64_t more = 1; 00129 size_t more_size = sizeof (more); 00130 sock.getsockopt(ZMQ_RCVMORE, &more, &more_size); 00131 //using namespace std; 00132 //cerr << "result.size() " << result.size() 00133 // << " more " << more << endl; 00134 if (!more) break; 00135 result.push_back(recvMesg(sock)); 00136 } 00137 } 00138 00139 return result; 00140 } 00141 00142 inline zmq::message_t encodeMessage(const std::string & message) 00143 { 00144 return message; 00145 } 00146 00147 inline zmq::message_t encodeMessage(const char * msg) 00148 { 00149 size_t sz = strlen(msg); 00150 zmq::message_t zmsg(sz); 00151 std::copy(msg, msg + sz, (char *)zmsg.data()); 00152 return zmsg; 00153 } 00154 00155 inline zmq::message_t encodeMessage(const Date & date) 00156 { 00157 return ML::format("%.5f", date.secondsSinceEpoch()); 00158 } 00159 00160 inline zmq::message_t encodeMessage(int i) 00161 { 00162 return ML::format("%d", i); 00163 } 00164 00165 inline zmq::message_t encodeMessage(char c) 00166 { 00167 return ML::format("%c", c); 00168 } 00169 00170 inline std::string chomp(const std::string & s) 00171 { 00172 const char * start = s.c_str(); 00173 const char * end = start + s.length(); 00174 00175 while (end > start && end[-1] == '\n') --end; 00176 00177 if (end == start + s.length()) return s; 00178 return std::string(start, end); 00179 } 00180 00181 inline zmq::message_t encodeMessage(const Json::Value & j) 00182 { 00183 return chomp(j.toString()); 00184 } 00185 00186 inline bool sendMesg(zmq::socket_t & sock, 00187 const std::string & msg, 00188 int options = 0) 00189 { 00190 zmq::message_t msg1(msg.size()); 00191 std::copy(msg.begin(), msg.end(), (char *)msg1.data()); 00192 return sock.send(msg1, options); 00193 } 00194 00195 inline bool sendMesg(zmq::socket_t & sock, 00196 const char * msg, 00197 int options = 0) 00198 { 00199 size_t sz = strlen(msg); 00200 zmq::message_t msg1(sz); 00201 std::copy(msg, msg + sz, (char *)msg1.data()); 00202 return sock.send(msg1, options); 00203 } 00204 00205 inline bool sendMesg(zmq::socket_t & sock, 00206 const void * msg, 00207 size_t sz, 00208 int options = 0) 00209 { 00210 zmq::message_t msg1(sz); 00211 memcpy(msg1.data(), msg, sz); 00212 return sock.send(msg1, options); 00213 } 00214 00215 template<typename T> 00216 inline bool sendMesg(zmq::socket_t & sock, 00217 const T & obj, 00218 int options = 0) 00219 { 00220 return sock.send(encodeMessage(obj), options); 00221 } 00222 00223 template<typename T> 00224 bool sendMesg(zmq::socket_t & socket, const std::shared_ptr<T> & val, 00225 int flags = 0) 00226 { 00227 return sendMesg(socket, sharedPtrToMessage(val), flags); 00228 } 00229 00230 inline void sendAll(zmq::socket_t & sock, 00231 const std::vector<std::string> & message, 00232 int lastFlags = 0) 00233 { 00234 if (message.empty()) 00235 throw ML::Exception("can't send an empty message vector"); 00236 00237 for (unsigned i = 0; i < message.size() - 1; ++i) 00238 if (!sendMesg(sock, message[i], ZMQ_SNDMORE | BLOCK_FLAG)) { 00239 throwSocketError(__FUNCTION__); 00240 } 00241 if (!sendMesg(sock, message.back(), lastFlags | BLOCK_FLAG)) { 00242 throwSocketError(__FUNCTION__); 00243 } 00244 } 00245 00246 inline void sendAll(zmq::socket_t & sock, 00247 const std::initializer_list<std::string> & message, 00248 int lastFlags = 0) 00249 { 00250 sendAll(sock, std::vector<std::string>(message)); 00251 } 00252 00253 #if 0 00254 template<typename T> 00255 inline void sendAll(zmq::socket_t & socket, 00256 const std::vector<T> & vals, 00257 int lastFlags) 00258 { 00259 if (vals.empty()) { 00260 throw ML::Exception("can't send empty vector"); 00261 } 00262 for (int i = 0; i < vals.size() - 1; ++i) 00263 sendMesg(socket, vals[i], ZMQ_SNDMORE | BLOCK_FLAG); 00264 sendMesg(socket, vals.back(), lastFlags | BLOCK_FLAG); 00265 } 00266 #endif 00267 00268 template<typename Arg1> 00269 void sendMessage(zmq::socket_t & socket, 00270 const Arg1 & arg1) 00271 { 00272 if (!sendMesg(socket, arg1, 0)) { 00273 throwSocketError(__FUNCTION__); 00274 } 00275 } 00276 00277 inline void sendMessage(zmq::socket_t & socket, 00278 const std::vector<std::string> & args) 00279 { 00280 sendAll(socket, args, 0); 00281 } 00282 00283 template<typename Arg1, typename... Args> 00284 void sendMessage(zmq::socket_t & socket, 00285 const Arg1 & arg1, 00286 Args... args) 00287 { 00288 if (!sendMesg(socket, arg1, ZMQ_SNDMORE | BLOCK_FLAG)) { 00289 throwSocketError(__FUNCTION__); 00290 } 00291 sendMessage(socket, args...); 00292 } 00293 00294 /* non-throwing versions, where EAGAIN cases would return false */ 00295 template<typename Arg1> 00296 bool trySendMessage(zmq::socket_t & socket, const Arg1 & arg1) 00297 { 00298 if (!sendMesg(socket, arg1, 0)) { 00299 if (errno == EAGAIN) 00300 return false; 00301 else 00302 throwSocketError(__FUNCTION__); 00303 } 00304 00305 return true; 00306 } 00307 00308 template<typename Arg1, typename... Args> 00309 bool trySendMessage(zmq::socket_t & socket, const Arg1 & arg1, Args... args) 00310 { 00311 if (!sendMesg(socket, arg1, ZMQ_SNDMORE | BLOCK_FLAG)) { 00312 if (errno == EAGAIN) 00313 return false; 00314 else 00315 throwSocketError(__FUNCTION__); 00316 } 00317 return trySendMessage(socket, args...); 00318 } 00319 00320 inline bool trySendAll(zmq::socket_t & sock, 00321 const std::vector<std::string> & message, 00322 int lastFlags = 0) 00323 { 00324 if (message.empty()) 00325 throw ML::Exception("can't send an empty message vector"); 00326 00327 for (unsigned i = 0; i < message.size() - 1; ++i) { 00328 if (!sendMesg(sock, message[i], ZMQ_SNDMORE | BLOCK_FLAG)) { 00329 if (errno == EAGAIN) 00330 return false; 00331 else 00332 throwSocketError(__FUNCTION__); 00333 } 00334 } 00335 00336 if (!sendMesg(sock, message.back(), lastFlags | BLOCK_FLAG)) { 00337 if (errno == EAGAIN) 00338 return false; 00339 else 00340 throwSocketError(__FUNCTION__); 00341 } 00342 00343 return true; 00344 } 00345 00346 inline bool trySendAll(zmq::socket_t & sock, 00347 const std::initializer_list<std::string> & message, 00348 int lastFlags = 0) 00349 { 00350 return trySendAll(sock, std::vector<std::string>(message), lastFlags); 00351 } 00352 00353 /* We take a copy of the shared pointer in a heap-allocated object that 00354 makes sure that it continues to have a reference. The control 00355 connection then takes control of the pointer. This allows us to 00356 effectively transfer a shared pointer over a zeromq socket. 00357 */ 00358 template<typename T> 00359 std::string sharedPtrToMessage(std::shared_ptr<T> ptr) 00360 { 00361 static const int mypid = getpid(); 00362 00363 std::auto_ptr<std::shared_ptr<T> > ptrToTransfer 00364 (new std::shared_ptr<T>(ptr)); 00365 00366 std::string ptrMsg 00367 = ML::format("%p:%d:%p", 00368 ptrToTransfer.get(), mypid, 00369 typeid(T).name()); 00370 00371 ptrToTransfer.release(); 00372 00373 return ptrMsg; 00374 } 00375 00376 template<typename T> 00377 std::shared_ptr<T> 00378 sharedPtrFromMessage(const std::string & message) 00379 { 00380 static const int mypid = getpid(); 00381 00382 std::shared_ptr<T> * ptr; 00383 int pid; 00384 const char * name; 00385 00386 int res = std::sscanf(message.c_str(), "%p:%d:%p", &ptr, &pid, &name); 00387 if (res != 3) 00388 throw ML::Exception("failure reconstituting auction"); 00389 if (pid != mypid) 00390 throw ML::Exception("message comes from different pid"); 00391 if (name != typeid(T).name()) 00392 throw ML::Exception("wrong name for type info: %s vs %s", 00393 name, typeid(T).name()); 00394 00395 std::auto_ptr<std::shared_ptr<T> > ptrHolder(ptr); 00396 return *ptr; 00397 } 00398 00399 template<typename T> 00400 void recvMesg(zmq::socket_t & sock, std::shared_ptr<T> & val) 00401 { 00402 return sharedPtrFromMessage<T>(recvMesg(sock)); 00403 } 00404 00405 inline void close(zmq::socket_t & sock) 00406 { 00407 zmq_close(sock); 00408 } 00409 00410 inline int 00411 bindAndReturnOpenTcpPort(zmq::socket_t & socket, PortRange const & portRange, const std::string & host) { 00412 std::string uri; 00413 int port = portRange.bindPort([&](int port) { 00414 uri = ML::format("tcp://%s:%d", host.c_str(), port); 00415 return zmq_bind(socket, uri.c_str()) == 0; 00416 }); 00417 00418 if(port == -1) 00419 throw ML::Exception("no open TCP port '%s': %s %s", 00420 uri.c_str(), 00421 zmq_strerror(zmq_errno()), 00422 strerror(errno)); 00423 return port; 00424 } 00425 00426 inline std::string 00427 bindToOpenTcpPort(zmq::socket_t & socket, PortRange const & portRange, const std::string & host) { 00428 int port = bindAndReturnOpenTcpPort(socket, portRange, host); 00429 return ML::format("tcp://%s:%d", host.c_str(), port); 00430 } 00431 00433 std::string printZmqEvent(int event); 00434 00438 bool zmqEventIsError(int event); 00439 00440 } // namespace Datacratic