RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/zmq.hpp
00001 /*
00002     Copyright (c) 2007-2011 iMatix Corporation
00003     Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
00004 
00005     This file is part of 0MQ.
00006 
00007     0MQ is free software; you can redistribute it and/or modify it under
00008     the terms of the GNU Lesser General Public License as published by
00009     the Free Software Foundation; either version 3 of the License, or
00010     (at your option) any later version.
00011 
00012     0MQ is distributed in the hope that it will be useful,
00013     but WITHOUT ANY WARRANTY; without even the implied warranty of
00014     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015     GNU Lesser General Public License for more details.
00016 
00017     You should have received a copy of the GNU Lesser General Public License
00018     along with this program.  If not, see <http://www.gnu.org/licenses/>.
00019 */
00020 
00021 #ifndef __ZMQ_HPP_INCLUDED__
00022 #define __ZMQ_HPP_INCLUDED__
00023 
00024 #include "zmq.h"
00025 
00026 #include <cassert>
00027 #include <cstring>
00028 #include <exception>
00029 #include <mutex>
00030 #include <string>
00031 #include <iostream>
00032 
00033 #include <thread>
00034 #include <set>
00035 #include "jml/utils/exc_assert.h"
00036 
00037 namespace zmq
00038 {
00039 
00040     typedef zmq_free_fn free_fn;
00041     typedef zmq_pollitem_t pollitem_t;
00042 
00043     class socket_t;
00044 
00045     class error_t : public std::exception
00046     {
00047     public:
00048 
00049         error_t () : errnum (zmq_errno ()) {}
00050 
00051         virtual const char *what () const throw ()
00052         {
00053             return zmq_strerror (errnum);
00054         }
00055 
00056         int num () const
00057         {
00058             return errnum;
00059         }
00060 
00061     private:
00062 
00063         int errnum;
00064     };
00065 
00066     inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
00067     {
00068         int rc = zmq_poll (items_, nitems_, timeout_);
00069         if (rc < 0)
00070             throw error_t ();
00071         return rc;
00072     }
00073 
00074     inline void device (int device_, void * insocket_, void* outsocket_)
00075     {
00076         int rc = zmq_device (device_, insocket_, outsocket_);
00077         if (rc != 0)
00078             throw error_t ();
00079     }
00080 
00081     class message_t : private zmq_msg_t
00082     {
00083         friend class socket_t;
00084 
00085     public:
00086 
00087         inline message_t ()
00088         {
00089             int rc = zmq_msg_init (this);
00090             if (rc != 0)
00091                 throw error_t ();
00092         }
00093 
00094         inline message_t (size_t size_)
00095         {
00096             int rc = zmq_msg_init_size (this, size_);
00097             if (rc != 0)
00098                 throw error_t ();
00099         }
00100 
00101         inline message_t (void *data_, size_t size_, free_fn *ffn_,
00102             void *hint_ = NULL)
00103         {
00104             int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
00105             if (rc != 0)
00106                 throw error_t ();
00107         }
00108 
00109         inline message_t (const std::string & s)
00110         {
00111             int rc = zmq_msg_init_size (this, s.size());
00112             if (rc != 0)
00113                 throw error_t ();
00114             std::copy(s.begin(), s.end(), (char *)data());
00115         }
00116         
00117         inline message_t (const message_t & other) noexcept
00118         {
00119             int rc = zmq_msg_init (this);
00120             if (rc == 0)
00121                 rc = zmq_msg_copy(this, const_cast<message_t *>(&other));
00122             if (rc != 0)
00123                 throw error_t ();
00124         }
00125 
00126         inline message_t (message_t && other) noexcept
00127         {
00128             int rc = zmq_msg_init (this);
00129             if (rc == 0)
00130                 rc = zmq_msg_move(this, &other);
00131             if (rc != 0)
00132                 throw error_t ();
00133         }
00134 
00135         inline message_t & operator = (const message_t & other) noexcept
00136         {
00137             int rc = zmq_msg_copy(this, const_cast<message_t *>(&other));
00138             if (rc != 0)
00139                 throw error_t ();
00140             return *this;
00141         }
00142 
00143         inline message_t & operator = (message_t && other) noexcept
00144         {
00145             int rc = zmq_msg_move(this, &other);
00146             if (rc != 0)
00147                 throw error_t ();
00148             return *this;
00149         }
00150 
00151         inline ~message_t ()
00152         {
00153             int rc = zmq_msg_close (this);
00154             assert (rc == 0);
00155             (void)rc;
00156         }
00157 
00158         inline void rebuild ()
00159         {
00160             int rc = zmq_msg_close (this);
00161             if (rc != 0)
00162                 throw error_t ();
00163             rc = zmq_msg_init (this);
00164             if (rc != 0)
00165                 throw error_t ();
00166         }
00167 
00168         inline void rebuild (size_t size_)
00169         {
00170             int rc = zmq_msg_close (this);
00171             if (rc != 0)
00172                 throw error_t ();
00173             rc = zmq_msg_init_size (this, size_);
00174             if (rc != 0)
00175                 throw error_t ();
00176         }
00177 
00178         inline void rebuild (void *data_, size_t size_, free_fn *ffn_,
00179             void *hint_ = NULL)
00180         {
00181             int rc = zmq_msg_close (this);
00182             if (rc != 0)
00183                 throw error_t ();
00184             rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
00185             if (rc != 0)
00186                 throw error_t ();
00187         }
00188 
00189         inline void move (message_t *msg_)
00190         {
00191             int rc = zmq_msg_move (this, (zmq_msg_t*) msg_);
00192             if (rc != 0)
00193                 throw error_t ();
00194         }
00195 
00196         inline void copy (message_t *msg_)
00197         {
00198             int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_);
00199             if (rc != 0)
00200                 throw error_t ();
00201         }
00202 
00203         inline char *data ()
00204         {
00205             return (char *)zmq_msg_data (this);
00206         }
00207 
00208         inline const char *data () const
00209         {
00210             return (const char *)zmq_msg_data (const_cast<message_t *>(this));
00211         }
00212 
00213         inline size_t size () const
00214         {
00215             return zmq_msg_size (const_cast<message_t *>(this));
00216         }
00217 
00218         std::string toString() const
00219         {
00220             return std::string(data(), data() + size());
00221         }
00222 
00223     private:
00224 
00225         //  Disable implicit message copying, so that users won't use shared
00226         //  messages (less efficient) without being aware of the fact.
00227         //message_t (const message_t&);
00228         //void operator = (const message_t&);
00229     };
00230 
00231     class context_t
00232     {
00233         friend class socket_t;
00234 
00235     public:
00236 
00237         inline context_t (int io_threads_)
00238         {
00239             ptr = zmq_init (io_threads_);
00240             if (ptr == NULL)
00241                 throw error_t ();
00242         }
00243 
00244         inline ~context_t ()
00245         {
00246             // If this throws, it means that you forgot to destroy a zmq_socket_t object
00247             // created with this context before you closed the context.
00248             ExcAssertEqual(sockets.size(), 0);
00249 
00250             int rc = zmq_term (ptr);
00251             assert (rc == 0);
00252             (void)rc;
00253         }
00254 
00255         //  Be careful with this, it's probably only useful for
00256         //  using the C api together with an existing C++ api.
00257         //  Normally you should never need to use this.
00258         inline operator void* ()
00259         {
00260             return ptr;
00261         }
00262         
00263         void registerSocket(socket_t * sock)
00264         {
00265             std::unique_lock<std::mutex> guard(lock);
00266             ExcAssertEqual(sockets.count(sock), 0);
00267             sockets.insert(sock);
00268         }
00269 
00270         void unregisterSocket(socket_t * sock)
00271         {
00272             std::unique_lock<std::mutex> guard(lock);
00273             ExcAssertEqual(sockets.count(sock), 1);
00274             sockets.erase(sock);
00275         }
00276 
00277     private:
00278 
00279         void *ptr;
00280 
00281         context_t (const context_t&);
00282         void operator = (const context_t&);
00283 
00284         std::mutex lock;
00285         std::set<socket_t *> sockets;
00286     };
00287 
00288     class socket_t
00289     {
00290     public:
00291         context_t * context_;
00292 
00293         inline socket_t (context_t &context_, int type_)
00294             : context_(&context_)
00295         {
00296             ptr = zmq_socket (context_.ptr, type_);
00297             if (ptr == NULL)
00298                 throw error_t ();
00299 
00300             context_.registerSocket(this);
00301         }
00302 
00303         inline ~socket_t ()
00304         {
00305             int linger = 0;
00306             setsockopt (ZMQ_LINGER, &linger, sizeof(linger));
00307             int rc = zmq_close (ptr);
00308             assert (rc == 0);
00309             (void)rc;
00310 
00311             context_->unregisterSocket(this);
00312         }
00313 
00314         inline operator void* ()
00315         {
00316             return ptr;
00317         }
00318 
00319         inline void setsockopt (int option_, const void *optval_,
00320             size_t optvallen_)
00321         {
00322             int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
00323             if (rc != 0)
00324                 throw error_t ();
00325         }
00326 
00327         inline void getsockopt (int option_, void *optval_,
00328             size_t *optvallen_)
00329         {
00330             int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
00331             if (rc != 0)
00332                 throw error_t ();
00333         }
00334 
00335         inline void bind (const std::string & addr_)
00336         {
00337             int rc = zmq_bind (ptr, addr_.c_str());
00338             if (rc != 0)
00339                 throw error_t ();
00340         }
00341 
00342         inline void unbind (const std::string & addr_)
00343         {
00344             int rc = zmq_unbind (ptr, addr_.c_str());
00345             if (rc != 0)
00346                 throw error_t ();
00347         }
00348 
00349         inline int tryUnbind (const std::string & addr_)
00350         {
00351             int rc = zmq_unbind (ptr, addr_.c_str());
00352             return rc;
00353         }
00354 
00355         inline void connect (const std::string & addr_)
00356         {
00357             int rc = zmq_connect (ptr, addr_.c_str());
00358             if (rc != 0)
00359                 throw error_t ();
00360         }
00361 
00362         inline void disconnect (const std::string & addr_)
00363         {
00364             int rc = zmq_disconnect (ptr, addr_.c_str());
00365             if (rc != 0)
00366                 throw error_t ();
00367         }
00368 
00369         inline int tryDisconnect (const std::string & addr_)
00370         {
00371             return zmq_disconnect (ptr, addr_.c_str());
00372         }
00373 
00374         inline bool send (message_t &msg_, int flags_ = 0)
00375         {
00376             int rc = zmq_sendmsg (ptr, &msg_, flags_);
00377             if (rc >= 0)
00378                 return true;
00379             if (rc == -1 && zmq_errno () == EAGAIN)
00380                 return false;
00381             throw error_t ();
00382         }
00383 
00384         inline bool send (message_t && msg_, int flags_ = 0)
00385         {
00386             int rc = zmq_sendmsg (ptr, &msg_, flags_);
00387             if (rc >= 0)
00388                 return true;
00389             if (rc == -1 && zmq_errno () == EAGAIN)
00390                 return false;
00391             throw error_t ();
00392         }
00393 
00394         inline bool recv (message_t *msg_, int flags_ = 0)
00395         {
00396             int rc = zmq_recvmsg (ptr, msg_, flags_);
00397             if (rc >= 0)
00398                 return true;
00399             if (rc == -1 && zmq_errno () == EAGAIN)
00400                 return false;
00401             throw error_t ();
00402         }
00403 
00404     private:
00405 
00406         void *ptr;
00407 
00408         socket_t (const socket_t&);
00409         socket_t (socket_t&&);
00410         void operator = (const socket_t&);
00411         void operator = (socket_t&&);
00412     };
00413 
00414 }
00415 
00416 #endif
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator