RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/socket_per_thread.h
00001 /* socket_per_thread.h                                             -*- C++ -*-
00002    Jeremy Barnes, 14 April 2011
00003    Copyright (c) 2011 Datacratic.  All rights reserved.
00004 
00005    ZeroMQ Socket per thread.
00006 */
00007 
00008 #ifndef __zmq__socket_per_thread_h__
00009 #define __zmq__socket_per_thread_h__
00010 
00011 
00012 #include "zmq.hpp"
00013 #include <boost/thread/tss.hpp>
00014 #include <boost/noncopyable.hpp>
00015 #include <boost/thread/locks.hpp>
00016 #include "jml/compiler/compiler.h"
00017 #include "jml/arch/spinlock.h"
00018 #include "jml/arch/exception.h"
00019 #include <set>
00020 
00021 
00022 namespace Datacratic {
00023 
00024 
00025 /*****************************************************************************/
00026 /* SOCKET PER THREAD                                                         */
00027 /*****************************************************************************/
00028 
/**
 * Hands each calling thread its own ZeroMQ socket.
 *
 * The object records a context, a socket type and a URI (see init()).
 * Calling operator() from a thread returns that thread's socket, running
 * initForThisThread() first when the thread has no entry yet.  Per-thread
 * state is held in boost::thread_specific_ptr, and the object cannot be
 * copied (it derives from boost::noncopyable).
 *
 * NOTE(review): only operator() is defined in this header.  Anything said
 * below about the other member functions is an assumption drawn from their
 * declarations and must be confirmed against the implementation file.
 */
00038 struct SocketPerThread : boost::noncopyable {
00039 
    /// Default constructor.  Presumably leaves the object unusable (state
    /// NOTINITIALIZED) until init() is called -- TODO confirm.
00041     SocketPerThread();
00042 
    /// Constructs from the same arguments that init() accepts; presumably
    /// equivalent to default construction followed by init() -- TODO confirm.
00045     SocketPerThread(zmq::context_t & context,
00046                     int type,
00047                     const std::string & uri,
00048                     bool allowForceClose = true);
00049 
    /// Destructor.  NOTE(review): the teardown it performs (and whether it
    /// overlaps with shutdown()) is not visible from this header.
00050     ~SocketPerThread();
00051 
    /// Supplies the parameters used for the per-thread sockets.
    /// @param context          ZeroMQ context; the matching member stores a
    ///                         pointer, so the context presumably has to
    ///                         outlive this object -- TODO confirm.
    /// @param type             Presumably a ZeroMQ socket type constant.
    /// @param uri              Presumably the endpoint of each socket; whether
    ///                         it is connected or bound is not visible here.
    /// @param allowForceClose  Defaults to true; the precise effect is decided
    ///                         by the implementation -- TODO confirm.
00054     void init(zmq::context_t & context,
00055               int type,
00056               const std::string & uri,
00057               bool allowForceClose = true);
00058 
    /// Shuts the object down.  NOTE(review): presumably closes outstanding
    /// sockets and moves state to FINISHED -- confirm in the implementation.
00059     void shutdown();
00060 
00061     zmq::context_t * context;   ///< Non-owning pointer to the ZeroMQ context (presumably set by init())
00062     int type;                   ///< Socket type argument (presumably a ZMQ_* constant)
00063     std::string uri;            ///< URI argument for the sockets
00064     bool allowForceClose;       ///< Flag supplied to init() / the constructor
00065     mutable int numOpen;        ///< Presumably the number of sockets currently open; mutable so const members can update it
00066 
    /// Returns the calling thread's socket.
    /// @return Reference to the zmq::socket_t held in this thread's Entry.
    /// @throws ML::Exception if state is anything other than READY.
00068     inline zmq::socket_t & operator () () const
00069     {
        // Refuse to hand out sockets unless the object is in the READY state.
00070         if (state != READY)
00071             throw ML::Exception("socket not ready: %d", state);
00072 
        // No Entry for this thread yet: set one up on demand.  The branch is
        // marked unlikely because it is only taken when entries is empty.
00073         if (JML_UNLIKELY(!entries.get())) {
00074             initForThisThread();
00075         }
00076 
00077         return *entries->sock;
00078     }
00079 
    /// Sets up the calling thread's Entry; invoked by operator() when the
    /// thread has none.  Const so that the const accessor can call it.
00081     void initForThisThread() const;
00082     
    /// Presumably releases the calling thread's socket for this object --
    /// TODO confirm.
00084     void cleanupThisThread();
00085 
    /// Static counterpart of cleanupThisThread(); presumably walks
    /// allForThread and cleans up every SocketPerThread that the calling
    /// thread has used -- TODO confirm.
00090     static void cleanupAllForThread();
00091     
00092 private:
    /// Lifecycle states compared against `state`.  NOTE(review): the unusual
    /// numeric values are presumably chosen so that uninitialised or
    /// corrupted memory is unlikely to look like a valid state -- confirm.
00093     enum {
00094         NOTINITIALIZED = 12321,
00095         READY = 349244,
00096         FINISHED = 293845
00097     };
00098     int state;
00099 
    /// Per-thread record: the thread's socket and the object it belongs to.
00101     struct Entry {
00102         zmq::socket_t * sock;
00103         SocketPerThread * owner;
00104     };
00105 
    /// Entries registered with this object (presumably one per thread that
    /// has called operator()).
00107     std::set<Entry *> allThreads;
00108 
    /// Lock and guard types; allThreadsLock presumably protects allThreads.
    /// The lock is mutable so const members can acquire it.
00110     typedef ML::Spinlock Lock;
00111     typedef boost::unique_lock<Lock> Guard;
00112     mutable Lock allThreadsLock;
00113 
    /// Presumably removes `entry` from allThreads -- TODO confirm.
00115     void removeThreadEntry(Entry * entry);
00116 
    /// Presumably inserts `entry` into allThreads -- TODO confirm.
00118     void addThreadEntry(Entry * entry);
00119 
    /// Static handler for an Entry being freed; presumably installed as the
    /// cleanup function of `entries` so it runs at thread exit -- TODO confirm.
00121     static void onFreeEntry(Entry * entry);
00122     
    /// The calling thread's Entry for this object; empty until
    /// initForThisThread() has run on that thread.  Mutable because it is
    /// reached from const member functions.
00124     mutable boost::thread_specific_ptr<Entry> entries;
00125 
    /// Per-thread set of SocketPerThread objects, shared by all instances;
    /// presumably those with a live socket on the thread, consulted by
    /// cleanupAllForThread() -- TODO confirm.
00129     static boost::thread_specific_ptr<std::set<SocketPerThread *> > allForThread;
00130 };
00131 
00132 } // namespace Datacratic
00133 
00134 
00135 
00136 #endif /* __zmq__socket_per_thread_h__ */
00137 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator