![]() |
RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 00009 #include "mock_exchange.h" 00010 00011 #include "rtbkit/core/post_auction/post_auction_loop.h" 00012 #include "soa/service/http_header.h" 00013 00014 #include <array> 00015 00016 using namespace std; 00017 using namespace ML; 00018 00019 namespace RTBKIT { 00020 00021 /******************************************************************************/ 00022 /* MOCK EXCHANGE */ 00023 /******************************************************************************/ 00024 00025 MockExchange:: 00026 MockExchange(Datacratic::ServiceProxyArguments & args, const std::string& name) : 00027 ServiceBase(name, args.makeServiceProxies()), 00028 running(0) { 00029 } 00030 00031 00032 MockExchange:: 00033 MockExchange(const shared_ptr<ServiceProxies> proxies, const string& name) : 00034 ServiceBase(name, proxies), 00035 running(0) { 00036 } 00037 00038 00039 MockExchange:: 00040 ~MockExchange() 00041 { 00042 threads.join_all(); 00043 } 00044 00045 00046 void 00047 MockExchange:: 00048 start(size_t threadCount, size_t numBidRequests, std::vector<int> const & bidPorts, std::vector<int> const & winPorts) 00049 { 00050 try { 00051 running = threadCount; 00052 00053 auto startWorker = [=](size_t i, int bidPort, int winPort) { 00054 Worker worker(this, i, bidPort, winPort); 00055 if(numBidRequests) { 00056 worker.run(numBidRequests); 00057 } 00058 else { 00059 worker.run(); 00060 } 00061 00062 ML::atomic_dec(running); 00063 }; 00064 00065 int bp = 0; 00066 int wp = 0; 00067 00068 for(size_t i = 0; i != threadCount; ++i) { 00069 int bidPort = bidPorts[bp++ % bidPorts.size()]; 00070 int winPort = winPorts[wp++ % winPorts.size()]; 00071 threads.create_thread(std::bind(startWorker, i, bidPort, winPort)); 00072 } 00073 } 00074 catch (const exception& ex) { 00075 cerr << "got exception on request: " << ex.what() << endl; 00076 } 00077 } 00078 00079 00080 MockExchange::Stream:: 00081 Stream(int port) : addr(0), fd(-1) 00082 { 00083 addrinfo hint = { 0, AF_INET, SOCK_STREAM, 0, 0, 0, 0, 0 }; 00084 00085 int res = getaddrinfo(0, to_string(port).c_str(), &hint, &addr); 00086 ExcCheckErrno(!res, "getaddrinfo failed"); 00087 00088 if(!addr) { 00089 throw ML::Exception("cannot find suitable address"); 00090 } 00091 00092 std::cerr << "publishing on port " << port << std::endl; 00093 connect(); 00094 } 00095 00096 00097 MockExchange::Stream:: 00098 ~Stream() 00099 { 00100 if (addr) freeaddrinfo(addr); 00101 if (fd >= 0) close(fd); 00102 } 00103 00104 00105 void 00106 MockExchange::Stream:: 00107 connect() 00108 { 00109 if(fd >= 0) close(fd); 00110 00111 fd = socket(AF_INET, SOCK_STREAM, 0); 00112 ExcCheckErrno(fd != -1, "socket failed"); 00113 00114 for(;;) { 00115 int res = ::connect(fd, addr->ai_addr, addr->ai_addrlen); 00116 if(res == 0) { 00117 break; 00118 } 00119 00120 //ML::sleep(0.1); 00121 } 00122 } 00123 00124 00125 void 00126 MockExchange::BidStream:: 00127 sendBidRequest(const BidRequest& originalBidRequest) 00128 { 00129 string strBidRequest = originalBidRequest.toJsonStr(); 00130 string httpRequest = ML::format( 00131 "POST /bids HTTP/1.1\r\n" 00132 "Content-Length: %zd\r\n" 00133 "Content-Type: application/json\r\n" 00134 "Connection: Keep-Alive\r\n" 00135 "\r\n" 00136 "%s", 00137 strBidRequest.size(), 00138 strBidRequest.c_str()); 00139 00140 00141 const char * current = httpRequest.c_str(); 00142 const char * end = current + httpRequest.size(); 00143 00144 while (current != end) { 00145 int res = send(fd, current, end - current, MSG_NOSIGNAL); 00146 if(res == 0 || res == -1) { 00147 connect(); 00148 current = httpRequest.c_str(); 00149 continue; 00150 } 00151 00152 current += res; 00153 } 00154 00155 ExcAssertEqual((void *)current, (void *)end); 00156 } 00157 00158 00159 auto 00160 MockExchange::BidStream:: 00161 parseResponse(const string& rawResponse) -> pair<bool, vector<Bid>> 00162 { 00163 Json::Value payload; 00164 00165 try { 00166 HttpHeader header; 00167 header.parse(rawResponse); 00168 payload = Json::parse(header.knownData); 00169 } 00170 catch (const exception & exc) { 00171 cerr << "invalid response received: " << exc.what() << endl; 00172 return make_pair(false, vector<Bid>()); 00173 } 00174 00175 if (payload.isMember("error")) { 00176 cerr << "error returned: " 00177 << payload["error"] << endl 00178 << payload["details"] << endl; 00179 return make_pair(false, vector<Bid>()); 00180 } 00181 00182 ExcAssert(payload.isMember("imp")); 00183 00184 vector<Bid> bids; 00185 00186 for (size_t i = 0; i < payload["imp"].size(); ++i) { 00187 auto& spot = payload["imp"][i]; 00188 00189 Bid bid; 00190 00191 bid.adSpotId = Id(spot["id"].asString()); 00192 bid.maxPrice = spot["max_price"].asInt(); 00193 bid.account = AccountKey(spot["account"].asString(), '.'); 00194 bids.push_back(bid); 00195 } 00196 00197 return make_pair(true, bids); 00198 } 00199 00200 00201 auto 00202 MockExchange::BidStream:: 00203 recvBid() -> pair<bool, vector<Bid>> 00204 { 00205 array<char, 16384> buffer; 00206 00207 int res = recv(fd, buffer.data(), buffer.size(), 0); 00208 if (res == 0 || (res == -1 && errno == ECONNRESET)) { 00209 return make_pair(false, vector<Bid>()); 00210 } 00211 00212 ExcCheckErrno(res != -1, "recv"); 00213 00214 close(fd); 00215 fd = -1; 00216 00217 return parseResponse(string(buffer.data(), res)); 00218 } 00219 00220 00221 auto 00222 MockExchange::BidStream:: 00223 makeBidRequest() -> BidRequest 00224 { 00225 BidRequest bidRequest; 00226 00227 FormatSet formats; 00228 formats.push_back(Format(160,600)); 00229 AdSpot spot; 00230 spot.id = Id(1); 00231 spot.formats = formats; 00232 bidRequest.imp.push_back(spot); 00233 00234 formats[0] = Format(300,250); 00235 spot.id = Id(2); 00236 bidRequest.imp.push_back(spot); 00237 00238 bidRequest.location.countryCode = "CA"; 00239 bidRequest.location.regionCode = "QC"; 00240 bidRequest.location.cityName = "Montreal"; 00241 bidRequest.auctionId = Id(id * 10000000 + key); 00242 bidRequest.exchange = "test"; 00243 bidRequest.language = "en"; 00244 bidRequest.url = Url("http://datacratic.com"); 00245 bidRequest.timestamp = Date::now(); 00246 bidRequest.userIds.add(Id(std::string("foo")), ID_EXCHANGE); 00247 bidRequest.userIds.add(Id(std::string("bar")), ID_PROVIDER); 00248 ++key; 00249 00250 return bidRequest; 00251 } 00252 00253 00254 void 00255 MockExchange::WinStream:: 00256 sendWin(const BidRequest& bidRequest, const Bid& bid, const Amount& winPrice) 00257 { 00258 PostAuctionEvent event; 00259 event.type = PAE_WIN; 00260 event.auctionId = bidRequest.auctionId; 00261 event.adSpotId = bid.adSpotId; 00262 event.timestamp = Date::now(); 00263 event.winPrice = winPrice; 00264 event.uids = bidRequest.userIds; 00265 event.account = bid.account; 00266 event.bidTimestamp = bid.bidTimestamp; 00267 00268 string str = event.toJson().toString(); 00269 string httpRequest = ML::format( 00270 "POST /win HTTP/1.1\r\n" 00271 "Content-Length: %zd\r\n" 00272 "Content-Type: application/json\r\n" 00273 "Connection: Keep-Alive\r\n" 00274 "\r\n" 00275 "%s", 00276 str.size(), 00277 str.c_str()); 00278 00279 const char * current = httpRequest.c_str(); 00280 const char * end = current + httpRequest.size(); 00281 00282 while (current != end) { 00283 int res = send(fd, current, end - current, MSG_NOSIGNAL); 00284 if(res == 0 || res == -1) { 00285 connect(); 00286 current = httpRequest.c_str(); 00287 continue; 00288 } 00289 00290 current += res; 00291 } 00292 00293 close(fd); 00294 fd = -1; 00295 00296 std::cerr << "win sent payload=" << str << std::endl; 00297 00298 ExcAssertEqual((void *)current, (void *)end); 00299 } 00300 00301 00302 MockExchange::Worker:: 00303 Worker(MockExchange * exchange, size_t id, int bidPort, int winPort) : exchange(exchange), bids(bidPort, id), wins(winPort), rng(random()) { 00304 } 00305 00306 00307 void 00308 MockExchange::Worker:: 00309 run() { 00310 for(;;) { 00311 bid(); 00312 } 00313 } 00314 00315 00316 void 00317 MockExchange::Worker:: 00318 run(size_t requests) { 00319 for(size_t i = 0; i != requests; ++i) { 00320 bid(); 00321 } 00322 } 00323 00324 00325 void 00326 MockExchange::Worker::bid() { 00327 BidRequest bidRequest = bids.makeBidRequest(); 00328 exchange->recordHit("requests"); 00329 00330 for (;;) { 00331 bids.sendBidRequest(bidRequest); 00332 exchange->recordHit("sent"); 00333 00334 auto response = bids.recvBid(); 00335 if (!response.first) continue; 00336 exchange->recordHit("bids"); 00337 00338 vector<Bid> bids = response.second; 00339 00340 for (const Bid& bid : bids) { 00341 auto ret = isWin(bidRequest, bid); 00342 if (!ret.first) continue; 00343 wins.sendWin(bidRequest, bid, ret.second); 00344 exchange->recordHit("wins"); 00345 } 00346 00347 break; 00348 } 00349 } 00350 00351 00352 pair<bool, Amount> 00353 MockExchange::Worker::isWin(const BidRequest&, const Bid& bid) 00354 { 00355 if (rng.random01() >= 0.1) 00356 return make_pair(false, Amount()); 00357 00358 return make_pair(true, MicroUSD(bid.maxPrice * rng.random01())); 00359 } 00360 00361 00362 } // namepsace RTBKIT
1.7.6.1