![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* 00002 Copyright (c) 2007-2011 iMatix Corporation 00003 Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file 00004 00005 This file is part of 0MQ. 00006 00007 0MQ is free software; you can redistribute it and/or modify it under 00008 the terms of the GNU Lesser General Public License as published by 00009 the Free Software Foundation; either version 3 of the License, or 00010 (at your option) any later version. 00011 00012 0MQ is distributed in the hope that it will be useful, 00013 but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 GNU Lesser General Public License for more details. 00016 00017 You should have received a copy of the GNU Lesser General Public License 00018 along with this program. If not, see <http://www.gnu.org/licenses/>. 00019 */ 00020 00021 #ifndef __ZMQ_HPP_INCLUDED__ 00022 #define __ZMQ_HPP_INCLUDED__ 00023 00024 #include "zmq.h" 00025 00026 #include <cassert> 00027 #include <cstring> 00028 #include <exception> 00029 #include <mutex> 00030 #include <string> 00031 #include <iostream> 00032 00033 #include <thread> 00034 #include <set> 00035 #include "jml/utils/exc_assert.h" 00036 00037 namespace zmq 00038 { 00039 00040 typedef zmq_free_fn free_fn; 00041 typedef zmq_pollitem_t pollitem_t; 00042 00043 class socket_t; 00044 00045 class error_t : public std::exception 00046 { 00047 public: 00048 00049 error_t () : errnum (zmq_errno ()) {} 00050 00051 virtual const char *what () const throw () 00052 { 00053 return zmq_strerror (errnum); 00054 } 00055 00056 int num () const 00057 { 00058 return errnum; 00059 } 00060 00061 private: 00062 00063 int errnum; 00064 }; 00065 00066 inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1) 00067 { 00068 int rc = zmq_poll (items_, nitems_, timeout_); 00069 if (rc < 0) 00070 throw error_t (); 00071 return rc; 00072 } 00073 00074 inline void device (int device_, void * insocket_, void* outsocket_) 00075 { 00076 int rc = zmq_device (device_, insocket_, outsocket_); 00077 if (rc != 0) 00078 throw error_t (); 00079 } 00080 00081 class message_t : private zmq_msg_t 00082 { 00083 friend class socket_t; 00084 00085 public: 00086 00087 inline message_t () 00088 { 00089 int rc = zmq_msg_init (this); 00090 if (rc != 0) 00091 throw error_t (); 00092 } 00093 00094 inline message_t (size_t size_) 00095 { 00096 int rc = zmq_msg_init_size (this, size_); 00097 if (rc != 0) 00098 throw error_t (); 00099 } 00100 00101 inline message_t (void *data_, size_t size_, free_fn *ffn_, 00102 void *hint_ = NULL) 00103 { 00104 int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); 00105 if (rc != 0) 00106 throw error_t (); 00107 } 00108 00109 inline message_t (const std::string & s) 00110 { 00111 int rc = zmq_msg_init_size (this, s.size()); 00112 if (rc != 0) 00113 throw error_t (); 00114 std::copy(s.begin(), s.end(), (char *)data()); 00115 } 00116 00117 inline message_t (const message_t & other) noexcept 00118 { 00119 int rc = zmq_msg_init (this); 00120 if (rc == 0) 00121 rc = zmq_msg_copy(this, const_cast<message_t *>(&other)); 00122 if (rc != 0) 00123 throw error_t (); 00124 } 00125 00126 inline message_t (message_t && other) noexcept 00127 { 00128 int rc = zmq_msg_init (this); 00129 if (rc == 0) 00130 rc = zmq_msg_move(this, &other); 00131 if (rc != 0) 00132 throw error_t (); 00133 } 00134 00135 inline message_t & operator = (const message_t & other) noexcept 00136 { 00137 int rc = zmq_msg_copy(this, const_cast<message_t *>(&other)); 00138 if (rc != 0) 00139 throw error_t (); 00140 return *this; 00141 } 00142 00143 inline message_t & operator = (message_t && other) noexcept 00144 { 00145 int rc = zmq_msg_move(this, &other); 00146 if (rc != 0) 00147 throw error_t (); 00148 return *this; 00149 } 00150 00151 inline ~message_t () 00152 { 00153 int rc = zmq_msg_close (this); 00154 assert (rc == 0); 00155 (void)rc; 00156 } 00157 00158 inline void rebuild () 00159 { 00160 int rc = zmq_msg_close (this); 00161 if (rc != 0) 00162 throw error_t (); 00163 rc = zmq_msg_init (this); 00164 if (rc != 0) 00165 throw error_t (); 00166 } 00167 00168 inline void rebuild (size_t size_) 00169 { 00170 int rc = zmq_msg_close (this); 00171 if (rc != 0) 00172 throw error_t (); 00173 rc = zmq_msg_init_size (this, size_); 00174 if (rc != 0) 00175 throw error_t (); 00176 } 00177 00178 inline void rebuild (void *data_, size_t size_, free_fn *ffn_, 00179 void *hint_ = NULL) 00180 { 00181 int rc = zmq_msg_close (this); 00182 if (rc != 0) 00183 throw error_t (); 00184 rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_); 00185 if (rc != 0) 00186 throw error_t (); 00187 } 00188 00189 inline void move (message_t *msg_) 00190 { 00191 int rc = zmq_msg_move (this, (zmq_msg_t*) msg_); 00192 if (rc != 0) 00193 throw error_t (); 00194 } 00195 00196 inline void copy (message_t *msg_) 00197 { 00198 int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_); 00199 if (rc != 0) 00200 throw error_t (); 00201 } 00202 00203 inline char *data () 00204 { 00205 return (char *)zmq_msg_data (this); 00206 } 00207 00208 inline const char *data () const 00209 { 00210 return (const char *)zmq_msg_data (const_cast<message_t *>(this)); 00211 } 00212 00213 inline size_t size () const 00214 { 00215 return zmq_msg_size (const_cast<message_t *>(this)); 00216 } 00217 00218 std::string toString() const 00219 { 00220 return std::string(data(), data() + size()); 00221 } 00222 00223 private: 00224 00225 // Disable implicit message copying, so that users won't use shared 00226 // messages (less efficient) without being aware of the fact. 00227 //message_t (const message_t&); 00228 //void operator = (const message_t&); 00229 }; 00230 00231 class context_t 00232 { 00233 friend class socket_t; 00234 00235 public: 00236 00237 inline context_t (int io_threads_) 00238 { 00239 ptr = zmq_init (io_threads_); 00240 if (ptr == NULL) 00241 throw error_t (); 00242 } 00243 00244 inline ~context_t () 00245 { 00246 // If this throws, it means that you forgot to destroy a zmq_socket_t object 00247 // created with this context before you closed the context. 00248 ExcAssertEqual(sockets.size(), 0); 00249 00250 int rc = zmq_term (ptr); 00251 assert (rc == 0); 00252 (void)rc; 00253 } 00254 00255 // Be careful with this, it's probably only useful for 00256 // using the C api together with an existing C++ api. 00257 // Normally you should never need to use this. 00258 inline operator void* () 00259 { 00260 return ptr; 00261 } 00262 00263 void registerSocket(socket_t * sock) 00264 { 00265 std::unique_lock<std::mutex> guard(lock); 00266 ExcAssertEqual(sockets.count(sock), 0); 00267 sockets.insert(sock); 00268 } 00269 00270 void unregisterSocket(socket_t * sock) 00271 { 00272 std::unique_lock<std::mutex> guard(lock); 00273 ExcAssertEqual(sockets.count(sock), 1); 00274 sockets.erase(sock); 00275 } 00276 00277 private: 00278 00279 void *ptr; 00280 00281 context_t (const context_t&); 00282 void operator = (const context_t&); 00283 00284 std::mutex lock; 00285 std::set<socket_t *> sockets; 00286 }; 00287 00288 class socket_t 00289 { 00290 public: 00291 context_t * context_; 00292 00293 inline socket_t (context_t &context_, int type_) 00294 : context_(&context_) 00295 { 00296 ptr = zmq_socket (context_.ptr, type_); 00297 if (ptr == NULL) 00298 throw error_t (); 00299 00300 context_.registerSocket(this); 00301 } 00302 00303 inline ~socket_t () 00304 { 00305 int linger = 0; 00306 setsockopt (ZMQ_LINGER, &linger, sizeof(linger)); 00307 int rc = zmq_close (ptr); 00308 assert (rc == 0); 00309 (void)rc; 00310 00311 context_->unregisterSocket(this); 00312 } 00313 00314 inline operator void* () 00315 { 00316 return ptr; 00317 } 00318 00319 inline void setsockopt (int option_, const void *optval_, 00320 size_t optvallen_) 00321 { 00322 int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_); 00323 if (rc != 0) 00324 throw error_t (); 00325 } 00326 00327 inline void getsockopt (int option_, void *optval_, 00328 size_t *optvallen_) 00329 { 00330 int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_); 00331 if (rc != 0) 00332 throw error_t (); 00333 } 00334 00335 inline void bind (const std::string & addr_) 00336 { 00337 int rc = zmq_bind (ptr, addr_.c_str()); 00338 if (rc != 0) 00339 throw error_t (); 00340 } 00341 00342 inline void unbind (const std::string & addr_) 00343 { 00344 int rc = zmq_unbind (ptr, addr_.c_str()); 00345 if (rc != 0) 00346 throw error_t (); 00347 } 00348 00349 inline int tryUnbind (const std::string & addr_) 00350 { 00351 int rc = zmq_unbind (ptr, addr_.c_str()); 00352 return rc; 00353 } 00354 00355 inline void connect (const std::string & addr_) 00356 { 00357 int rc = zmq_connect (ptr, addr_.c_str()); 00358 if (rc != 0) 00359 throw error_t (); 00360 } 00361 00362 inline void disconnect (const std::string & addr_) 00363 { 00364 int rc = zmq_disconnect (ptr, addr_.c_str()); 00365 if (rc != 0) 00366 throw error_t (); 00367 } 00368 00369 inline int tryDisconnect (const std::string & addr_) 00370 { 00371 return zmq_disconnect (ptr, addr_.c_str()); 00372 } 00373 00374 inline bool send (message_t &msg_, int flags_ = 0) 00375 { 00376 int rc = zmq_sendmsg (ptr, &msg_, flags_); 00377 if (rc >= 0) 00378 return true; 00379 if (rc == -1 && zmq_errno () == EAGAIN) 00380 return false; 00381 throw error_t (); 00382 } 00383 00384 inline bool send (message_t && msg_, int flags_ = 0) 00385 { 00386 int rc = zmq_sendmsg (ptr, &msg_, flags_); 00387 if (rc >= 0) 00388 return true; 00389 if (rc == -1 && zmq_errno () == EAGAIN) 00390 return false; 00391 throw error_t (); 00392 } 00393 00394 inline bool recv (message_t *msg_, int flags_ = 0) 00395 { 00396 int rc = zmq_recvmsg (ptr, msg_, flags_); 00397 if (rc >= 0) 00398 return true; 00399 if (rc == -1 && zmq_errno () == EAGAIN) 00400 return false; 00401 throw error_t (); 00402 } 00403 00404 private: 00405 00406 void *ptr; 00407 00408 socket_t (const socket_t&); 00409 socket_t (socket_t&&); 00410 void operator = (const socket_t&); 00411 void operator = (socket_t&&); 00412 }; 00413 00414 } 00415 00416 #endif