RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 00010 #include "augmentor_ex.h" 00011 00012 #include "rtbkit/core/agent_configuration/agent_configuration_listener.h" 00013 #include "rtbkit/core/agent_configuration/agent_config.h" 00014 #include "rtbkit/plugins/augmentor/augmentor_base.h" 00015 #include "rtbkit/common/bid_request.h" 00016 #include "soa/service/zmq_named_pub_sub.h" 00017 #include "jml/utils/exc_assert.h" 00018 00019 #include <unordered_map> 00020 #include <mutex> 00021 00022 00023 using namespace std; 00024 00025 namespace RTBKIT { 00026 00027 00028 /******************************************************************************/ 00029 /* FREQUENCY CAP STORAGE */ 00030 /******************************************************************************/ 00031 00040 struct FrequencyCapStorage 00041 { 00045 size_t get(const RTBKIT::AccountKey& account, const RTBKIT::UserIds& uids) 00046 { 00047 lock_guard<mutex> guard(lock); 00048 return counters[uids.exchangeId][account[0]]; 00049 } 00050 00054 void inc(const RTBKIT::AccountKey& account, const RTBKIT::UserIds& uids) 00055 { 00056 lock_guard<mutex> guard(lock); 00057 counters[uids.exchangeId][account[0]]++; 00058 } 00059 00060 private: 00061 00062 mutex lock; 00063 unordered_map<Datacratic::Id, unordered_map<string, size_t> > counters; 00064 00065 }; 00066 00067 /******************************************************************************/ 00068 /* FREQUENCY CAP AUGMENTOR */ 00069 /******************************************************************************/ 00070 00075 FrequencyCapAugmentor:: 00076 FrequencyCapAugmentor( 00077 std::shared_ptr<Datacratic::ServiceProxies> services, 00078 const string& serviceName, 00079 const string& augmentorName) : 00080 SyncAugmentor(augmentorName, serviceName, services), 00081 storage(new FrequencyCapStorage()), 00082 agentConfig(getZmqContext()), 00083 palEvents(getZmqContext()) 00084 { 00085 recordHit("up"); 00086 } 00087 00088 00095 void 00096 FrequencyCapAugmentor:: 00097 init() 00098 { 00099 SyncAugmentor::init(2 /* numThreads */); 00100 00101 /* Manages all the communications with the AgentConfigurationService. */ 00102 agentConfig.init(getServices()->config); 00103 addSource("FrequencyCapAugmentor::agentConfig", agentConfig); 00104 00105 /* This lambda will get called when the post auction loop receives a win 00106 on an auction. 00107 */ 00108 palEvents.messageHandler = [&] (const vector<zmq::message_t>& msg) 00109 { 00110 RTBKIT::AccountKey account(msg[19].toString()); 00111 RTBKIT::UserIds uids = 00112 RTBKIT::UserIds::createFromJson(msg[15].toString()); 00113 00114 storage->inc(account, uids); 00115 recordHit("wins"); 00116 }; 00117 00118 palEvents.connectAllServiceProviders( 00119 "rtbPostAuctionService", "logger", {"MATCHEDWIN"}); 00120 00121 addSource("FrequencyCapAugmentor::palEvents", palEvents); 00122 } 00123 00124 00131 RTBKIT::AugmentationList 00132 FrequencyCapAugmentor:: 00133 onRequest(const RTBKIT::AugmentationRequest& request) 00134 { 00135 recordHit("requests"); 00136 00137 RTBKIT::AugmentationList result; 00138 00139 const RTBKIT::UserIds& uids = request.bidRequest->userIds; 00140 00141 for (const string& agent : request.agents) { 00142 00143 RTBKIT::AgentConfigEntry config = agentConfig.getAgentEntry(agent); 00144 00145 /* When a new agent comes online there's a race condition where the 00146 router may send us a bid request for that agent before we receive 00147 its configuration. This check keeps us safe in that scenario. 00148 */ 00149 if (!config.valid()) { 00150 recordHit("unknownConfig"); 00151 continue; 00152 } 00153 00154 const RTBKIT::AccountKey& account = config.config->account; 00155 00156 size_t count = storage->get(account, uids); 00157 00158 /* The number of times a user has been seen by a given agent can be 00159 useful to make bid decisions so attach this data to the bid 00160 request. 00161 00162 It's also recomended to place your data in an object labeled 00163 after the augmentor from which it originated. 00164 */ 00165 result[account[0]].data[request.augmentor] = count; 00166 00167 /* We tag bid requests that pass the frequency cap filtering because 00168 if the augmentation doesn't terminate in time or if an error 00169 occured, then the bid request will not receive any tags and will 00170 therefor be filtered out for agents that require the frequency 00171 capping. 00172 */ 00173 if (count < getCap(request.augmentor, agent, config)) { 00174 result[account].tags.insert("pass-frequency-cap-ex"); 00175 recordHit("accounts." + account[0] + ".passed"); 00176 } 00177 else recordHit("accounts." + account[0] + ".capped"); 00178 } 00179 00180 return result; 00181 } 00182 00183 00189 size_t 00190 FrequencyCapAugmentor:: 00191 getCap( const string& augmentor, 00192 const string& agent, 00193 const RTBKIT::AgentConfigEntry& config) const 00194 { 00195 for (const auto& augConfig : config.config->augmentations) { 00196 if (augConfig.name != augmentor) continue; 00197 return augConfig.config.asInt(); 00198 } 00199 00200 /* There's a race condition here where if an agent removes our augmentor 00201 from its configuration while there are bid requests being augmented 00202 for that agent then we may not find its config. A sane default is 00203 good to have in this scenario. 00204 */ 00205 00206 return 0; 00207 } 00208 00209 } // namespace RTBKIT 00210