RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* zmq_named_pub_sub.h -*- C++ -*- 00002 Jeremy Barnes, 8 January 2013 00003 Copyright (c) 2013 Datacratic Inc. All rights reserved. 00004 00005 Named publish/subscribe endpoint. 00006 */ 00007 00008 #pragma once 00009 00010 #include "zmq_endpoint.h" 00011 #include "typed_message_channel.h" 00012 #include <sys/utsname.h> 00013 #include "jml/arch/backtrace.h" 00014 00015 namespace Datacratic { 00016 00017 00018 00019 /*****************************************************************************/ 00020 /* ZMQ NAMED PUBLISHER */ 00021 /*****************************************************************************/ 00022 00025 struct ZmqNamedPublisher: public MessageLoop { 00026 00027 ZmqNamedPublisher(std::shared_ptr<zmq::context_t> context, 00028 int messageBufferSize = 10000) 00029 : publishEndpoint(context), 00030 publishQueue(messageBufferSize) 00031 { 00032 } 00033 00034 virtual ~ZmqNamedPublisher() 00035 { 00036 shutdown(); 00037 } 00038 00039 void init(std::shared_ptr<ConfigurationService> config, 00040 const std::string & endpointName, 00041 const std::string & identity = "") 00042 { 00043 using namespace std; 00044 00045 // Initialize the publisher endpoint 00046 publishEndpoint.init(config, ZMQ_XPUB, endpointName); 00047 00048 // Called when we queue up a message to be published 00049 publishQueue.onEvent = [=] (std::vector<zmq::message_t> && message) 00050 { 00051 using namespace std; 00052 //cerr << "popped message to publish" << endl; 00053 publishEndpoint.sendMessage(std::move(message)); 00054 }; 00055 00056 // Called when there is a new subscription. 00057 // The first byte is 1 (subscription) or 0 (unsubscription). 00058 // The rest of the message is the filter for the subscription 00059 auto doPublishMessage = [=] (const std::vector<std::string> & msg) 00060 { 00061 #if 0 00062 cerr << "got publish subscription" << msg << endl; 00063 cerr << "msg.size() = " << msg.size() << endl; 00064 cerr << "msg[0].size() = " << msg[0].size() << endl; 00065 cerr << "msg[0][0] = " << (int)msg[0][0] << endl; 00066 #endif 00067 }; 00068 00069 publishEndpoint.messageHandler = doPublishMessage; 00070 00071 // Hook everything into the message loop 00072 addSource("ZmqNamedPublisher::publishEndpoint", publishEndpoint); 00073 addSource("ZmqNamedPublisher::publishQueue", publishQueue); 00074 00075 } 00076 00077 std::string bindTcp(PortRange const & portRange = PortRange(), std::string host = "") 00078 { 00079 return publishEndpoint.bindTcp(portRange, host); 00080 } 00081 00082 void shutdown() 00083 { 00084 MessageLoop::shutdown(); 00085 publishEndpoint.shutdown(); 00086 //publishEndpointMonitor.shutdown(); 00087 } 00088 00089 //std::vector<std::string> getSubscribers() 00090 //{ 00091 //} 00092 00093 template<typename Head, typename... Tail> 00094 void encodeAll(std::vector<zmq::message_t> & messages, 00095 Head head, 00096 Tail&&... tail) 00097 { 00098 messages.emplace_back(std::move(encodeMessage(head))); 00099 encodeAll(messages, std::forward<Tail>(tail)...); 00100 } 00101 00102 // Vectors treated specially... they are copied 00103 template<typename... Tail> 00104 void encodeAll(std::vector<zmq::message_t> & messages, 00105 const std::vector<std::string> & head, 00106 Tail&&... tail) 00107 { 00108 for (auto & m: head) 00109 messages.emplace_back(std::move(encodeMessage(m))); 00110 encodeAll(messages, std::forward<Tail>(tail)...); 00111 } 00112 00113 void encodeAll(std::vector<zmq::message_t> & messages) 00114 { 00115 } 00116 00117 template<typename... Args> 00118 void publish(const std::string & channel, Args&&... args) 00119 { 00120 std::vector<zmq::message_t> messages; 00121 messages.reserve(sizeof...(Args) + 1); 00122 00123 encodeAll(messages, channel, 00124 std::forward<Args>(args)...); 00125 publishQueue.push(messages); 00126 } 00127 00128 private: 00130 ZmqNamedEndpoint publishEndpoint; 00131 00133 TypedMessageSink<std::vector<zmq::message_t> > publishQueue; 00134 }; 00135 00136 00137 /*****************************************************************************/ 00138 /* ENDPOINT CONNECTOR */ 00139 /*****************************************************************************/ 00140 00173 struct EndpointConnector : public MessageLoop { 00174 00175 EndpointConnector() 00176 : changes(32) 00177 { 00178 } 00179 00180 void init(std::shared_ptr<ConfigurationService> config) 00181 { 00182 this->config = config; 00183 00184 changes.onEvent 00185 = std::bind(&EndpointConnector::handleEndpointChange, 00186 this, 00187 std::placeholders::_1); 00188 00189 addSource("EndpointConnector::changes", changes); 00190 } 00191 00192 void watchEndpoint(const std::string & endpointPath) 00193 { 00194 std::unique_lock<Lock> guard(lock); 00195 00196 using namespace std; 00197 //cerr << "watching endpoint " << endpointPath << endl; 00198 00199 auto & entry = endpoints[endpointPath]; 00200 00201 if (!entry.watch) { 00202 // First time that we watch this service; set it up 00203 00204 //cerr << "=============== initializing watch for " << endpointPath << endl; 00205 00206 entry.watch.init([=] (const std::string & path, 00207 ConfigurationService::ChangeType change) 00208 { 00209 using namespace std; 00210 //cerr << "endpoint changed path " << path << " " << this << endl; 00211 00212 { 00213 std::unique_lock<Lock> guard(lock); 00214 if (endpoints.count(endpointPath)) { 00215 auto & entry = endpoints[endpointPath]; 00216 entry.watchIsSet = false; 00217 } 00218 } 00219 00220 changes.push(endpointPath); 00221 }); 00222 00223 changes.push(endpointPath); 00224 } 00225 } 00226 00230 void unwatchEndpoint(const std::string & endpoint, 00231 bool forceDisconnect) 00232 { 00233 throw ML::Exception("unwatchEndpoint not done"); 00234 } 00235 00236 std::function<bool (const std::string & endpointPath, 00237 const std::string & entry, 00238 const Json::Value & params)> connectHandler; 00239 00250 virtual bool connect(const std::string & endpointPath, 00251 const std::string & entryName, 00252 const Json::Value & params) 00253 { 00254 if (connectHandler) 00255 return connectHandler(endpointPath, entryName, params); 00256 throw ML::Exception("no connect override"); 00257 } 00258 00260 void handleDisconnection(const std::string & endpointPath, 00261 const std::string & entryName) 00262 { 00263 std::unique_lock<Lock> guard(lock); 00264 00265 if (!endpoints.count(endpointPath)) 00266 return; 00267 00268 auto & entry = endpoints[endpointPath]; 00269 00270 ExcAssertEqual(entry.connectedTo, entryName); 00271 entry.connectedTo = ""; 00272 00273 // And we attempt to reconnect 00274 // Note that we can't push endpointPath on changes 00275 changes.push(endpointPath); 00276 } 00277 00278 private: 00279 typedef std::mutex Lock; 00280 mutable Lock lock; 00281 00285 TypedMessageSink<std::string> changes; 00286 00287 struct Entry { 00288 Entry() 00289 : watchIsSet(false) 00290 { 00291 } 00292 00293 ConfigurationService::Watch watch; 00294 std::string connectedTo; 00295 bool watchIsSet; 00296 }; 00297 00298 std::map<std::string, Entry> endpoints; 00299 00300 std::shared_ptr<ConfigurationService> config; 00301 00302 void handleEndpointChange(const std::string & endpointPath) 00303 { 00304 using namespace std; 00305 00306 //cerr << "handleEndpointChange " << endpointPath << endl; 00307 00308 std::unique_lock<Lock> guard(lock); 00309 00310 if (!endpoints.count(endpointPath)) 00311 return; 00312 00313 auto & entry = endpoints[endpointPath]; 00314 00315 //cerr << "handling service class change for " << endpointPath << endl; 00316 00317 vector<string> children; 00318 if (entry.watchIsSet) 00319 children = config->getChildren(endpointPath); 00320 else children = config->getChildren(endpointPath, entry.watch); 00321 entry.watchIsSet = true; 00322 00323 //cerr << "children = " << children << endl; 00324 00325 // If we're connected, look for a change in the endpoints 00326 if (!entry.connectedTo.empty()) { 00327 00328 // Does our connected node still exist? 00329 if (std::find(children.begin(), children.end(), entry.connectedTo) 00330 == children.end()) { 00331 // Node disappeared; we need to disconnect 00332 guard.unlock(); 00333 handleDisconnection(endpointPath, entry.connectedTo); 00334 return; // handleDisconnection will call into us recursively 00335 } 00336 else { 00337 // Node is still there 00338 // TODO: check for change in value 00339 return; 00340 } 00341 } 00342 00343 guard.unlock(); 00344 00345 // If we got here, we're not connected 00346 for (auto & c: children) { 00347 Json::Value cfg = config->getJson(endpointPath + "/" + c); 00348 if (connect(endpointPath, c, cfg)) { 00349 notifyConnectionStatus(endpointPath, c, true); 00350 return; 00351 } 00352 } 00353 00354 cerr << "warning: could not connect to " << endpointPath << " immediately" 00355 << endl; 00356 } 00357 00358 void notifyConnectionStatus(const std::string & endpointPath, 00359 const std::string & entryName, 00360 bool status) 00361 { 00362 std::unique_lock<Lock> guard(lock); 00363 00364 if (!endpoints.count(endpointPath)) 00365 return; 00366 00367 auto & entry = endpoints[endpointPath]; 00368 00369 if (status) { 00370 if (!entry.connectedTo.empty()) { 00371 // TODO 00372 throw ML::Exception("TODO: handle connection with one already " 00373 "active"); 00374 } 00375 entry.connectedTo = entryName; 00376 // TODO: watch config 00377 } 00378 else { 00379 if (entry.connectedTo.empty()) 00380 return; 00381 if (entry.connectedTo != entryName) { 00382 throw ML::Exception("TODO: handle disconnection from non-active"); 00383 } 00384 entry.connectedTo = ""; 00385 } 00386 } 00387 }; 00388 00389 00390 /*****************************************************************************/ 00391 /* ZMQ NAMED SOCKET */ 00392 /*****************************************************************************/ 00393 00396 struct ZmqNamedSocket: public MessageLoop { 00397 00398 enum ConnectionState { 00399 NO_CONNECTION, 00400 CONNECTING, 00401 CONNECTED, 00402 DISCONNECTED 00403 }; 00404 00405 ZmqNamedSocket(zmq::context_t & context, int type) 00406 : context(&context), 00407 socketType(type), 00408 connectionState(NO_CONNECTION), 00409 monitor(context) 00410 { 00411 //using namespace std; 00412 //cerr << "created zmqNamedSocket at " << this << endl; 00413 } 00414 00415 virtual ~ZmqNamedSocket() 00416 { 00417 shutdown(); 00418 } 00419 00423 void init(std::shared_ptr<ConfigurationService> config) 00424 { 00425 if (socket) 00426 throw ML::Exception("socket already initialized"); 00427 socket.reset(new zmq::socket_t(*context, socketType)); 00428 00429 using namespace std; 00430 00431 monitor.defaultHandler 00432 = std::bind(&ZmqNamedSocket::handleMonitorEvent, 00433 this, 00434 std::placeholders::_1, 00435 std::placeholders::_2, 00436 std::placeholders::_3); 00437 00438 connector.connectHandler 00439 = std::bind(&ZmqNamedSocket::doConnect, 00440 this, 00441 std::placeholders::_1, 00442 std::placeholders::_2, 00443 std::placeholders::_3); 00444 00445 connector.init(config); 00446 monitor.init(*socket); 00447 00448 addSource("ZmqNamedSocket::connector", connector); 00449 addSource("ZmqNamedSocket::monitor", monitor); 00450 addSource("ZmqNamedSocket::socket", 00451 std::make_shared<ZmqBinaryEventSource> 00452 (*socket, [=] (std::vector<zmq::message_t> && message) 00453 { 00454 this->handleMessage(std::move(message)); 00455 })); 00456 } 00457 00458 void shutdown() 00459 { 00460 MessageLoop::shutdown(); 00461 00462 if (!socket) 00463 return; 00464 00465 disconnect(); 00466 monitor.shutdown(); 00467 connector.shutdown(); 00468 00469 socket->tryDisconnect(this->connectedAddress); 00470 socket.reset(); 00471 } 00472 00473 void connectToEndpoint(const std::string & endpointPath) 00474 { 00475 if (connectedEndpoint != "") 00476 throw ML::Exception("attempt to connect a ZmqNamedSocket " 00477 "to an enpoint that is already connected"); 00478 00479 this->connectedEndpoint = connectedEndpoint; 00480 this->connectionState = CONNECTING; 00481 00482 // No lock needed as the connector has its own lock 00483 00484 // Tell the connector to watch the endpoint. When an entry pops 00485 // up, it will connect to it. 00486 connector.watchEndpoint(endpointPath); 00487 } 00488 00490 void disconnect() 00491 { 00492 std::unique_lock<Lock> guard(lock); 00493 ExcAssert(socket); 00494 socket->tryDisconnect(connectedAddress); 00495 this->connectedEndpoint = ""; 00496 this->connectedAddress = ""; 00497 this->connectionState = DISCONNECTED; 00498 //connector.disconnect(); 00499 } 00500 00502 std::string getConnectedEndpoint() const 00503 { 00504 std::unique_lock<Lock> guard(lock); 00505 return connectedEndpoint; 00506 } 00507 00509 std::string getConnectedAddress() const 00510 { 00511 std::unique_lock<Lock> guard(lock); 00512 return connectedAddress; 00513 } 00514 00516 ConnectionState getConnectionState() const 00517 { 00518 // No lock needed as performed atomically 00519 return connectionState; 00520 } 00521 00525 typedef std::function<void (std::vector<zmq::message_t> &&)> MessageHandler; 00526 MessageHandler messageHandler; 00527 00532 virtual void handleMessage(std::vector<zmq::message_t> && message) 00533 { 00534 using namespace std; 00535 //cerr << "named socket got message " << message.at(0).toString() 00536 // << endl; 00537 if (messageHandler) 00538 messageHandler(std::move(message)); 00539 else throw ML::Exception("need to override either messageHandler " 00540 "or handleMessage"); 00541 } 00542 00553 template<typename Fn> 00554 void performSocketOpSync(Fn fn) 00555 { 00556 std::unique_lock<Lock> guard(lock); 00557 fn(*socket); 00558 } 00559 00561 void sendSync(std::vector<zmq::message_t> && message) 00562 { 00563 std::unique_lock<Lock> guard(lock); 00564 for (unsigned i = 0; i < message.size(); ++i) { 00565 socket->send(message[i], i == message.size() - 1 00566 ? 0 : ZMQ_SNDMORE); 00567 } 00568 } 00569 00570 private: 00571 // This lock is used to allow the synchronous methods to work without 00572 // needing ping-pong with the message loop. Normally it should be 00573 // uncontended. 00574 typedef std::mutex Lock; 00575 mutable Lock lock; 00576 00582 virtual bool doConnect(const std::string & endpointPath, 00583 const std::string & entryName, 00584 const Json::Value & epConfig) 00585 { 00586 using namespace std; 00587 00588 //cerr << " ((((((( doConnect for " << endpointPath << " " << entryName 00589 // << endl; 00590 00591 std::unique_lock<Lock> guard(lock); 00592 00593 for (auto & entry: epConfig) { 00594 00595 //cerr << "entry is " << entry << endl; 00596 00597 if (!entry.isMember("zmqConnectUri")) 00598 continue; 00599 00600 auto hs = entry["transports"][0]["hostScope"]; 00601 if (!hs) 00602 continue; 00603 00604 string hostScope = hs.asString(); 00605 if (hs != "*") { 00606 utsname name; 00607 if (uname(&name)) 00608 throw ML::Exception(errno, "uname"); 00609 if (hostScope != name.nodename) 00610 continue; // wrong host scope 00611 } 00612 00613 string uri = entry["zmqConnectUri"].asString(); 00614 00615 if (connectedAddress != "") { 00616 // Already connected... 00617 if (connectedAddress == uri) 00618 return true; 00619 else { 00620 // Need to disconnect from the current address and connect to the new one 00621 //cerr << "connectedAddress = " << connectedAddress << " uri = " << uri << endl; 00622 socket->tryDisconnect(connectedAddress); 00623 //throw ML::Exception("need to handle disconnect and reconnect to different " 00624 // "address"); 00625 } 00626 } 00627 00628 connectedAddress = uri; 00629 connectedEndpointPath = endpointPath; 00630 connectedEntryName = entryName; 00631 socket->connect(uri); 00632 00633 //cerr << "connection in progress to " << uri << endl; 00634 connectionState = CONNECTING; 00635 return true; 00636 } 00637 00638 return false; 00639 } 00640 00642 void handleMonitorEvent(std::string addr, int param, 00643 const zmq_event_t & event) 00644 { 00645 using namespace std; 00646 00647 std::unique_lock<Lock> guard(lock); 00648 00649 switch (event.event) { 00650 00651 case ZMQ_EVENT_CONNECTED: 00652 //cerr << "connecting to " << connectedAddress 00653 // << " connected to " << addr << endl; 00654 connectionState = CONNECTED; 00655 connectedFd = param; 00656 return; 00657 00658 case ZMQ_EVENT_CONNECT_DELAYED: 00659 //cerr << "connecting to " << connectedAddress 00660 // << " connection is delayed for " << addr << endl; 00661 return; // this is normal that connect not return immediately 00662 00663 case ZMQ_EVENT_DISCONNECTED: 00664 //cerr << "*********** connecting to " << connectedAddress 00665 // << " disconnected from " << addr << " " << this << endl; 00666 00667 socket->disconnect(connectedAddress); 00668 00669 // Notify the connector that the endpoint has disconnected. It will either: 00670 // 1. Call back connect again on the same address, or 00671 // 2. Attempt to connect somewhere else 00672 00673 connector.handleDisconnection(connectedEndpointPath, connectedEntryName); 00674 00675 connectionState = DISCONNECTED; 00676 connectedEndpointPath = ""; 00677 connectedAddress = ""; 00678 connectedEntryName = ""; 00679 return; 00680 00681 default: 00682 break; 00683 } 00684 00685 //cerr << "got socket event " 00686 // << printZmqEvent(event.event) 00687 // << " on " << addr << " with " << param; 00688 //if (zmqEventIsError(event.event)) 00689 // cerr << " " << strerror(param); 00690 //cerr << endl; 00691 } 00692 00694 zmq::context_t * context; 00695 00697 int socketType; 00698 00700 std::string connectedEndpoint; 00701 00703 std::string connectedAddress; 00704 00706 std::string connectedEndpointPath; 00707 00709 std::string connectedEntryName; 00710 00712 int connectedFd; 00713 00715 ConnectionState connectionState; 00716 00718 EndpointConnector connector; 00719 00721 std::unique_ptr<zmq::socket_t> socket; 00722 00724 ZmqSocketMonitor monitor; 00725 }; 00726 00727 00728 /*****************************************************************************/ 00729 /* ZMQ NAMED SUBSCRIBER */ 00730 /*****************************************************************************/ 00731 00734 struct ZmqNamedSubscriber: public ZmqNamedSocket { 00735 00736 ZmqNamedSubscriber(zmq::context_t & context) 00737 : ZmqNamedSocket(context, ZMQ_SUB) 00738 { 00739 } 00740 00742 void subscribe(const std::string & prefix) 00743 { 00744 auto doSubscribe = [&] (zmq::socket_t & socket) 00745 { 00746 subscribeChannel(socket, prefix); 00747 }; 00748 00749 performSocketOpSync(doSubscribe); 00750 } 00751 00753 void subscribe(const std::vector<std::string> & prefixes) 00754 { 00755 auto doSubscribe = [&] (zmq::socket_t & socket) 00756 { 00757 for (const auto& prefix : prefixes) 00758 subscribeChannel(socket, prefix); 00759 }; 00760 00761 performSocketOpSync(doSubscribe); 00762 } 00763 00764 }; 00765 00766 00767 00768 00769 /*****************************************************************************/ 00770 /* SERVICE PROVIDER WATCHER */ 00771 /*****************************************************************************/ 00772 00786 struct ServiceProviderWatcher: public MessageLoop { 00787 00788 ServiceProviderWatcher() 00789 : currentToken(1), changes(128) 00790 { 00791 } 00792 00793 ~ServiceProviderWatcher() 00794 { 00795 using namespace std; 00796 //cerr << "shutting down service provider watcher" << endl; 00797 shutdown(); 00798 //cerr << "done" << endl; 00799 } 00800 00801 void init(std::shared_ptr<ConfigurationService> config) 00802 { 00803 this->config = config; 00804 00805 changes.onEvent 00806 = std::bind(&ServiceProviderWatcher::handleServiceClassChange, 00807 this, 00808 std::placeholders::_1); 00809 00810 addSource("ServiceProviderWatcher::changes", changes); 00811 } 00812 00816 typedef std::function<void (std::string path, bool)> ChangeHandler; 00817 ChangeHandler changeHandler; 00818 00827 uint64_t watchServiceClass(const std::string & serviceClass, 00828 ChangeHandler onChange) 00829 { 00830 using namespace std; 00831 //cerr << "watching service class " << serviceClass << endl; 00832 00833 // Allocate a token, then push the message on to the thread to be 00834 // processed. 00835 uint64_t token = __sync_fetch_and_add(¤tToken, 1); 00836 00837 std::unique_lock<Lock> guard(lock); 00838 00839 auto & entry = serviceClasses[serviceClass]; 00840 00841 //cerr << "entry.watch = " << entry.watch << endl; 00842 00843 if (!entry.watch) { 00844 // First time that we watch this service; set it up 00845 00846 entry.watch.init([=] (const std::string & path, 00847 ConfigurationService::ChangeType change) 00848 { 00849 using namespace std; 00850 //cerr << "changed path " << path << endl; 00851 00852 changes.push(serviceClass); 00853 }); 00854 00855 changes.push(serviceClass); 00856 } 00857 00858 entry.entries[token].onChange = onChange; 00859 00860 return token; 00861 } 00862 00864 void unwatchServiceClass(const std::string & serviceClass, 00865 uint64_t token) 00866 { 00867 // TODO: synchronous? Yes, as otherwise we can't guarantee anything 00868 // about keeping the entries valid 00869 00870 throw ML::Exception("unwatchServiceClass: not done"); 00871 00872 #if 0 00873 int done = 0; 00874 00875 messages.push([&] () { this->doUnwatchServiceClass(serviceClass, token); done = 1; futex_wake(done); }); 00876 00877 while (!done) 00878 futex_wait(done, 0); 00879 #endif 00880 } 00881 00882 private: 00883 typedef std::mutex Lock; 00884 mutable Lock lock; 00885 00886 // Shared variable for the tokens to give out to allow unwatching 00887 uint64_t currentToken; 00888 00889 struct WatchEntry { 00890 ChangeHandler onChange; 00891 }; 00892 00893 struct ServiceClassEntry { 00895 ConfigurationService::Watch watch; 00896 std::map<uint64_t, WatchEntry> entries; 00897 00901 std::vector<std::string> knownChildren; 00902 }; 00903 00905 std::map<std::string, ServiceClassEntry> serviceClasses; 00906 00907 std::shared_ptr<ConfigurationService> config; 00908 00910 void handleServiceClassChange(const std::string & serviceClass) 00911 { 00912 using namespace std; 00913 00914 //cerr << "handleServiceClassChange " << serviceClass << endl; 00915 00916 std::unique_lock<Lock> guard(lock); 00917 00918 auto & entry = serviceClasses[serviceClass]; 00919 00920 //cerr << "handling service class change for " << serviceClass << endl; 00921 00922 vector<string> children 00923 = config->getChildren("serviceClass/" + serviceClass, 00924 entry.watch); 00925 00926 //cerr << "children = " << children << endl; 00927 00928 std::sort(children.begin(), children.end()); 00929 00930 auto & knownChildren = entry.knownChildren; 00931 00932 // Perform a diff between previously and currently known children 00933 vector<string> addedChildren, deletedChildren; 00934 std::set_difference(children.begin(), children.end(), 00935 knownChildren.begin(), knownChildren.end(), 00936 std::back_inserter(addedChildren)); 00937 std::set_difference(knownChildren.begin(), knownChildren.end(), 00938 children.begin(), children.end(), 00939 std::back_inserter(deletedChildren)); 00940 00941 knownChildren.swap(children); 00942 00943 guard.unlock(); 00944 00945 for (auto & c: addedChildren) { 00946 for (auto & e: entry.entries) { 00947 e.second.onChange("serviceClass/" + serviceClass + "/" + c, 00948 true); 00949 } 00950 } 00951 00952 for (auto & c: deletedChildren) { 00953 for (auto & e: entry.entries) { 00954 e.second.onChange("serviceClass/" + serviceClass + "/" + c, 00955 false); 00956 } 00957 } 00958 } 00959 00963 TypedMessageSink<std::string> changes; 00964 }; 00965 00966 00967 /*****************************************************************************/ 00968 /* ZMQ NAMED MULTIPLE SUBSCRIBER */ 00969 /*****************************************************************************/ 00970 00975 struct ZmqNamedMultipleSubscriber: public MessageLoop { 00976 00977 ZmqNamedMultipleSubscriber(std::shared_ptr<zmq::context_t> context) 00978 : context(context) 00979 { 00980 } 00981 00982 ~ZmqNamedMultipleSubscriber() 00983 { 00984 shutdown(); 00985 } 00986 00987 void init(std::shared_ptr<ConfigurationService> config) 00988 { 00989 this->config = config; 00990 00991 serviceWatcher.init(config); 00992 00993 addSource("ZmqNamedMultipleSubscriber", serviceWatcher); 00994 00995 //debug(true); 00996 } 00997 00998 void shutdown() 00999 { 01000 MessageLoop::shutdown(); 01001 01002 for (auto & sub: subscribers) 01003 sub.second->shutdown(); 01004 subscribers.clear(); 01005 } 01006 01007 void connectAllServiceProviders(const std::string & serviceClass, 01008 const std::string & endpointName, 01009 const std::vector<std::string> & prefixes 01010 = std::vector<std::string>()) 01011 { 01012 auto onServiceChange = [=] (const std::string & service, 01013 bool created) 01014 { 01015 using namespace std; 01016 //cerr << "onServiceChange " << serviceClass << " " << endpointName 01017 //<< " " << service << " created " << created << endl; 01018 01019 if (created) 01020 connectService(serviceClass, service, endpointName); 01021 else 01022 disconnectService(serviceClass, service, endpointName); 01023 }; 01024 01025 this->prefixes = prefixes; 01026 serviceWatcher.watchServiceClass(serviceClass, onServiceChange); 01027 } 01028 01032 typedef std::function<void (std::vector<zmq::message_t> &&)> MessageHandler; 01033 MessageHandler messageHandler; 01034 01039 virtual void handleMessage(std::vector<zmq::message_t> && message) 01040 { 01041 if (messageHandler) 01042 messageHandler(std::move(message)); 01043 else throw ML::Exception("need to override either messageHandler " 01044 "or handleMessage"); 01045 } 01046 01048 void connectService(std::string serviceClass, std::string service, 01049 std::string endpointName) 01050 { 01051 using namespace std; 01052 01053 std::unique_lock<Lock> guard(lock); 01054 01055 SubscriberMap::const_iterator found = subscribers.find(service); 01056 if(found != subscribers.end()) 01057 { 01058 if(found->second->getConnectionState() == ZmqNamedSocket::CONNECTED) 01059 { 01060 std::cerr << "Already connected to service " << service << std::endl; 01061 return; 01062 } 01063 else 01064 { 01065 std::cerr << "we already had a connection entry to service " << service <<" - reuse " << std::endl; 01066 Json::Value value = config->getJson(service); 01067 std::string path = value["servicePath"].asString(); 01068 found->second->connectToEndpoint(path); 01069 return ; 01070 } 01071 } 01072 01073 std::unique_ptr<ZmqNamedSubscriber> sub 01074 (new ZmqNamedSubscriber(*context)); 01075 01076 // Set up to handle the message 01077 sub->messageHandler = [=] (std::vector<zmq::message_t> && msg) 01078 { 01079 //cerr << "SUB MESSAGE HANDLER " << this << endl; 01080 this->handleMessage(std::move(msg)); 01081 }; 01082 01083 sub->init(config); 01084 01085 // TODO: put a watch in to reconnect if this changes 01086 Json::Value value = config->getJson(service); 01087 std::string path = value["servicePath"].asString(); 01088 01089 //cerr << "(((((((((((( connecting to service " << service 01090 // << " at endpoint " << path + "/" + endpointName << endl; 01091 01092 //cerr << "+-+-+-+-+- connecting to endpoint " << path + "/" + endpointName << endl; 01093 //cerr << config->getChildren(path + "/" + endpointName) << endl; 01094 01095 if (!prefixes.empty()) 01096 sub->subscribe(prefixes); 01097 else sub->subscribe(""); // Subscribe to everything. 01098 01099 sub->connectToEndpoint(path + "/" + endpointName); 01100 addSourceDeferred(service, *sub); 01101 subscribers[service] = std::move(sub); 01102 } 01103 01105 void disconnectService(std::string serviceClass, std::string service, 01106 std::string endpointName) 01107 { 01108 using namespace std; 01109 cerr << "need to disconenct from " << serviceClass << " " 01110 << service << " " << endpointName << endl; 01111 // cerr << "aborting as disconnect not done yet" << endl; 01112 SubscriberMap::const_iterator found = subscribers.find(service); 01113 if(found != subscribers.end()) 01114 { 01115 found->second->disconnect(); 01116 } 01117 //abort(); 01118 } 01119 01123 ServiceProviderWatcher serviceWatcher; 01124 01125 std::shared_ptr<zmq::context_t> context; 01126 01127 typedef std::mutex Lock; 01128 mutable Lock lock; 01129 01131 typedef std::map<std::string, std::unique_ptr<ZmqNamedSubscriber> > SubscriberMap; 01132 SubscriberMap subscribers; 01133 01134 std::shared_ptr<ConfigurationService> config; 01135 01136 std::vector<std::string> prefixes; 01137 }; 01138 01139 01140 } // namespace Datacratic