RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* data_logger.cc 00002 Jeremy Barnes, March 2011 00003 Wolfgang Sourdeau, February 2013 00004 Copyright (c) 2012, 2013 Datacratic. All rights reserved. 00005 00006 Launches the router's logger. 00007 */ 00008 00009 00010 #include "data_logger.h" 00011 00012 00013 using namespace std; 00014 using namespace Datacratic; 00015 using namespace RTBKIT; 00016 00017 DataLogger:: 00018 DataLogger(std::shared_ptr<ServiceProxies> proxies) 00019 : ServiceBase("data_logger", proxies), 00020 Logger(proxies->zmqContext), 00021 multipleSubscriber(proxies->zmqContext), 00022 monitorProviderClient(proxies->zmqContext, *this) 00023 {} 00024 00025 DataLogger:: 00026 ~DataLogger() 00027 { 00028 monitorProviderClient.shutdown(); 00029 shutdown(); 00030 } 00031 00032 void 00033 DataLogger:: 00034 init() 00035 { 00036 Logger::init(); 00037 monitorProviderClient.init(getServices()->config); 00038 00039 multipleSubscriber.init(getServices()->config); 00040 multipleSubscriber.messageHandler 00041 = [&] (vector<zmq::message_t> && msg) { 00042 // forward to logger class 00043 vector<string> s; 00044 s.reserve(msg.size()); 00045 for (auto & m: msg) 00046 s.push_back(m.toString()); 00047 this->logMessageNoTimestamp(s); 00048 }; 00049 00050 //messageLoop.addSource("DataLogger::multipleSubscriber", 00051 // multipleSubscriber); 00052 } 00053 00054 void 00055 DataLogger:: 00056 start(std::function<void ()> onStop) 00057 { 00058 Logger::start(onStop); 00059 multipleSubscriber.start(); 00060 monitorProviderClient.start(); 00061 } 00062 00063 void 00064 DataLogger:: 00065 shutdown() 00066 { 00067 monitorProviderClient.shutdown(); 00068 Logger::shutdown(); 00069 multipleSubscriber.shutdown(); 00070 } 00071 00072 void 00073 DataLogger:: 00074 connectAllServiceProviders(const string & serviceClass, const string & epName) 00075 { 00076 multipleSubscriber.connectAllServiceProviders(serviceClass, epName); 00077 } 00078 00080 string 00081 DataLogger:: 00082 getProviderName() 00083 const 00084 { 00085 return serviceName(); 00086 } 00087 00088 Json::Value 00089 DataLogger:: 00090 getProviderIndicators() 00091 const 00092 { 00093 bool status(true); 00094 00095 for (const auto & pair: multipleSubscriber.subscribers) { 00096 if (pair.second->getConnectionState() 00097 == ZmqNamedSocket::ConnectionState::DISCONNECTED) { 00098 status = false; 00099 break; 00100 } 00101 } 00102 00103 Json::Value indicators; 00104 indicators["status"] = status ? "ok" : "failure"; 00105 00106 return indicators; 00107 }