RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
testing/mock_exchange.cc
00001 
00009 #include "mock_exchange.h"
00010 
00011 #include "rtbkit/core/post_auction/post_auction_loop.h"
00012 #include "soa/service/http_header.h"
00013 
00014 #include <array>
00015 
00016 using namespace std;
00017 using namespace ML;
00018 
00019 namespace RTBKIT {
00020 
00021 /******************************************************************************/
00022 /* MOCK EXCHANGE                                                              */
00023 /******************************************************************************/
00024 
00025 MockExchange::
00026 MockExchange(Datacratic::ServiceProxyArguments & args, const std::string& name) :
00027     ServiceBase(name, args.makeServiceProxies()),
00028     running(0) {
00029 }
00030 
00031 
00032 MockExchange::
00033 MockExchange(const shared_ptr<ServiceProxies> proxies, const string& name) :
00034     ServiceBase(name, proxies),
00035     running(0) {
00036 }
00037 
00038 
00039 MockExchange::
00040 ~MockExchange()
00041 {
00042     threads.join_all();
00043 }
00044 
00045 
00046 void
00047 MockExchange::
00048 start(size_t threadCount, size_t numBidRequests, std::vector<int> const & bidPorts, std::vector<int> const & winPorts)
00049 {
00050     try {
00051         running = threadCount;
00052 
00053         auto startWorker = [=](size_t i, int bidPort, int winPort) {
00054             Worker worker(this, i, bidPort, winPort);
00055             if(numBidRequests) {
00056                 worker.run(numBidRequests);
00057             }
00058             else {
00059                 worker.run();
00060             }
00061 
00062             ML::atomic_dec(running);
00063         };
00064 
00065         int bp = 0;
00066         int wp = 0;
00067 
00068         for(size_t i = 0; i != threadCount; ++i) {
00069             int bidPort = bidPorts[bp++ % bidPorts.size()];
00070             int winPort = winPorts[wp++ % winPorts.size()];
00071             threads.create_thread(std::bind(startWorker, i, bidPort, winPort));
00072         }
00073     }
00074     catch (const exception& ex) {
00075         cerr << "got exception on request: " << ex.what() << endl;
00076     }
00077 }
00078 
00079 
00080 MockExchange::Stream::
00081 Stream(int port) : addr(0), fd(-1)
00082 {
00083     addrinfo hint = { 0, AF_INET, SOCK_STREAM, 0, 0, 0, 0, 0 };
00084 
00085     int res = getaddrinfo(0, to_string(port).c_str(), &hint, &addr);
00086     ExcCheckErrno(!res, "getaddrinfo failed");
00087 
00088     if(!addr) {
00089         throw ML::Exception("cannot find suitable address");
00090     }
00091 
00092     std::cerr << "publishing on port " << port << std::endl;
00093     connect();
00094 }
00095 
00096 
00097 MockExchange::Stream::
00098 ~Stream()
00099 {
00100     if (addr) freeaddrinfo(addr);
00101     if (fd >= 0) close(fd);
00102 }
00103 
00104 
00105 void
00106 MockExchange::Stream::
00107 connect()
00108 {
00109     if(fd >= 0) close(fd);
00110 
00111     fd = socket(AF_INET, SOCK_STREAM, 0);
00112     ExcCheckErrno(fd != -1, "socket failed");
00113 
00114     for(;;) {
00115         int res = ::connect(fd, addr->ai_addr, addr->ai_addrlen);
00116         if(res == 0) {
00117             break;
00118         }
00119 
00120         //ML::sleep(0.1);
00121     }
00122 }
00123 
00124 
00125 void
00126 MockExchange::BidStream::
00127 sendBidRequest(const BidRequest& originalBidRequest)
00128 {
00129     string strBidRequest = originalBidRequest.toJsonStr();
00130     string httpRequest = ML::format(
00131             "POST /bids HTTP/1.1\r\n"
00132             "Content-Length: %zd\r\n"
00133             "Content-Type: application/json\r\n"
00134             "Connection: Keep-Alive\r\n"
00135             "\r\n"
00136             "%s",
00137             strBidRequest.size(),
00138             strBidRequest.c_str());
00139 
00140 
00141     const char * current = httpRequest.c_str();
00142     const char * end = current + httpRequest.size();
00143 
00144     while (current != end) {
00145         int res = send(fd, current, end - current, MSG_NOSIGNAL);
00146         if(res == 0 || res == -1) {
00147             connect();
00148             current = httpRequest.c_str();
00149             continue;
00150         }
00151 
00152         current += res;
00153     }
00154 
00155     ExcAssertEqual((void *)current, (void *)end);
00156 }
00157 
00158 
00159 auto
00160 MockExchange::BidStream::
00161 parseResponse(const string& rawResponse) -> pair<bool, vector<Bid>>
00162 {
00163     Json::Value payload;
00164 
00165     try {
00166         HttpHeader header;
00167         header.parse(rawResponse);
00168         payload = Json::parse(header.knownData);
00169     }
00170     catch (const exception & exc) {
00171         cerr << "invalid response received: " << exc.what() << endl;
00172         return make_pair(false, vector<Bid>());
00173     }
00174 
00175     if (payload.isMember("error")) {
00176         cerr << "error returned: "
00177             << payload["error"] << endl
00178             << payload["details"] << endl;
00179         return make_pair(false, vector<Bid>());
00180     }
00181 
00182     ExcAssert(payload.isMember("imp"));
00183 
00184     vector<Bid> bids;
00185 
00186     for (size_t i = 0; i < payload["imp"].size(); ++i) {
00187         auto& spot = payload["imp"][i];
00188 
00189         Bid bid;
00190 
00191         bid.adSpotId = Id(spot["id"].asString());
00192         bid.maxPrice = spot["max_price"].asInt();
00193         bid.account = AccountKey(spot["account"].asString(), '.');
00194         bids.push_back(bid);
00195     }
00196 
00197     return make_pair(true, bids);
00198 }
00199 
00200 
00201 auto
00202 MockExchange::BidStream::
00203 recvBid() -> pair<bool, vector<Bid>>
00204 {
00205     array<char, 16384> buffer;
00206 
00207     int res = recv(fd, buffer.data(), buffer.size(), 0);
00208     if (res == 0 || (res == -1 && errno == ECONNRESET)) {
00209         return make_pair(false, vector<Bid>());
00210     }
00211 
00212     ExcCheckErrno(res != -1, "recv");
00213 
00214     close(fd);
00215     fd = -1;
00216 
00217     return parseResponse(string(buffer.data(), res));
00218 }
00219 
00220 
00221 auto
00222 MockExchange::BidStream::
00223 makeBidRequest() -> BidRequest
00224 {
00225     BidRequest bidRequest;
00226 
00227     FormatSet formats;
00228     formats.push_back(Format(160,600));
00229     AdSpot spot;
00230     spot.id = Id(1);
00231     spot.formats = formats;
00232     bidRequest.imp.push_back(spot);
00233 
00234     formats[0] = Format(300,250);
00235     spot.id = Id(2);
00236     bidRequest.imp.push_back(spot);
00237 
00238     bidRequest.location.countryCode = "CA";
00239     bidRequest.location.regionCode = "QC";
00240     bidRequest.location.cityName = "Montreal";
00241     bidRequest.auctionId = Id(id * 10000000 + key);
00242     bidRequest.exchange = "test";
00243     bidRequest.language = "en";
00244     bidRequest.url = Url("http://datacratic.com");
00245     bidRequest.timestamp = Date::now();
00246     bidRequest.userIds.add(Id(std::string("foo")), ID_EXCHANGE);
00247     bidRequest.userIds.add(Id(std::string("bar")), ID_PROVIDER);
00248     ++key;
00249 
00250     return bidRequest;
00251 }
00252 
00253 
00254 void
00255 MockExchange::WinStream::
00256 sendWin(const BidRequest& bidRequest, const Bid& bid, const Amount& winPrice)
00257 {
00258     PostAuctionEvent event;
00259     event.type = PAE_WIN;
00260     event.auctionId = bidRequest.auctionId;
00261     event.adSpotId = bid.adSpotId;
00262     event.timestamp = Date::now();
00263     event.winPrice = winPrice;
00264     event.uids = bidRequest.userIds;
00265     event.account = bid.account;
00266     event.bidTimestamp = bid.bidTimestamp;
00267 
00268     string str = event.toJson().toString();
00269     string httpRequest = ML::format(
00270             "POST /win HTTP/1.1\r\n"
00271             "Content-Length: %zd\r\n"
00272             "Content-Type: application/json\r\n"
00273             "Connection: Keep-Alive\r\n"
00274             "\r\n"
00275             "%s",
00276             str.size(),
00277             str.c_str());
00278 
00279     const char * current = httpRequest.c_str();
00280     const char * end = current + httpRequest.size();
00281 
00282     while (current != end) {
00283         int res = send(fd, current, end - current, MSG_NOSIGNAL);
00284         if(res == 0 || res == -1) {
00285             connect();
00286             current = httpRequest.c_str();
00287             continue;
00288         }
00289 
00290         current += res;
00291     }
00292 
00293     close(fd);
00294     fd = -1;
00295 
00296     std::cerr << "win sent payload=" << str << std::endl;
00297 
00298     ExcAssertEqual((void *)current, (void *)end);
00299 }
00300 
00301 
00302 MockExchange::Worker::
00303 Worker(MockExchange * exchange, size_t id, int bidPort, int winPort) : exchange(exchange), bids(bidPort, id), wins(winPort), rng(random()) {
00304 }
00305 
00306 
00307 void
00308 MockExchange::Worker::
00309 run() {
00310     for(;;) {
00311         bid();
00312     }
00313 }
00314 
00315 
00316 void
00317 MockExchange::Worker::
00318 run(size_t requests) {
00319     for(size_t i = 0; i != requests; ++i) {
00320         bid();
00321     }
00322 }
00323 
00324 
00325 void
00326 MockExchange::Worker::bid() {
00327     BidRequest bidRequest = bids.makeBidRequest();
00328     exchange->recordHit("requests");
00329 
00330     for (;;) {
00331         bids.sendBidRequest(bidRequest);
00332         exchange->recordHit("sent");
00333 
00334         auto response = bids.recvBid();
00335         if (!response.first) continue;
00336         exchange->recordHit("bids");
00337 
00338         vector<Bid> bids = response.second;
00339 
00340         for (const Bid& bid : bids) {
00341             auto ret = isWin(bidRequest, bid);
00342             if (!ret.first) continue;
00343             wins.sendWin(bidRequest, bid, ret.second);
00344             exchange->recordHit("wins");
00345         }
00346 
00347         break;
00348     }
00349 }
00350 
00351 
00352 pair<bool, Amount>
00353 MockExchange::Worker::isWin(const BidRequest&, const Bid& bid)
00354 {
00355     if (rng.random01() >= 0.1)
00356         return make_pair(false, Amount());
00357 
00358     return make_pair(true, MicroUSD(bid.maxPrice * rng.random01()));
00359 }
00360 
00361 
00362 } // namepsace RTBKIT
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator