![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* socket_per_thread.cc 00002 Jeremy Barnes, 5 March 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 One socket per thread. 00006 */ 00007 00008 #include "soa/service/socket_per_thread.h" 00009 #include "jml/arch/format.h" 00010 #include "ace/OS_NS_Thread.h" 00011 #include "jml/arch/backtrace.h" 00012 #include "jml/arch/spinlock.h" 00013 #include "jml/arch/atomic_ops.h" 00014 #include "jml/arch/exception.h" 00015 #include "jml/arch/timers.h" 00016 #include "jml/utils/exc_assert.h" 00017 #include "soa/service/zmq_utils.h" 00018 00019 00020 using namespace std; 00021 using namespace ML; 00022 00023 00024 namespace Datacratic { 00025 00026 /*****************************************************************************/ 00027 /* SOCKET PER THREAD */ 00028 /*****************************************************************************/ 00029 00030 SocketPerThread:: 00031 SocketPerThread() 00032 : context(0), type(0), numOpen(0), state(NOTINITIALIZED), 00033 entries(onFreeEntry) 00034 { 00035 } 00036 00037 SocketPerThread:: 00038 SocketPerThread(zmq::context_t & context, 00039 int type, 00040 const std::string & uri, 00041 bool allowForceClose) 00042 : context(&context), type(type), uri(uri), 00043 allowForceClose(allowForceClose), numOpen(0), 00044 state(READY), entries(onFreeEntry) 00045 { 00046 //std::cerr << "finished init of socket " << uri << std::endl; 00047 } 00048 00049 SocketPerThread:: 00050 ~SocketPerThread() 00051 { 00052 shutdown(); 00053 } 00054 00055 void 00056 SocketPerThread:: 00057 init(zmq::context_t & context, 00058 int type, 00059 const std::string & uri, 00060 bool allowForceClose) 00061 { 00062 if (this->context) 00063 throw ML::Exception("attempt to double initialize a SocketPerThread"); 00064 this->context = &context; 00065 this->type = type; 00066 this->uri = uri; 00067 this->allowForceClose = allowForceClose; 00068 00069 state = READY; 00070 //using namespace std; 00071 //cerr << "initializing SocketPerThread with uri " << uri << endl; 00072 } 00073 00074 void 00075 SocketPerThread:: 00076 shutdown() 00077 { 00078 state = FINISHED; 00079 00080 entries.reset(); 00081 context = 0; 00082 //using namespace std; 00083 //cerr << "destroying SocketPerThread with uri " << uri << " and " 00084 // << numOpen << " open entries" << endl; 00085 00086 if (numOpen > 0) { 00087 if (!allowForceClose) { 00088 throw ML::Exception("attempt to destroy SocketPerThread with %d open entries", 00089 numOpen); 00090 } 00091 00092 while (!allThreads.empty()) { 00093 auto it = allThreads.begin(); 00094 onFreeEntry(*it); 00095 } 00096 00097 ExcAssertEqual(numOpen, 0); 00098 } 00099 } 00100 00101 void 00102 SocketPerThread:: 00103 initForThisThread() const 00104 { 00105 if (entries.get()) 00106 return; 00107 00108 //cerr << "initializing zeromq socket for this thread to connect to " 00109 // << uri << endl; 00110 00111 if (!context) 00112 throw ML::Exception("attempt to use a SocketPerThread " 00113 "without initializing"); 00114 00115 auto mThis = const_cast<SocketPerThread *>(this); 00116 00117 std::auto_ptr<Entry> newEntry(new Entry()); 00118 newEntry->owner = mThis; 00119 00120 std::auto_ptr<zmq::socket_t> newPtr 00121 (new zmq::socket_t(*context, type)); 00122 setIdentity(*newPtr, ML::format("thr%lld", 00123 (long long)ACE_OS::thr_self())); 00124 newPtr->connect(uri.c_str()); 00125 00126 newEntry->sock = newPtr.release(); 00127 //if (!allForThread.get()) 00128 // allForThread.reset(new std::set<SocketPerThread *>()); 00129 //allForThread->insert(mThis); 00130 entries.reset(newEntry.release()); 00131 00132 mThis->addThreadEntry(entries.get()); 00133 ML::atomic_inc(numOpen); 00134 00135 // wait for ZMQ when connecting... 00136 ML::sleep(0.1); 00137 } 00138 00139 void 00140 SocketPerThread:: 00141 onFreeEntry(Entry * entry) 00142 { 00143 using namespace std; 00144 //cerr << "onFreeEntry " << entry << endl; 00145 //cerr << "closing zmq socket" << entry->sock << " with owner " 00146 // << entry->owner << endl; 00147 delete entry->sock; 00148 //cerr << "erasing" << endl; 00149 //allForThread->erase(entry->owner); 00150 //cerr << "unowning" << endl; 00151 ML::atomic_dec(entry->owner->numOpen); 00152 00153 entry->owner->removeThreadEntry(entry); 00154 00155 delete entry; 00156 00157 //Datacratic::close(*sock); 00158 //ML::backtrace(); 00159 //cerr << endl << endl; 00160 } 00161 00162 void 00163 SocketPerThread:: 00164 cleanupThisThread() 00165 { 00166 //cerr << "cleaning up socket " << entries->sock << endl; 00167 entries.reset(); 00168 } 00169 00170 void 00171 SocketPerThread:: 00172 cleanupAllForThread() 00173 { 00174 if (!allForThread.get()) return; 00175 //cerr << "cleaning up " << allForThread->size() << " sockets" << endl; 00176 00177 for (auto it = allForThread->begin(); it != allForThread->end(); 00178 it = allForThread->begin()) 00179 (*it)->cleanupThisThread(); 00180 00181 //cerr << "we now have " << allForThread->size() << " sockets left" << endl; 00182 } 00183 00184 void 00185 SocketPerThread:: 00186 removeThreadEntry(Entry * entry) 00187 { 00188 Guard guard(allThreadsLock); 00189 allThreads.erase(entry); 00190 } 00191 00192 void 00193 SocketPerThread:: 00194 addThreadEntry(Entry * entry) 00195 { 00196 Guard guard(allThreadsLock); 00197 allThreads.insert(entry); 00198 } 00199 00200 boost::thread_specific_ptr<std::set<SocketPerThread *> > 00201 SocketPerThread::allForThread; 00202 00203 } // namespace Datacratic