RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/zmq_named_pub_sub.h
00001 /* zmq_named_pub_sub.h                                             -*- C++ -*-
00002    Jeremy Barnes, 8 January 2013
00003    Copyright (c) 2013 Datacratic Inc.  All rights reserved.
00004    
00005    Named publish/subscribe endpoint.
00006 */
00007 
00008 #pragma once
00009 
00010 #include "zmq_endpoint.h"
00011 #include "typed_message_channel.h"
00012 #include <sys/utsname.h>
00013 #include "jml/arch/backtrace.h"
00014 
00015 namespace Datacratic {
00016 
00017 
00018 
00019 /*****************************************************************************/
00020 /* ZMQ NAMED PUBLISHER                                                       */
00021 /*****************************************************************************/
00022 
00025 struct ZmqNamedPublisher: public MessageLoop {
00026 
00027     ZmqNamedPublisher(std::shared_ptr<zmq::context_t> context,
00028                       int messageBufferSize = 10000)
00029         : publishEndpoint(context),
00030           publishQueue(messageBufferSize)
00031     {
00032     }
00033 
00034     virtual ~ZmqNamedPublisher()
00035     {
00036         shutdown();
00037     }
00038 
00039     void init(std::shared_ptr<ConfigurationService> config,
00040               const std::string & endpointName,
00041               const std::string & identity = "")
00042     {
00043         using namespace std;
00044 
00045         // Initialize the publisher endpoint
00046         publishEndpoint.init(config, ZMQ_XPUB, endpointName);
00047 
00048         // Called when we queue up a message to be published
00049         publishQueue.onEvent = [=] (std::vector<zmq::message_t> && message)
00050             {
00051                 using namespace std;
00052                 //cerr << "popped message to publish" << endl;
00053                 publishEndpoint.sendMessage(std::move(message));
00054             };
00055 
00056         // Called when there is a new subscription.
00057         // The first byte is 1 (subscription) or 0 (unsubscription).
00058         // The rest of the message is the filter for the subscription
00059         auto doPublishMessage = [=] (const std::vector<std::string> & msg)
00060             {
00061 #if 0
00062                 cerr << "got publish subscription" << msg << endl;
00063                 cerr << "msg.size() = " << msg.size() << endl;
00064                 cerr << "msg[0].size() = " << msg[0].size() << endl;
00065                 cerr << "msg[0][0] = " << (int)msg[0][0] << endl;
00066 #endif
00067             };
00068 
00069         publishEndpoint.messageHandler = doPublishMessage;
00070 
00071         // Hook everything into the message loop
00072         addSource("ZmqNamedPublisher::publishEndpoint", publishEndpoint);
00073         addSource("ZmqNamedPublisher::publishQueue", publishQueue);
00074 
00075     }
00076 
00077     std::string bindTcp(PortRange const & portRange = PortRange(), std::string host = "")
00078     {
00079         return publishEndpoint.bindTcp(portRange, host);
00080     }
00081 
00082     void shutdown()
00083     {
00084         MessageLoop::shutdown();
00085         publishEndpoint.shutdown();
00086         //publishEndpointMonitor.shutdown();
00087     }
00088 
00089     //std::vector<std::string> getSubscribers()
00090     //{
00091     //}
00092 
00093     template<typename Head, typename... Tail>
00094     void encodeAll(std::vector<zmq::message_t> & messages,
00095                    Head head,
00096                    Tail&&... tail)
00097     {
00098         messages.emplace_back(std::move(encodeMessage(head)));
00099         encodeAll(messages, std::forward<Tail>(tail)...);
00100     }
00101 
00102     // Vectors treated specially... they are copied
00103     template<typename... Tail>
00104     void encodeAll(std::vector<zmq::message_t> & messages,
00105                    const std::vector<std::string> & head,
00106                    Tail&&... tail)
00107     {
00108         for (auto & m: head)
00109             messages.emplace_back(std::move(encodeMessage(m)));
00110         encodeAll(messages, std::forward<Tail>(tail)...);
00111     }
00112 
00113     void encodeAll(std::vector<zmq::message_t> & messages)
00114     {
00115     }
00116 
00117     template<typename... Args>
00118     void publish(const std::string & channel, Args&&... args)
00119     {
00120         std::vector<zmq::message_t> messages;
00121         messages.reserve(sizeof...(Args) + 1);
00122         
00123         encodeAll(messages, channel,
00124                   std::forward<Args>(args)...);
00125         publishQueue.push(messages);
00126     }
00127 
00128 private:
00130     ZmqNamedEndpoint publishEndpoint;
00131 
00133     TypedMessageSink<std::vector<zmq::message_t> > publishQueue;
00134 };
00135 
00136 
00137 /*****************************************************************************/
00138 /* ENDPOINT CONNECTOR                                                        */
00139 /*****************************************************************************/
00140 
00173 struct EndpointConnector : public MessageLoop {
00174 
00175     EndpointConnector()
00176         : changes(32)
00177     {
00178     }
00179 
00180     void init(std::shared_ptr<ConfigurationService> config)
00181     {
00182         this->config = config;
00183 
00184         changes.onEvent
00185             = std::bind(&EndpointConnector::handleEndpointChange,
00186                         this,
00187                         std::placeholders::_1);
00188 
00189         addSource("EndpointConnector::changes", changes);
00190     }
00191 
00192     void watchEndpoint(const std::string & endpointPath)
00193     {
00194         std::unique_lock<Lock> guard(lock);
00195 
00196         using namespace std;
00197         //cerr << "watching endpoint " << endpointPath << endl;
00198 
00199         auto & entry = endpoints[endpointPath];
00200 
00201         if (!entry.watch) {
00202             // First time that we watch this service; set it up
00203 
00204             //cerr << "=============== initializing watch for " << endpointPath << endl;
00205 
00206             entry.watch.init([=] (const std::string & path,
00207                                   ConfigurationService::ChangeType change)
00208                              {
00209                                  using namespace std;
00210                                  //cerr << "endpoint changed path " << path << " " << this << endl;
00211 
00212                                  {
00213                                      std::unique_lock<Lock> guard(lock);
00214                                      if (endpoints.count(endpointPath)) {
00215                                          auto & entry = endpoints[endpointPath];
00216                                          entry.watchIsSet = false;
00217                                      }
00218                                  }
00219                                  
00220                                  changes.push(endpointPath);
00221                              });
00222 
00223             changes.push(endpointPath);
00224         }
00225     }
00226     
00230     void unwatchEndpoint(const std::string & endpoint,
00231                          bool forceDisconnect)
00232     {
00233         throw ML::Exception("unwatchEndpoint not done");
00234     }
00235 
00236     std::function<bool (const std::string & endpointPath,
00237                         const std::string & entry,
00238                         const Json::Value & params)> connectHandler;
00239     
00250     virtual bool connect(const std::string & endpointPath,
00251                          const std::string & entryName,
00252                          const Json::Value & params)
00253     {
00254         if (connectHandler)
00255             return connectHandler(endpointPath, entryName, params);
00256         throw ML::Exception("no connect override");
00257     }
00258 
00260     void handleDisconnection(const std::string & endpointPath,
00261                              const std::string & entryName)
00262     {
00263         std::unique_lock<Lock> guard(lock);
00264 
00265         if (!endpoints.count(endpointPath))
00266             return;
00267 
00268         auto & entry = endpoints[endpointPath];
00269 
00270         ExcAssertEqual(entry.connectedTo, entryName);
00271         entry.connectedTo = "";
00272 
00273         // And we attempt to reconnect
00274         // Note that we can't push endpointPath on changes
00275         changes.push(endpointPath);
00276     }
00277 
00278 private:
00279     typedef std::mutex Lock;
00280     mutable Lock lock;
00281 
00285     TypedMessageSink<std::string> changes;
00286 
00287     struct Entry {
00288         Entry()
00289             : watchIsSet(false)
00290         {
00291         }
00292 
00293         ConfigurationService::Watch watch;
00294         std::string connectedTo;
00295         bool watchIsSet;
00296     };
00297 
00298     std::map<std::string, Entry> endpoints;
00299 
00300     std::shared_ptr<ConfigurationService> config;
00301 
00302     void handleEndpointChange(const std::string & endpointPath)
00303     {
00304         using namespace std;
00305 
00306         //cerr << "handleEndpointChange " << endpointPath << endl;
00307 
00308         std::unique_lock<Lock> guard(lock);
00309 
00310         if (!endpoints.count(endpointPath))
00311             return;
00312 
00313         auto & entry = endpoints[endpointPath];
00314 
00315         //cerr << "handling service class change for " << endpointPath << endl;
00316 
00317         vector<string> children;
00318         if (entry.watchIsSet)
00319             children = config->getChildren(endpointPath);
00320         else children = config->getChildren(endpointPath, entry.watch);
00321         entry.watchIsSet = true;
00322 
00323         //cerr << "children = " << children << endl;
00324 
00325         // If we're connected, look for a change in the endpoints
00326         if (!entry.connectedTo.empty()) {
00327 
00328             // Does our connected node still exist?
00329             if (std::find(children.begin(), children.end(), entry.connectedTo)
00330                 == children.end()) {
00331                 // Node disappeared; we need to disconnect
00332                 guard.unlock();
00333                 handleDisconnection(endpointPath, entry.connectedTo);
00334                 return;  // handleDisconnection will call into us recursively
00335             }
00336             else {
00337                 // Node is still there
00338                 // TODO: check for change in value
00339                 return;
00340             }
00341         }
00342 
00343         guard.unlock();
00344 
00345         // If we got here, we're not connected
00346         for (auto & c: children) {
00347             Json::Value cfg = config->getJson(endpointPath + "/" + c);
00348             if (connect(endpointPath, c, cfg)) {
00349                 notifyConnectionStatus(endpointPath, c, true);
00350                 return;
00351             }
00352         }
00353 
00354         cerr << "warning: could not connect to " << endpointPath << " immediately"
00355              << endl;
00356     }
00357 
00358     void notifyConnectionStatus(const std::string & endpointPath,
00359                                 const std::string & entryName,
00360                                 bool status)
00361     {
00362         std::unique_lock<Lock> guard(lock);
00363 
00364         if (!endpoints.count(endpointPath))
00365             return;
00366 
00367         auto & entry = endpoints[endpointPath];
00368 
00369         if (status) {
00370             if (!entry.connectedTo.empty()) {
00371                 // TODO
00372                 throw ML::Exception("TODO: handle connection with one already "
00373                                     "active");
00374             }
00375             entry.connectedTo = entryName;
00376             // TODO: watch config
00377         }
00378         else {
00379             if (entry.connectedTo.empty())
00380                 return;
00381             if (entry.connectedTo != entryName) {
00382                 throw ML::Exception("TODO: handle disconnection from non-active");
00383             }
00384             entry.connectedTo = "";
00385         }
00386     }
00387 };
00388 
00389 
00390 /*****************************************************************************/
00391 /* ZMQ NAMED SOCKET                                                      */
00392 /*****************************************************************************/
00393 
00396 struct ZmqNamedSocket: public MessageLoop {
00397 
00398     enum ConnectionState {
00399         NO_CONNECTION,   
00400         CONNECTING,      
00401         CONNECTED,       
00402         DISCONNECTED     
00403     };
00404 
00405     ZmqNamedSocket(zmq::context_t & context, int type)
00406         : context(&context),
00407           socketType(type),
00408           connectionState(NO_CONNECTION),
00409           monitor(context)
00410     {
00411         //using namespace std;
00412         //cerr << "created zmqNamedSocket at " << this << endl;
00413     }
00414 
00415     virtual ~ZmqNamedSocket()
00416     {
00417         shutdown();
00418     }
00419 
00423     void init(std::shared_ptr<ConfigurationService> config)
00424     {
00425         if (socket)
00426             throw ML::Exception("socket already initialized");
00427         socket.reset(new zmq::socket_t(*context, socketType));
00428         
00429         using namespace std;
00430 
00431         monitor.defaultHandler
00432             = std::bind(&ZmqNamedSocket::handleMonitorEvent,
00433                         this,
00434                         std::placeholders::_1,
00435                         std::placeholders::_2,
00436                         std::placeholders::_3);
00437 
00438         connector.connectHandler
00439             = std::bind(&ZmqNamedSocket::doConnect,
00440                         this,
00441                         std::placeholders::_1,
00442                         std::placeholders::_2,
00443                         std::placeholders::_3);
00444 
00445         connector.init(config);
00446         monitor.init(*socket);
00447 
00448         addSource("ZmqNamedSocket::connector", connector);
00449         addSource("ZmqNamedSocket::monitor", monitor);
00450         addSource("ZmqNamedSocket::socket",
00451                   std::make_shared<ZmqBinaryEventSource>
00452                   (*socket, [=] (std::vector<zmq::message_t> && message)
00453                    {
00454                        this->handleMessage(std::move(message));
00455                    }));
00456     }
00457 
00458     void shutdown()
00459     {
00460         MessageLoop::shutdown();
00461 
00462         if (!socket)
00463             return;
00464 
00465         disconnect();
00466         monitor.shutdown();
00467         connector.shutdown();
00468 
00469         socket->tryDisconnect(this->connectedAddress);
00470         socket.reset();
00471     }
00472 
00473     void connectToEndpoint(const std::string & endpointPath)
00474     {
00475         if (connectedEndpoint != "")
00476             throw ML::Exception("attempt to connect a ZmqNamedSocket "
00477                                 "to an enpoint that is already connected");
00478      
00479         this->connectedEndpoint = connectedEndpoint;
00480         this->connectionState = CONNECTING;
00481 
00482         // No lock needed as the connector has its own lock
00483 
00484         // Tell the connector to watch the endpoint.  When an entry pops
00485         // up, it will connect to it.
00486         connector.watchEndpoint(endpointPath);
00487     }
00488 
00490     void disconnect()
00491     {
00492         std::unique_lock<Lock> guard(lock);
00493         ExcAssert(socket);
00494         socket->tryDisconnect(connectedAddress);
00495         this->connectedEndpoint = "";
00496         this->connectedAddress = "";
00497         this->connectionState = DISCONNECTED;
00498         //connector.disconnect();
00499     }
00500 
00502     std::string getConnectedEndpoint() const
00503     {
00504         std::unique_lock<Lock> guard(lock);
00505         return connectedEndpoint;
00506     }
00507 
00509     std::string getConnectedAddress() const
00510     {
00511         std::unique_lock<Lock> guard(lock);
00512         return connectedAddress;
00513     }
00514 
00516     ConnectionState getConnectionState() const
00517     {
00518         // No lock needed as performed atomically
00519         return connectionState;
00520     }
00521 
00525     typedef std::function<void (std::vector<zmq::message_t> &&)> MessageHandler;
00526     MessageHandler messageHandler;
00527 
00532     virtual void handleMessage(std::vector<zmq::message_t> && message)
00533     {
00534         using namespace std;
00535         //cerr << "named socket got message " << message.at(0).toString()
00536         //     << endl;
00537         if (messageHandler)
00538             messageHandler(std::move(message));
00539         else throw ML::Exception("need to override either messageHandler "
00540                                  "or handleMessage");
00541     }
00542 
00553     template<typename Fn>
00554     void performSocketOpSync(Fn fn)
00555     {
00556         std::unique_lock<Lock> guard(lock);
00557         fn(*socket);
00558     }
00559 
00561     void sendSync(std::vector<zmq::message_t> && message)
00562     {
00563         std::unique_lock<Lock> guard(lock);
00564         for (unsigned i = 0;  i < message.size();  ++i) {
00565             socket->send(message[i], i == message.size() - 1
00566                          ? 0 : ZMQ_SNDMORE);
00567         }
00568     }
00569 
00570 private:
00571     // This lock is used to allow the synchronous methods to work without
00572     // needing ping-pong with the message loop.  Normally it should be
00573     // uncontended.
00574     typedef std::mutex Lock;
00575     mutable Lock lock;
00576 
00582     virtual bool doConnect(const std::string & endpointPath,
00583                            const std::string & entryName,
00584                            const Json::Value & epConfig)
00585     {
00586         using namespace std;
00587 
00588         //cerr << "   ((((((( doConnect for " << endpointPath << " " << entryName
00589         //     << endl;
00590 
00591         std::unique_lock<Lock> guard(lock);
00592 
00593         for (auto & entry: epConfig) {
00594 
00595             //cerr << "entry is " << entry << endl;
00596 
00597             if (!entry.isMember("zmqConnectUri"))
00598                 continue;
00599 
00600             auto hs = entry["transports"][0]["hostScope"];
00601             if (!hs)
00602                 continue;
00603 
00604             string hostScope = hs.asString();
00605             if (hs != "*") {
00606                 utsname name;
00607                 if (uname(&name))
00608                     throw ML::Exception(errno, "uname");
00609                 if (hostScope != name.nodename)
00610                     continue;  // wrong host scope
00611             }
00612 
00613             string uri = entry["zmqConnectUri"].asString();
00614 
00615             if (connectedAddress != "") {
00616                 // Already connected...
00617                 if (connectedAddress == uri)
00618                     return true;
00619                 else {
00620                     // Need to disconnect from the current address and connect to the new one
00621                     //cerr << "connectedAddress = " << connectedAddress << " uri = " << uri << endl;
00622                     socket->tryDisconnect(connectedAddress);
00623                     //throw ML::Exception("need to handle disconnect and reconnect to different "
00624                     //                    "address");
00625                 }
00626             }
00627             
00628             connectedAddress = uri;
00629             connectedEndpointPath = endpointPath;
00630             connectedEntryName = entryName;
00631             socket->connect(uri);
00632 
00633             //cerr << "connection in progress to " << uri << endl;
00634             connectionState = CONNECTING;
00635             return true;
00636         }
00637         
00638         return false;
00639     }
00640 
00642     void handleMonitorEvent(std::string addr, int param,
00643                             const zmq_event_t & event)
00644     {
00645         using namespace std;
00646 
00647         std::unique_lock<Lock> guard(lock);
00648 
00649         switch (event.event) {
00650 
00651         case ZMQ_EVENT_CONNECTED:
00652             //cerr << "connecting to " << connectedAddress
00653             //     << " connected to " << addr << endl;
00654             connectionState = CONNECTED;
00655             connectedFd = param;
00656             return;
00657 
00658         case ZMQ_EVENT_CONNECT_DELAYED:
00659             //cerr << "connecting to " << connectedAddress
00660             //     << " connection is delayed for " << addr << endl;
00661             return;  // this is normal that connect not return immediately
00662             
00663         case ZMQ_EVENT_DISCONNECTED:
00664             //cerr << "*********** connecting to " << connectedAddress
00665             //     << " disconnected from " << addr << " " << this << endl;
00666 
00667             socket->disconnect(connectedAddress);
00668 
00669             // Notify the connector that the endpoint has disconnected.  It will either:
00670             // 1.  Call back connect again on the same address, or
00671             // 2.  Attempt to connect somewhere else
00672 
00673             connector.handleDisconnection(connectedEndpointPath, connectedEntryName);
00674 
00675             connectionState = DISCONNECTED;
00676             connectedEndpointPath = "";
00677             connectedAddress = "";
00678             connectedEntryName = "";
00679             return;
00680             
00681         default:
00682             break;
00683         }
00684         
00685         //cerr << "got socket event "
00686         //     << printZmqEvent(event.event)
00687         //     << " on " << addr << " with " << param;
00688         //if (zmqEventIsError(event.event))
00689         //    cerr << " " << strerror(param);
00690         //cerr << endl;
00691     }
00692 
00694     zmq::context_t * context;
00695 
00697     int socketType;
00698     
00700     std::string connectedEndpoint;
00701 
00703     std::string connectedAddress;
00704 
00706     std::string connectedEndpointPath;
00707 
00709     std::string connectedEntryName;
00710 
00712     int connectedFd;
00713 
00715     ConnectionState connectionState;
00716 
00718     EndpointConnector connector;
00719 
00721     std::unique_ptr<zmq::socket_t> socket;
00722     
00724     ZmqSocketMonitor monitor;
00725 };
00726 
00727 
00728 /*****************************************************************************/
00729 /* ZMQ NAMED SUBSCRIBER                                                      */
00730 /*****************************************************************************/
00731 
00734 struct ZmqNamedSubscriber: public ZmqNamedSocket {
00735 
00736     ZmqNamedSubscriber(zmq::context_t & context)
00737         : ZmqNamedSocket(context, ZMQ_SUB)
00738     {
00739     }
00740 
00742     void subscribe(const std::string & prefix)
00743     {
00744         auto doSubscribe = [&] (zmq::socket_t & socket)
00745             {
00746                 subscribeChannel(socket, prefix);
00747             };
00748         
00749         performSocketOpSync(doSubscribe);
00750     }
00751 
00753     void subscribe(const std::vector<std::string> & prefixes)
00754     {
00755         auto doSubscribe = [&] (zmq::socket_t & socket)
00756             {
00757                 for (const auto& prefix : prefixes)
00758                     subscribeChannel(socket, prefix);
00759             };
00760 
00761         performSocketOpSync(doSubscribe);
00762     }
00763 
00764 };
00765 
00766 
00767 
00768 
00769 /*****************************************************************************/
00770 /* SERVICE PROVIDER WATCHER                                                  */
00771 /*****************************************************************************/
00772 
00786 struct ServiceProviderWatcher: public MessageLoop {
00787 
00788     ServiceProviderWatcher()
00789         : currentToken(1), changes(128)
00790     {
00791     }
00792 
00793     ~ServiceProviderWatcher()
00794     {
00795         using namespace std;
00796         //cerr << "shutting down service provider watcher" << endl;
00797         shutdown();
00798         //cerr << "done" << endl;
00799     }
00800 
00801     void init(std::shared_ptr<ConfigurationService> config)
00802     {
00803         this->config = config;
00804 
00805         changes.onEvent
00806             = std::bind(&ServiceProviderWatcher::handleServiceClassChange,
00807                         this,
00808                         std::placeholders::_1);
00809 
00810         addSource("ServiceProviderWatcher::changes", changes);
00811     }
00812 
00816     typedef std::function<void (std::string path, bool)> ChangeHandler;
00817     ChangeHandler changeHandler;
00818 
00827     uint64_t watchServiceClass(const std::string & serviceClass,
00828                                ChangeHandler onChange)
00829     {
00830         using namespace std;
00831         //cerr << "watching service class " << serviceClass << endl;
00832 
00833         // Allocate a token, then push the message on to the thread to be
00834         // processed.
00835         uint64_t token = __sync_fetch_and_add(&currentToken, 1);
00836 
00837         std::unique_lock<Lock> guard(lock);
00838 
00839         auto & entry = serviceClasses[serviceClass];
00840 
00841         //cerr << "entry.watch = " << entry.watch << endl;
00842 
00843         if (!entry.watch) {
00844             // First time that we watch this service; set it up
00845 
00846             entry.watch.init([=] (const std::string & path,
00847                                   ConfigurationService::ChangeType change)
00848                              {
00849                                  using namespace std;
00850                                  //cerr << "changed path " << path << endl;
00851 
00852                                  changes.push(serviceClass);
00853                              });
00854 
00855             changes.push(serviceClass);
00856         }
00857 
00858         entry.entries[token].onChange = onChange;
00859         
00860         return token;
00861     }
00862 
00864     void unwatchServiceClass(const std::string & serviceClass,
00865                              uint64_t token)
00866     {
00867         // TODO: synchronous?  Yes, as otherwise we can't guarantee anything
00868         // about keeping the entries valid
00869 
00870         throw ML::Exception("unwatchServiceClass: not done");
00871 
00872 #if 0        
00873         int done = 0;
00874 
00875         messages.push([&] () { this->doUnwatchServiceClass(serviceClass, token); done = 1; futex_wake(done); });
00876 
00877         while (!done)
00878             futex_wait(done, 0);
00879 #endif
00880     }
00881 
00882 private:
00883     typedef std::mutex Lock;
00884     mutable Lock lock;
00885     
00886     // Shared variable for the tokens to give out to allow unwatching
00887     uint64_t currentToken;
00888 
00889     struct WatchEntry {
00890         ChangeHandler onChange;
00891     };
00892 
00893     struct ServiceClassEntry {
00895         ConfigurationService::Watch watch;
00896         std::map<uint64_t, WatchEntry> entries; 
00897 
00901         std::vector<std::string> knownChildren;
00902     };
00903     
00905     std::map<std::string, ServiceClassEntry> serviceClasses;
00906 
00907     std::shared_ptr<ConfigurationService> config;
00908 
00910     void handleServiceClassChange(const std::string & serviceClass)
00911     {
00912         using namespace std;
00913 
00914         //cerr << "handleServiceClassChange " << serviceClass << endl;
00915         
00916         std::unique_lock<Lock> guard(lock);
00917 
00918         auto & entry = serviceClasses[serviceClass];
00919 
00920         //cerr << "handling service class change for " << serviceClass << endl;
00921 
00922         vector<string> children
00923             = config->getChildren("serviceClass/" + serviceClass,
00924                                   entry.watch);
00925 
00926         //cerr << "children = " << children << endl;
00927 
00928         std::sort(children.begin(), children.end());
00929 
00930         auto & knownChildren = entry.knownChildren;
00931 
00932         // Perform a diff between previously and currently known children
00933         vector<string> addedChildren, deletedChildren;
00934         std::set_difference(children.begin(), children.end(),
00935                             knownChildren.begin(), knownChildren.end(),
00936                             std::back_inserter(addedChildren));
00937         std::set_difference(knownChildren.begin(), knownChildren.end(),
00938                             children.begin(), children.end(),
00939                             std::back_inserter(deletedChildren));
00940 
00941         knownChildren.swap(children);
00942 
00943         guard.unlock();
00944 
00945         for (auto & c: addedChildren) {
00946             for (auto & e: entry.entries) {
00947                 e.second.onChange("serviceClass/" + serviceClass + "/" + c,
00948                                   true);
00949             }
00950         }
00951 
00952         for (auto & c: deletedChildren) {
00953             for (auto & e: entry.entries) {
00954                 e.second.onChange("serviceClass/" + serviceClass + "/" + c,
00955                                   false);
00956             }
00957         }
00958     }
00959 
00963     TypedMessageSink<std::string> changes;
00964 };
00965 
00966 
00967 /*****************************************************************************/
00968 /* ZMQ NAMED MULTIPLE SUBSCRIBER                                             */
00969 /*****************************************************************************/
00970 
00975 struct ZmqNamedMultipleSubscriber: public MessageLoop {
00976 
00977     ZmqNamedMultipleSubscriber(std::shared_ptr<zmq::context_t> context)
00978         : context(context)
00979     {
00980     }
00981 
00982     ~ZmqNamedMultipleSubscriber()
00983     {
00984         shutdown();
00985     }
00986 
00987     void init(std::shared_ptr<ConfigurationService> config)
00988     {
00989         this->config = config;
00990 
00991         serviceWatcher.init(config);
00992 
00993         addSource("ZmqNamedMultipleSubscriber", serviceWatcher);
00994 
00995         //debug(true);
00996     }
00997 
00998     void shutdown()
00999     {
01000         MessageLoop::shutdown();
01001 
01002         for (auto & sub: subscribers)
01003             sub.second->shutdown();
01004         subscribers.clear();
01005     }
01006 
01007     void connectAllServiceProviders(const std::string & serviceClass,
01008                                     const std::string & endpointName,
01009                                     const std::vector<std::string> & prefixes
01010                                         = std::vector<std::string>())
01011     {
01012         auto onServiceChange = [=] (const std::string & service,
01013                                     bool created)
01014             {
01015                 using namespace std;
01016                 //cerr << "onServiceChange " << serviceClass << " " << endpointName
01017                 //<< " " << service << " created " << created << endl;
01018 
01019                 if (created)
01020                     connectService(serviceClass, service, endpointName);
01021                 else
01022                     disconnectService(serviceClass, service, endpointName);
01023             };
01024         
01025         this->prefixes = prefixes;
01026         serviceWatcher.watchServiceClass(serviceClass, onServiceChange);
01027     }
01028     
01032     typedef std::function<void (std::vector<zmq::message_t> &&)> MessageHandler;
01033     MessageHandler messageHandler;
01034 
01039     virtual void handleMessage(std::vector<zmq::message_t> && message)
01040     {
01041         if (messageHandler)
01042             messageHandler(std::move(message));
01043         else throw ML::Exception("need to override either messageHandler "
01044                                  "or handleMessage");
01045     }
01046 
01048     void connectService(std::string serviceClass, std::string service,
01049                         std::string endpointName)
01050     {
01051         using namespace std;
01052 
01053         std::unique_lock<Lock> guard(lock);
01054 
01055         SubscriberMap::const_iterator found = subscribers.find(service);
01056         if(found != subscribers.end())
01057         {
01058            if(found->second->getConnectionState() == ZmqNamedSocket::CONNECTED)
01059            {
01060                std::cerr << "Already connected to service " << service << std::endl;
01061                return;
01062            }
01063            else
01064            {
01065              std::cerr << "we already had a connection entry to service " << service <<" - reuse " << std::endl;
01066              Json::Value value = config->getJson(service);
01067              std::string path = value["servicePath"].asString();
01068              found->second->connectToEndpoint(path); 
01069              return ;
01070            }
01071         } 
01072          
01073         std::unique_ptr<ZmqNamedSubscriber> sub
01074             (new ZmqNamedSubscriber(*context));
01075 
01076         // Set up to handle the message
01077         sub->messageHandler = [=] (std::vector<zmq::message_t> && msg)
01078             {
01079                 //cerr << "SUB MESSAGE HANDLER " << this << endl;
01080                 this->handleMessage(std::move(msg));
01081             };
01082                                         
01083         sub->init(config);
01084 
01085         // TODO: put a watch in to reconnect if this changes
01086         Json::Value value = config->getJson(service);
01087         std::string path = value["servicePath"].asString();
01088 
01089         //cerr << "(((((((((((( connecting to service " << service
01090         //     << " at endpoint " << path + "/" + endpointName << endl;
01091         
01092         //cerr << "+-+-+-+-+- connecting to endpoint " << path + "/" + endpointName << endl;
01093         //cerr << config->getChildren(path + "/" + endpointName) << endl;
01094 
01095         if (!prefixes.empty())
01096             sub->subscribe(prefixes);
01097         else sub->subscribe(""); // Subscribe to everything.
01098 
01099         sub->connectToEndpoint(path + "/" + endpointName);
01100         addSourceDeferred(service, *sub);
01101         subscribers[service] = std::move(sub);
01102     }
01103 
01105     void disconnectService(std::string serviceClass, std::string service,
01106                            std::string endpointName)
01107     {
01108         using namespace std;
01109         cerr << "need to disconenct from " << serviceClass << " "
01110              << service << " " << endpointName << endl;
01111 //        cerr << "aborting as disconnect not done yet" << endl;
01112         SubscriberMap::const_iterator found = subscribers.find(service);
01113         if(found != subscribers.end())
01114         {
01115            found->second->disconnect();
01116         } 
01117         //abort();
01118     }
01119 
01123     ServiceProviderWatcher serviceWatcher;
01124              
01125     std::shared_ptr<zmq::context_t> context;
01126 
01127     typedef std::mutex Lock;
01128     mutable Lock lock;
01129 
01131     typedef std::map<std::string, std::unique_ptr<ZmqNamedSubscriber> > SubscriberMap; 
01132     SubscriberMap subscribers;
01133 
01134     std::shared_ptr<ConfigurationService> config;
01135 
01136     std::vector<std::string> prefixes;
01137 };
01138 
01139 
01140 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator