RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/socket_per_thread.cc
00001 /* socket_per_thread.cc
00002    Jeremy Barnes, 5 March 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    One socket per thread.
00006 */
00007 
00008 #include "soa/service/socket_per_thread.h"
00009 #include "jml/arch/format.h"
00010 #include "ace/OS_NS_Thread.h"
00011 #include "jml/arch/backtrace.h"
00012 #include "jml/arch/spinlock.h"
00013 #include "jml/arch/atomic_ops.h"
00014 #include "jml/arch/exception.h"
00015 #include "jml/arch/timers.h"
00016 #include "jml/utils/exc_assert.h"
00017 #include "soa/service/zmq_utils.h"
00018 
00019 
00020 using namespace std;
00021 using namespace ML;
00022 
00023 
00024 namespace Datacratic {
00025 
00026 /*****************************************************************************/
00027 /* SOCKET PER THREAD                                                         */
00028 /*****************************************************************************/
00029 
00030 SocketPerThread::
00031 SocketPerThread()
00032     : context(0), type(0), numOpen(0), state(NOTINITIALIZED),
00033       entries(onFreeEntry)
00034 {
00035 }
00036 
00037 SocketPerThread::
00038 SocketPerThread(zmq::context_t & context,
00039                 int type,
00040                 const std::string & uri,
00041                 bool allowForceClose)
00042     : context(&context), type(type), uri(uri),
00043       allowForceClose(allowForceClose), numOpen(0),
00044       state(READY), entries(onFreeEntry)
00045 {
00046     //std::cerr << "finished init of socket " << uri << std::endl;
00047 }
00048 
00049 SocketPerThread::
00050 ~SocketPerThread()
00051 {
00052     shutdown();
00053 }
00054 
00055 void
00056 SocketPerThread::
00057 init(zmq::context_t & context,
00058      int type,
00059      const std::string & uri,
00060      bool allowForceClose)
00061 {
00062     if (this->context)
00063         throw ML::Exception("attempt to double initialize a SocketPerThread");
00064     this->context = &context;
00065     this->type = type;
00066     this->uri = uri;
00067     this->allowForceClose = allowForceClose;
00068 
00069     state = READY;
00070     //using namespace std;
00071     //cerr << "initializing SocketPerThread with uri " << uri << endl;
00072 }
00073 
00074 void
00075 SocketPerThread::
00076 shutdown()
00077 {
00078     state = FINISHED;
00079 
00080     entries.reset();
00081     context = 0;
00082     //using namespace std;
00083     //cerr << "destroying SocketPerThread with uri " << uri << " and "
00084     //     << numOpen << " open entries" << endl;
00085 
00086     if (numOpen > 0) {
00087         if (!allowForceClose) {
00088             throw ML::Exception("attempt to destroy SocketPerThread with %d open entries",
00089                                 numOpen);
00090         }
00091 
00092         while (!allThreads.empty()) {
00093             auto it = allThreads.begin();
00094             onFreeEntry(*it);
00095         }
00096 
00097         ExcAssertEqual(numOpen, 0);
00098     }
00099 }
00100 
00101 void
00102 SocketPerThread::
00103 initForThisThread() const
00104 {
00105     if (entries.get())
00106         return;
00107 
00108     //cerr << "initializing zeromq socket for this thread to connect to "
00109     //     << uri << endl;
00110 
00111     if (!context)
00112         throw ML::Exception("attempt to use a SocketPerThread "
00113                             "without initializing");
00114 
00115     auto mThis = const_cast<SocketPerThread *>(this);
00116 
00117     std::auto_ptr<Entry> newEntry(new Entry());
00118     newEntry->owner = mThis;
00119             
00120     std::auto_ptr<zmq::socket_t> newPtr
00121         (new zmq::socket_t(*context, type));
00122     setIdentity(*newPtr, ML::format("thr%lld",
00123                                     (long long)ACE_OS::thr_self()));
00124     newPtr->connect(uri.c_str());
00125 
00126     newEntry->sock = newPtr.release();
00127     //if (!allForThread.get())
00128     //    allForThread.reset(new std::set<SocketPerThread *>());
00129     //allForThread->insert(mThis);
00130     entries.reset(newEntry.release());
00131 
00132     mThis->addThreadEntry(entries.get());
00133     ML::atomic_inc(numOpen);
00134 
00135     // wait for ZMQ when connecting...
00136     ML::sleep(0.1);
00137 }
00138     
00139 void
00140 SocketPerThread::
00141 onFreeEntry(Entry * entry)
00142 {
00143     using namespace std;
00144     //cerr << "onFreeEntry " << entry << endl;
00145     //cerr << "closing zmq socket" << entry->sock << " with owner "
00146     //     << entry->owner << endl;
00147     delete entry->sock;
00148     //cerr << "erasing" << endl;
00149     //allForThread->erase(entry->owner);
00150     //cerr << "unowning" << endl;
00151     ML::atomic_dec(entry->owner->numOpen);
00152 
00153     entry->owner->removeThreadEntry(entry);
00154 
00155     delete entry;
00156 
00157     //Datacratic::close(*sock);
00158     //ML::backtrace();
00159     //cerr << endl << endl;
00160 }
00161 
00162 void
00163 SocketPerThread::
00164 cleanupThisThread()
00165 {
00166     //cerr << "cleaning up socket " << entries->sock << endl;
00167     entries.reset();
00168 }
00169 
00170 void
00171 SocketPerThread::
00172 cleanupAllForThread()
00173 {
00174     if (!allForThread.get()) return;
00175     //cerr << "cleaning up " << allForThread->size() << " sockets" << endl;
00176 
00177     for (auto it = allForThread->begin();  it != allForThread->end();
00178          it = allForThread->begin())
00179         (*it)->cleanupThisThread();
00180 
00181     //cerr << "we now have " << allForThread->size() << " sockets left" << endl;
00182 }
00183 
00184 void
00185 SocketPerThread::
00186 removeThreadEntry(Entry * entry)
00187 {
00188     Guard guard(allThreadsLock);
00189     allThreads.erase(entry);
00190 }
00191 
00192 void
00193 SocketPerThread::
00194 addThreadEntry(Entry * entry)
00195 {
00196     Guard guard(allThreadsLock);
00197     allThreads.insert(entry);
00198 }
00199 
00200 boost::thread_specific_ptr<std::set<SocketPerThread *> >
00201 SocketPerThread::allForThread;
00202 
00203 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator