RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* socket_per_thread.h -*- C++ -*- 00002 Jeremy Barnes, 14 April 2011 00003 Copyright (c) 2011 Datacratic. All rights reserved. 00004 00005 ZeroMQ Socket per thread. 00006 */ 00007 00008 #ifndef __zmq__socket_per_thread_h__ 00009 #define __zmq__socket_per_thread_h__ 00010 00011 00012 #include "zmq.hpp" 00013 #include <boost/thread/tss.hpp> 00014 #include <boost/noncopyable.hpp> 00015 #include <boost/thread/locks.hpp> 00016 #include "jml/compiler/compiler.h" 00017 #include "jml/arch/spinlock.h" 00018 #include "jml/arch/exception.h" 00019 #include <set> 00020 00021 00022 namespace Datacratic { 00023 00024 00025 /*****************************************************************************/ 00026 /* SOCKET PER THREAD */ 00027 /*****************************************************************************/ 00028 00038 struct SocketPerThread : boost::noncopyable { 00039 00041 SocketPerThread(); 00042 00045 SocketPerThread(zmq::context_t & context, 00046 int type, 00047 const std::string & uri, 00048 bool allowForceClose = true); 00049 00050 ~SocketPerThread(); 00051 00054 void init(zmq::context_t & context, 00055 int type, 00056 const std::string & uri, 00057 bool allowForceClose = true); 00058 00059 void shutdown(); 00060 00061 zmq::context_t * context; 00062 int type; 00063 std::string uri; 00064 bool allowForceClose; 00065 mutable int numOpen; 00066 00068 inline zmq::socket_t & operator () () const 00069 { 00070 if (state != READY) 00071 throw ML::Exception("socket not ready: %d", state); 00072 00073 if (JML_UNLIKELY(!entries.get())) { 00074 initForThisThread(); 00075 } 00076 00077 return *entries->sock; 00078 } 00079 00081 void initForThisThread() const; 00082 00084 void cleanupThisThread(); 00085 00090 static void cleanupAllForThread(); 00091 00092 private: 00093 enum { 00094 NOTINITIALIZED = 12321, 00095 READY = 349244, 00096 FINISHED = 293845 00097 }; 00098 int state; 00099 00101 struct Entry { 00102 zmq::socket_t * sock; 00103 SocketPerThread * owner; 00104 }; 00105 00107 std::set<Entry *> allThreads; 00108 00110 typedef ML::Spinlock Lock; 00111 typedef boost::unique_lock<Lock> Guard; 00112 mutable Lock allThreadsLock; 00113 00115 void removeThreadEntry(Entry * entry); 00116 00118 void addThreadEntry(Entry * entry); 00119 00121 static void onFreeEntry(Entry * entry); 00122 00124 mutable boost::thread_specific_ptr<Entry> entries; 00125 00129 static boost::thread_specific_ptr<std::set<SocketPerThread *> > allForThread; 00130 }; 00131 00132 } // namespace Datacratic 00133 00134 00135 00136 #endif /* __zmq__socket_per_thread_h__ */ 00137