RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/s3.cc
00001 
00008 #include "soa/service/s3.h"
00009 #include "jml/utils/string_functions.h"
00010 #include "soa/types/date.h"
00011 #include "soa/types/url.h"
00012 #include "jml/arch/futex.h"
00013 #include "jml/utils/exc_assert.h"
00014 #include "jml/utils/pair_utils.h"
00015 #include "jml/utils/vector_utils.h"
00016 #include "jml/utils/filter_streams.h"
00017 #include "jml/arch/timers.h"
00018 #include "jml/utils/ring_buffer.h"
00019 #include "jml/utils/hash.h"
00020 #include "jml/utils/file_functions.h"
00021 
00022 #define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1
00023 #include "crypto++/sha.h"
00024 #include "crypto++/md5.h"
00025 #include "crypto++/hmac.h"
00026 #include "crypto++/base64.h"
00027 
00028 #include <curlpp/cURLpp.hpp>
00029 #include <curlpp/Easy.hpp>
00030 #include <curlpp/Options.hpp>
00031 #include <curlpp/Info.hpp>
00032 #include <curlpp/Infos.hpp>
00033 
00034 #include <boost/iostreams/stream_buffer.hpp>
00035 #include <exception>
00036 #include <thread>
00037 #include <unordered_map>
00038 
00039 #include <boost/filesystem.hpp>
00040 
00041 
00042 using namespace std;
00043 using namespace ML;
00044 
00045 namespace Datacratic {
00046 
00047 double
00048 S3Api::
00049 defaultBandwidthToServiceMbps = 20.0;
00050 
00051 S3Api::
00052 S3Api()
00053 {
00054     bandwidthToServiceMbps = defaultBandwidthToServiceMbps;
00055 }
00056 
00057 S3Api::
00058 S3Api(const std::string & accessKeyId,
00059       const std::string & accessKey,
00060       double bandwidthToServiceMbps,
00061       const std::string & defaultProtocol,
00062       const std::string & serviceUri)
00063     : accessKeyId(accessKeyId),
00064       accessKey(accessKey),
00065       defaultProtocol(defaultProtocol),
00066       serviceUri(serviceUri),
00067       bandwidthToServiceMbps(bandwidthToServiceMbps)
00068 {
00069 }
00070 
00071 void
00072 S3Api::
00073 init(const std::string & accessKeyId,
00074      const std::string & accessKey,
00075      double bandwidthToServiceMbps,
00076      const std::string & defaultProtocol,
00077      const std::string & serviceUri)
00078 {
00079     this->accessKeyId = accessKeyId;
00080     this->accessKey = accessKey;
00081     this->defaultProtocol = defaultProtocol;
00082     this->serviceUri = serviceUri;
00083     this->bandwidthToServiceMbps = bandwidthToServiceMbps;
00084 }
00085 
00086 S3Api::Content::
00087 Content(const tinyxml2::XMLDocument & xml)
00088 {
00089     tinyxml2::XMLPrinter printer;
00090     const_cast<tinyxml2::XMLDocument &>(xml).Print(&printer);
00091     this->contentType = "application/xml";
00092     this->str = printer.CStr();
00093     this->hasContent = true;
00094     this->data = str.c_str();
00095     this->size = str.length();
00096 }
00097 
00098 std::string
00099 S3Api::
00100 getDigestMulti(const std::string & verb,
00101                const std::string & bucket,
00102                const std::string & resource,
00103                const std::string & subResource,
00104                const std::string & contentType,
00105                const std::string & contentMd5,
00106                const std::string & date,
00107                const std::vector<std::pair<std::string, std::string> > & headers)
00108 {
00109     map<string, string> canonHeaders;
00110     for (auto it = headers.begin(), end = headers.end();
00111          it != end;  ++it) {
00112         string key = lowercase(it->first);
00113         if (key.find("x-amz") != 0) continue;
00114 
00115         string value = it->second;
00116         if (canonHeaders.count(key))
00117             canonHeaders[key] += ",";
00118         canonHeaders[key] += value;
00119     }
00120 
00121     return getDigest(verb, bucket, resource, subResource,
00122                      contentType, contentMd5, date, canonHeaders);
00123 
00124 }
00125 
00126 /*
00127 Authorization = "AWS" + " " + AWSAccessKeyId + ":" + Signature;
00128 
00129 Signature = Base64( HMAC-SHA1( YourSecretAccessKeyID, UTF-8-Encoding-Of( StringToSign ) ) );
00130 
00131 StringToSign = HTTP-Verb + "\n" +
00132     Content-MD5 + "\n" +
00133     Content-Type + "\n" +
00134     Date + "\n" +
00135     CanonicalizedAmzHeaders +
00136     CanonicalizedResource;
00137 
00138 CanonicalizedResource = [ "/" + Bucket ] +
00139     <HTTP-Request-URI, from the protocol name up to the query string> +
00140     [ sub-resource, if present. For example "?acl", "?location", "?logging", or "?torrent"];
00141 
00142 CanonicalizedAmzHeaders = <described below>
00143 To construct the CanonicalizedAmzHeaders part of StringToSign, select all HTTP request headers that start with 'x-amz-' (using a case-insensitive comparison) and use the following process.
00144 
00145 CanonicalizedAmzHeaders Process
00146 1   Convert each HTTP header name to lower-case. For example, 'X-Amz-Date' becomes 'x-amz-date'.
00147 2   Sort the collection of headers lexicographically by header name.
00148 3   Combine header fields with the same name into one "header-name:comma-separated-value-list" pair as prescribed by RFC 2616, section 4.2, without any white-space between values. For example, the two metadata headers 'x-amz-meta-username: fred' and 'x-amz-meta-username: barney' would be combined into the single header 'x-amz-meta-username: fred,barney'.
00149 4   "Unfold" long headers that span multiple lines (as allowed by RFC 2616, section 4.2) by replacing the folding white-space (including new-line) by a single space.
00150 5   Trim any white-space around the colon in the header. For example, the header 'x-amz-meta-username: fred,barney' would become 'x-amz-meta-username:fred,barney'
00151 6   Finally, append a new-line (U+000A) to each canonicalized header in the resulting list. Construct the CanonicalizedResource element by concatenating all headers in this list into a single string.
00152 
00153 
00154 */
00155 std::string
00156 S3Api::
00157 getDigest(const std::string & verb,
00158           const std::string & bucket,
00159           const std::string & resource,
00160           const std::string & subResource,
00161           const std::string & contentType,
00162           const std::string & contentMd5,
00163           const std::string & date,
00164           const std::map<std::string, std::string> & headers)
00165 {
00166     string canonHeaderString;
00167 
00168     for (auto it = headers.begin(), end = headers.end();
00169          it != end;  ++it) {
00170         string key = lowercase(it->first);
00171         if (key.find("x-amz") != 0) continue;
00172 
00173         string value = it->second;
00174 
00175         canonHeaderString += key + ":" + value + "\n";
00176     }
00177 
00178     //cerr << "bucket = " << bucket << " resource = " << resource << endl;
00179 
00180     string canonResource
00181         = (bucket == "" ? "" : "/" + bucket)
00182         + resource
00183         + (subResource.empty() ? "" : "?")
00184         + subResource;
00185 
00186     string stringToSign
00187         = verb + "\n"
00188         + contentMd5 + "\n"
00189         + contentType + "\n"
00190         + date + "\n"
00191         + canonHeaderString
00192         + canonResource;
00193 
00194     return stringToSign;
00195 }
00196 
00197 std::string
00198 S3Api::
00199 sign(const std::string & stringToSign,
00200      const std::string & accessKey)
00201 {
00202     typedef CryptoPP::SHA1 Hash;
00203 
00204     size_t digestLen = Hash::DIGESTSIZE;
00205     byte digest[digestLen];
00206     CryptoPP::HMAC<Hash> hmac((byte *)accessKey.c_str(), accessKey.length());
00207     hmac.CalculateDigest(digest,
00208                          (byte *)stringToSign.c_str(),
00209                          stringToSign.length());
00210 
00211     // base64
00212     char outBuf[256];
00213 
00214     CryptoPP::Base64Encoder baseEncoder;
00215     baseEncoder.Put(digest, digestLen);
00216     baseEncoder.MessageEnd();
00217     size_t got = baseEncoder.Get((byte *)outBuf, 256);
00218     outBuf[got] = 0;
00219 
00220     //cerr << "got " << got << " characters" << endl;
00221 
00222     string base64digest(outBuf, outBuf + got - 1);
00223 
00224     //cerr << "base64digest.size() = " << base64digest.size() << endl;
00225 
00226     return base64digest;
00227 }
00228 
00229 S3Api::Response
00230 S3Api::SignedRequest::
00231 performSync() const
00232 {
00233     int numRetries = 7;
00234 
00235     for (unsigned i = 0;  i < numRetries;  ++i) {
00236         string responseHeaders;
00237         string body;
00238 
00239         try {
00240             responseHeaders.clear();
00241             body.clear();
00242 
00243             curlpp::Easy myRequest;
00244 
00245             using namespace curlpp::options;
00246             using namespace curlpp::infos;
00247 
00248             list<string> curlHeaders;
00249             for (unsigned i = 0;  i < params.headers.size();  ++i) {
00250                 curlHeaders.push_back(params.headers[i].first + ": "
00251                                       + params.headers[i].second);
00252             }
00253 
00254             curlHeaders.push_back("Date: " + params.date);
00255             curlHeaders.push_back("Authorization: " + auth);
00256 
00257             //cerr << "getting " << uri << " " << params.headers << endl;
00258 
00259             uint64_t totalBytesToTransfer = params.expectedBytesToDownload
00260                 + params.content.size;
00261             double expectedTimeSeconds
00262                 = totalBytesToTransfer
00263                 / 1000000.0
00264                 / bandwidthToServiceMbps;
00265             int timeout = 15 + std::max<int>(30, expectedTimeSeconds * 3);
00266 
00267 #if 0
00268             cerr << "totalBytesToTransfer = " << totalBytesToTransfer << endl;
00269             cerr << "expectedTimeSeconds = " << expectedTimeSeconds << endl;
00270             cerr << "timeout = " << timeout << endl;
00271 #endif
00272 
00273 #if 0
00274             if (params.verb == "GET") ;
00275             else if (params.verb == "POST") {
00276                 //myRequest.setOpt<Post>(true);
00277             }
00278             else if (params.verb == "PUT") {
00279                 myRequest.setOpt<Post>(true);
00280             }
00281             else throw ML::Exception("unknown verb " + params.verb);
00282 #endif
00283             //cerr << "!!!Setting params verb " << params.verb << endl;
00284             myRequest.setOpt<CustomRequest>(params.verb);
00285 
00286             myRequest.setOpt<curlpp::options::Url>(uri);
00287             //myRequest.setOpt<Verbose>(true);
00288             myRequest.setOpt<ErrorBuffer>((char *)0);
00289             myRequest.setOpt<Timeout>(timeout);
00290             myRequest.setOpt<NoSignal>(1);
00291 
00292             auto onData = [&] (char * data, size_t ofs1, size_t ofs2) -> size_t
00293                 {
00294                     //cerr << "called onData for " << ofs1 << " " << ofs2 << endl;
00295                     return 0;
00296                 };
00297 
00298             auto onWriteData = [&] (char * data, size_t ofs1, size_t ofs2) -> size_t
00299                 {
00300                     body.append(data, ofs1 * ofs2);
00301                     return ofs1 * ofs2;
00302                     //cerr << "called onWrite for " << ofs1 << " " << ofs2 << endl;
00303                     return 0;
00304                 };
00305 
00306             auto onProgress = [&] (double p1, double p2, double p3, double p4) -> int
00307                 {
00308                     cerr << "progress " << p1 << " " << p2 << " " << p3 << " "
00309                          << p4 << endl;
00310                     return 0;
00311                 };
00312 
00313             bool afterContinue = false;
00314 
00315             auto onHeader = [&] (char * data, size_t ofs1, size_t ofs2) -> size_t
00316                 {
00317                     string headerLine(data, ofs1 * ofs2);
00318                     if (headerLine.find("HTTP/1.1 100 Continue") == 0) {
00319                         afterContinue = true;
00320                     }
00321                     else if (afterContinue) {
00322                         if (headerLine == "\r\n")
00323                             afterContinue = false;
00324                     }
00325                     else {
00326                         responseHeaders.append(headerLine);
00327                         //cerr << "got header data " << headerLine << endl;
00328                     }
00329                     return ofs1 * ofs2;
00330                 };
00331 
00332             myRequest.setOpt<BoostHeaderFunction>(onHeader);
00333             myRequest.setOpt<BoostWriteFunction>(onWriteData);
00334             myRequest.setOpt<BoostProgressFunction>(onProgress);
00335             //myRequest.setOpt<Header>(true);
00336             if (params.content.data) {
00337                 string s(params.content.data, params.content.size);
00338                 myRequest.setOpt<PostFields>(s);
00339             }
00340             else
00341             {
00342                 string s;
00343                 myRequest.setOpt<PostFields>(s);
00344             }
00345             myRequest.setOpt<PostFieldSize>(params.content.size);
00346             curlHeaders.push_back(ML::format("Content-Length: %lld",
00347                                              params.content.size));
00348             curlHeaders.push_back("Transfer-Encoding:");
00349             curlHeaders.push_back("Content-Type:");
00350             myRequest.setOpt<curlpp::options::HttpHeader>(curlHeaders);
00351 
00352             myRequest.perform();
00353 
00354             Response response;
00355             response.body_ = body;
00356 
00357             curlpp::InfoGetter::get(myRequest, CURLINFO_RESPONSE_CODE,
00358                                     response.code_);
00359 
00360             if (response.code_ == 500) {
00361                 // Internal server error
00362                 // Wait 10 seconds and retry
00363                 cerr << "Service returned 500: " << endl;
00364                 cerr << "uri is " << uri << endl;
00365                 cerr << "response headers " << responseHeaders << endl;
00366                 cerr << "body is " << body << endl;
00367 
00368                 ML::sleep(10);
00369                 continue;  // retry
00370             }
00371 
00372             double bytesUploaded;
00373 
00374             curlpp::InfoGetter::get(myRequest, CURLINFO_SIZE_UPLOAD,
00375                                     bytesUploaded);
00376 
00377             //cerr << "uploaded " << bytesUploaded << " bytes" << endl;
00378 
00379             response.header_.parse(responseHeaders);
00380 
00381             return response;
00382         } catch (const curlpp::LibcurlRuntimeError & exc) {
00383             cerr << "libCurl returned an error with code " << exc.whatCode()
00384                  << endl;
00385             cerr << "error is " << curl_easy_strerror(exc.whatCode())
00386                  << endl;
00387             cerr << "uri is " << uri << endl;
00388             cerr << "headers are " << responseHeaders << endl;
00389             cerr << "body contains " << body.size() << " bytes" << endl;
00390 
00391             if (i < 2)
00392                 cerr << "retrying" << endl;
00393             else throw;
00394         }
00395     }
00396 
00397     throw ML::Exception("logic error");
00398 }
00399 
00400 std::string
00401 S3Api::
00402 signature(const RequestParams & request) const
00403 {
00404     string digest
00405         = S3Api::getDigestMulti(request.verb,
00406                                 request.bucket,
00407                                 request.resource, request.subResource,
00408                                 request.contentType, request.contentMd5,
00409                                 request.date, request.headers);
00410 
00411     //cerr << "digest = " << digest << endl;
00412 
00413     return S3Api::sign(digest, accessKey);
00414 }
00415 
/** Percent-encode a string for use as a query parameter name or value.

    Unreserved characters (ASCII letters, digits, '-', '_', '.' and '~') are
    copied through unchanged; every other byte is written as '%' followed by
    two upper-case hexadecimal digits.

    Without this, a value containing e.g. a space, '&', '=' or '#' would
    corrupt the request URI it is pasted into.

    NOTE(review): callers must pass raw (not already encoded) text, otherwise
    the '%' signs get encoded a second time.
*/
inline std::string uriEncode(const std::string & str)
{
    static const char hexDigits[] = "0123456789ABCDEF";

    std::string result;
    result.reserve(str.size());

    for (std::string::size_type i = 0;  i < str.size();  ++i) {
        const unsigned char c = static_cast<unsigned char>(str[i]);

        const bool unreserved
            = (c >= 'A' && c <= 'Z')
            || (c >= 'a' && c <= 'z')
            || (c >= '0' && c <= '9')
            || c == '-' || c == '_' || c == '.' || c == '~';

        if (unreserved) {
            result += static_cast<char>(c);
        }
        else {
            result += '%';
            result += hexDigits[c >> 4];
            result += hexDigits[c & 0x0F];
        }
    }

    return result;
}
00420 
00421 S3Api::SignedRequest
00422 S3Api::
00423 prepare(const RequestParams & request) const
00424 {
00425     string protocol = defaultProtocol;
00426     if(protocol.length() == 0){
00427         throw ML::Exception("attempt to perform s3 request without a "
00428             "default protocol. (Could be caused by S3Api initialisation with "
00429             "the empty constructor.)");
00430     }
00431 
00432     SignedRequest result;
00433     result.params = request;
00434     result.bandwidthToServiceMbps = bandwidthToServiceMbps;
00435 
00436     if (request.resource.find("//") != string::npos)
00437         throw ML::Exception("attempt to perform s3 request with double slash: "
00438                             + request.resource);
00439 
00440     if (request.bucket.empty()) {
00441         result.uri = protocol + "://" + serviceUri
00442             + request.resource
00443             + (request.subResource != "" ? "?" + request.subResource : "");
00444     }
00445     else {
00446         result.uri = protocol + "://" + request.bucket + "." + serviceUri
00447             + request.resource
00448             + (request.subResource != "" ? "?" + request.subResource : "");
00449     }
00450 
00451     for (unsigned i = 0;  i < request.queryParams.size();  ++i) {
00452         if (i == 0 && request.subResource == "")
00453             result.uri += "?";
00454         else result.uri += "&";
00455         result.uri += uriEncode(request.queryParams[i].first)
00456             + "=" + uriEncode(request.queryParams[i].second);
00457     }
00458 
00459     string sig = signature(request);
00460     result.auth = "AWS " + accessKeyId + ":" + sig;
00461 
00462     //cerr << "result.uri = " << result.uri << endl;
00463     //cerr << "result.auth = " << result.auth << endl;
00464 
00465     return result;
00466 }
00467 
00468 S3Api::Response
00469 S3Api::
00470 get(const std::string & bucket,
00471     const std::string & resource,
00472     uint64_t expectedBytesToDownload,
00473     const std::string & subResource,
00474     const StrPairVector & headers,
00475     const StrPairVector & queryParams) const
00476 {
00477     RequestParams request;
00478     request.verb = "GET";
00479     request.bucket = bucket;
00480     request.resource = resource;
00481     request.subResource = subResource;
00482     request.headers = headers;
00483     request.queryParams = queryParams;
00484     request.date = Date::now().printRfc2616();
00485     request.expectedBytesToDownload = expectedBytesToDownload;
00486 
00487     return prepare(request).performSync();
00488 }
00489 
00491 S3Api::Response
00492 S3Api::
00493 post(const std::string & bucket,
00494      const std::string & resource,
00495      const std::string & subResource,
00496      const StrPairVector & headers,
00497      const StrPairVector & queryParams,
00498      const Content & content) const
00499 {
00500     RequestParams request;
00501     request.verb = "POST";
00502     request.bucket = bucket;
00503     request.resource = resource;
00504     request.subResource = subResource;
00505     request.headers = headers;
00506     request.queryParams = queryParams;
00507     request.date = Date::now().printRfc2616();
00508     request.content = content;
00509 
00510     return prepare(request).performSync();
00511 }
00512 
00513 S3Api::Response
00514 S3Api::
00515 put(const std::string & bucket,
00516     const std::string & resource,
00517     const std::string & subResource,
00518     const StrPairVector & headers,
00519     const StrPairVector & queryParams,
00520     const Content & content) const
00521 {
00522     RequestParams request;
00523     request.verb = "PUT";
00524     request.bucket = bucket;
00525     request.resource = resource;
00526     request.subResource = subResource;
00527     request.headers = headers;
00528     request.queryParams = queryParams;
00529     request.date = Date::now().printRfc2616();
00530     request.content = content;
00531 
00532     return prepare(request).performSync();
00533 }
00534 S3Api::Response
00535 S3Api::
00536 erase(const std::string & bucket,
00537     const std::string & resource,
00538     const std::string & subResource,
00539     const StrPairVector & headers,
00540     const StrPairVector & queryParams,
00541     const Content & content) const
00542 {
00543     RequestParams request;
00544     request.verb = "DELETE";
00545     request.bucket = bucket;
00546     request.resource = resource;
00547     request.subResource = subResource;
00548     request.headers = headers;
00549     request.queryParams = queryParams;
00550     request.date = Date::now().printRfc2616();
00551     request.content = content;
00552 
00553     return prepare(request).performSync();
00554 }
00555 
00556 template<typename T>
00557 T extract(tinyxml2::XMLNode * element, const std::string & path)
00558 {
00559     if (!element)
00560         throw ML::Exception("can't extract from missing element");
00561     //tinyxml2::XMLHandle handle(element);
00562 
00563     vector<string> splitPath = ML::split(path, '/');
00564     auto p = element;
00565     for (unsigned i = 0;  i < splitPath.size();  ++i) {
00566         p = p->FirstChildElement(splitPath[i].c_str());
00567         if (!p) {
00568             element->GetDocument()->Print();
00569             throw ML::Exception("required key " + splitPath[i]
00570                                 + " not found on path " + path);
00571         }
00572     }
00573 
00574     auto text = tinyxml2::XMLHandle(p).FirstChild().ToText();
00575 
00576     if (!text) {
00577         element->GetDocument()->Print();
00578         throw ML::Exception("no text at node "  + path);
00579     }
00580     return boost::lexical_cast<T>(text->Value());
00581 }
00582 
00583 template<typename T>
00584 T extractDef(tinyxml2::XMLNode * element, const std::string & path,
00585              const T & ifMissing)
00586 {
00587     if (!element) return ifMissing;
00588 
00589     vector<string> splitPath = ML::split(path, '/');
00590     auto p = element;
00591     for (unsigned i = 0;  i < splitPath.size();  ++i) {
00592         p = p->FirstChildElement(splitPath[i].c_str());
00593         if (!p)
00594             return ifMissing;
00595     }
00596 
00597     auto text = tinyxml2::XMLHandle(p).FirstChild().ToText();
00598 
00599     if (!text) return ifMissing;
00600 
00601     return boost::lexical_cast<T>(text->Value());
00602 }
00603 
00604 template<typename T>
00605 T extract(const std::unique_ptr<tinyxml2::XMLDocument> & doc,
00606           const std::string & path)
00607 {
00608     return extract<T>(doc.get(), path);
00609 }
00610 
00611 template<typename T>
00612 T extractDef(const std::unique_ptr<tinyxml2::XMLDocument> & doc,
00613              const std::string & path, const T & def)
00614 {
00615     return extractDef<T>(doc.get(), path, def);
00616 }
00617 
00618 namespace {
00619 
00620 } // file scope
00621 
00622 std::vector<std::pair<std::string, std::string> >
00623 S3Api::ObjectMetadata::
00624 getRequestHeaders() const
00625 {
00626     std::vector<std::pair<std::string, std::string> > result;
00627     if (redundancy == REDUNDANCY_REDUCED)
00628         result.push_back({"x-amz-storage-class", "REDUCED_REDUNDANCY"});
00629     else if(redundancy == REDUNDANCY_GLACIER)
00630         result.push_back({"x-amz-storage-class", "GLACIER"});
00631     if (serverSideEncryption == SSE_AES256)
00632         result.push_back({"x-amz-server-side-encryption", "AES256"});
00633     if (contentType != "")
00634         result.push_back({"Content-Type", contentType});
00635     if (contentEncoding != "")
00636         result.push_back({"Content-Encoding", contentEncoding});
00637     for (auto md: metadata) {
00638         result.push_back({"x-amz-meta-" + md.first, md.second});
00639     }
00640     return result;
00641 }
00642 
00643 S3Api::MultiPartUpload
00644 S3Api::
00645 obtainMultiPartUpload(const std::string & bucket,
00646                       const std::string & resource,
00647                       const ObjectMetadata & metadata) const
00648 {
00649     // Contains the resource without the leading slash
00650     string outputPrefix(resource, 1);
00651 
00652     // Check if there is already a multipart upload in progress
00653     auto inProgress = get(bucket, "/", 8192, "uploads", {},
00654                           { { "prefix", outputPrefix } })
00655         .bodyXml();
00656 
00657     using namespace tinyxml2;
00658 
00659     XMLHandle handle(*inProgress);
00660 
00661     auto upload
00662         = handle
00663         .FirstChildElement("ListMultipartUploadsResult")
00664         .FirstChildElement("Upload")
00665         .ToElement();
00666 
00667     string uploadId;
00668     vector<MultiPartUploadPart> parts;
00669 
00670     uint64_t partSize = 0;
00671     uint64_t currentOffset = 0;
00672 
00673     for (; upload; upload = upload->NextSiblingElement("Upload")) {
00674         XMLHandle uploadHandle(upload);
00675 
00676         auto foundNode
00677             = uploadHandle
00678             .FirstChildElement("UploadId")
00679             .FirstChild()
00680             .ToText();
00681 
00682         if (!foundNode)
00683             throw ML::Exception("found node has no ID");
00684 
00685         // TODO: check metadata, etc
00686 
00687         // Already an upload in progress
00688         uploadId = foundNode->Value();
00689 
00690         auto inProgressInfo = get(bucket, resource, 8192,
00691                                   "uploadId=" + uploadId)
00692             .bodyXml();
00693 
00694         //inProgressInfo->Print();
00695 
00696         XMLHandle handle(*inProgressInfo);
00697 
00698         auto foundPart
00699             = handle
00700             .FirstChildElement("ListPartsResult")
00701             .FirstChildElement("Part")
00702             .ToElement();
00703 
00704         int numPartsDone = 0;
00705         uint64_t biggestPartSize = 0;
00706         for (; foundPart;
00707              foundPart = foundPart->NextSiblingElement("Part"),
00708                  ++numPartsDone) {
00709             MultiPartUploadPart currentPart;
00710             currentPart.fromXml(foundPart);
00711             if (currentPart.partNumber != numPartsDone + 1) {
00712                 //cerr << "missing part " << numPartsDone + 1 << endl;
00713                 // from here we continue alone
00714                 break;
00715             }
00716             currentPart.startOffset = currentOffset;
00717             currentOffset += currentPart.size;
00718             biggestPartSize = std::max(biggestPartSize, currentPart.size);
00719             parts.push_back(currentPart);
00720         }
00721 
00722         partSize = biggestPartSize;
00723 
00724         //cerr << "numPartsDone = " << numPartsDone << endl;
00725         //cerr << "currentOffset = " << currentOffset
00726         //     << "dataSize = " << dataSize << endl;
00727     }
00728 
00729     if (uploadId.empty()) {
00730         //cerr << "getting new ID" << endl;
00731 
00732         vector<pair<string, string> > headers = metadata.getRequestHeaders();
00733         auto result = post(bucket, resource, "uploads", headers).bodyXml();
00734         //result->Print();
00735         //cerr << "result = " << result << endl;
00736 
00737         uploadId
00738             = extract<string>(result, "InitiateMultipartUploadResult/UploadId");
00739 
00740         //cerr << "new upload = " << uploadId << endl;
00741     }
00742         //return;
00743 
00744     MultiPartUpload result;
00745     result.parts.swap(parts);
00746     result.id = uploadId;
00747     return result;
00748 }
00749 
00750 std::string
00751 S3Api::
00752 finishMultiPartUpload(const std::string & bucket,
00753                       const std::string & resource,
00754                       const std::string & uploadId,
00755                       const std::vector<std::string> & etags) const
00756 {
00757     using namespace tinyxml2;
00758     // Finally, send back a response to join the parts together
00759     XMLDocument joinRequest;
00760     auto r = joinRequest.InsertFirstChild(joinRequest.NewElement("CompleteMultipartUpload"));
00761     for (unsigned i = 0;  i < etags.size();  ++i) {
00762         auto n = r->InsertEndChild(joinRequest.NewElement("Part"));
00763         n->InsertEndChild(joinRequest.NewElement("PartNumber"))
00764             ->InsertEndChild(joinRequest.NewText(ML::format("%d", i + 1).c_str()));
00765         n->InsertEndChild(joinRequest.NewElement("ETag"))
00766             ->InsertEndChild(joinRequest.NewText(etags[i].c_str()));
00767     }
00768     auto joinResponse
00769         = post(bucket, resource, "uploadId=" + uploadId,
00770                   {}, {}, joinRequest);
00771 
00772     //cerr << joinResponse.bodyXmlStr() << endl;
00773 
00774     auto joinResponseXml = joinResponse.bodyXml();
00775 
00776     try {
00777 
00778         string etag = extract<string>(joinResponseXml,
00779                                       "CompleteMultipartUploadResult/ETag");
00780         return etag;
00781     } catch (const std::exception & exc) {
00782         cerr << "--- request is " << endl;
00783         joinRequest.Print();
00784         cerr << "error completing multipart upload: " << exc.what() << endl;
00785         throw;
00786     }
00787 }
00788 
00789 void
00790 S3Api::MultiPartUploadPart::
00791 fromXml(tinyxml2::XMLElement * element)
00792 {
00793     partNumber = extract<int>(element, "PartNumber");
00794     lastModified = extract<string>(element, "LastModified");
00795     etag = extract<string>(element, "ETag");
00796     size = extract<uint64_t>(element, "Size");
00797     done = true;
00798 }
00799 
00800 std::string
00801 S3Api::
00802 upload(const char * data,
00803        size_t dataSize,
00804        const std::string & bucket,
00805        const std::string & resource,
00806        CheckMethod check,
00807        const ObjectMetadata & metadata,
00808        int numInParallel)
00809 {
00810     if (resource == "" || resource[0] != '/')
00811         throw ML::Exception("resource should start with a /");
00812     // Contains the resource without the leading slash
00813     string outputPrefix(resource, 1);
00814 
00815     //cerr << "need to upload " << dataSize << " bytes" << endl;
00816 
00817     // Check if it's already there
00818 
00819     if (check == CM_SIZE || check == CM_MD5_ETAG) {
00820         auto existingResource
00821             = get(bucket, "/", 8192, "", {},
00822                   { { "prefix", outputPrefix } })
00823             .bodyXml();
00824 
00825         //cerr << "existing" << endl;
00826         //existingResource->Print();
00827 
00828         auto foundContent
00829             = tinyxml2::XMLHandle(*existingResource)
00830             .FirstChildElement("ListBucketResult")
00831             .FirstChildElement("Contents")
00832             .ToElement();
00833 
00834         if (foundContent) {
00835             uint64_t size = extract<uint64_t>(foundContent, "Size");
00836             std::string etag = extract<string>(foundContent, "ETag");
00837             std::string lastModified = extract<string>(foundContent, "LastModified");
00838 
00839             if (size == dataSize) {
00840                 //cerr << "already uploaded" << endl;
00841                 return etag;
00842             }
00843         }
00844     }
00845 
00846     auto upload = obtainMultiPartUpload(bucket, resource, metadata);
00847 
00848     uint64_t partSize = 0;
00849     uint64_t currentOffset = 0;
00850 
00851     for (auto & part: upload.parts) {
00852         partSize = std::max(partSize, part.size);
00853         currentOffset = std::max(currentOffset, part.startOffset + part.size);
00854     }
00855 
00856     if (partSize == 0) {
00857         if (dataSize < 5 * 1024 * 1024) {
00858             partSize = dataSize;
00859         }
00860         else {
00861             partSize = 8 * 1024 * 1024;
00862             while (dataSize / partSize > 150) {
00863                 partSize *= 2;
00864             }
00865         }
00866     }
00867 
00868     string uploadId = upload.id;
00869     vector<MultiPartUploadPart> & parts = upload.parts;
00870 
00871     uint64_t offset = currentOffset;
00872     for (int i = 0;  offset < dataSize;  offset += partSize, ++i) {
00873         MultiPartUploadPart part;
00874         part.partNumber = parts.size() + 1;
00875         part.startOffset = offset;
00876         part.size = min<uint64_t>(partSize, dataSize - offset);
00877         parts.push_back(part);
00878     }
00879     // we are dealing with an empty file
00880     if(parts.empty())
00881     {
00882         MultiPartUploadPart part;
00883         part.partNumber = parts.size() + 1;
00884         part.startOffset = offset;
00885         part.size = min<uint64_t>(partSize, dataSize - offset);
00886         parts.push_back(part);
00887     }
00888     //cerr << "total parts = " << parts.size() << endl;
00889 
00890     //if (!foundId)
00891 
00892     uint64_t bytesDone = 0;
00893     Date start;
00894 
00895     auto touchByte = [] (const char * c)
00896         {
00897 
00898             __asm__
00899             (" # [in]"
00900              :
00901              : [in] "r" (*c)
00902              :
00903              );
00904         };
00905 
00906     auto touch = [&] (const char * start, size_t size)
00907         {
00908             for (size_t i = 0;  i < size;  i += 4096) {
00909                 touchByte(start + i);
00910             }
00911         };
00912 
00913     int readyPart = 0;
00914 
00915     auto doPart = [&] (int i)
00916         {
00917             MultiPartUploadPart & part = parts[i];
00918             //cerr << "part " << i << " with " << part.size << " bytes" << endl;
00919 
00920             // Wait until we're allowed to go
00921             for (;;) {
00922                 int isReadyPart = readyPart;
00923                 if (isReadyPart >= i) {
00924                     break;
00925                 }
00926                 futex_wait(readyPart, isReadyPart);
00927             }
00928 
00929             // First touch the input range
00930             touch(data + part.startOffset,
00931                   part.size);
00932 
00933             //cerr << "done touching " << i << endl;
00934 
00935             // Now let the next one go
00936             ExcAssertEqual(readyPart, i);
00937             ++readyPart;
00938 
00939             futex_wake(readyPart);
00940 
00941             string md5 = md5HashToHex(data + part.startOffset,
00942                                       part.size);
00943 
00944             if (part.done) {
00945                 //cerr << "etag is " << part.etag << endl;
00946                 if ('"' + md5 + '"' == part.etag) {
00947                     //cerr << "part " << i << " verified done" << endl;
00948                     return;
00949                 }
00950             }
00951 
00952             auto putResult = put(bucket, resource,
00953                                     ML::format("partNumber=%d&uploadId=%s",
00954                                                part.partNumber, uploadId),
00955                                     {}, {},
00956                                     S3Api::Content(data
00957                                                    + part.startOffset,
00958                                                    part.size,
00959                                                    md5));
00960 
00961             //cerr << "result of part " << i << " is "
00962             //<< putResult.bodyXmlStr() << endl;
00963 
00964             if (putResult.code_ != 200) {
00965                 part.etag = "ERROR";
00966                 cerr << putResult.bodyXmlStr() << endl;
00967                 throw ML::Exception("put didn't work: %d", (int)putResult.code_);
00968             }
00969 
00970 
00971 
00972             ML::atomic_add(bytesDone, part.size);
00973             double seconds = Date::now().secondsSince(start);
00974             cerr << "done " << bytesDone / 1024 / 1024 << " MB in "
00975             << seconds << " s at "
00976             << bytesDone / 1024.0 / 1024 / seconds
00977             << " MB/second" << endl;
00978 
00979             //cerr << putResult.header_ << endl;
00980 
00981             string etag = putResult.getHeader("etag");
00982 
00983             //cerr << "etag = " << etag << endl;
00984 
00985             part.etag = etag;
00986         };
00987 
00988     int currentPart = 0;
00989 
00990     start = Date::now();
00991 
00992     auto doPartThread = [&] ()
00993         {
00994             for (;;) {
00995                 if (currentPart >= parts.size()) break;
00996                 int partToDo = __sync_fetch_and_add(&currentPart, 1);
00997                 if (partToDo >= parts.size()) break;
00998                 doPart(partToDo);
00999             }
01000         };
01001 
01002     if (numInParallel == -1)
01003         numInParallel = 16;
01004 
01005     boost::thread_group tg;
01006     for (unsigned i = 0;  i < numInParallel;  ++i)
01007         tg.create_thread(doPartThread);
01008 
01009     tg.join_all();
01010 
01011     vector<string> etags;
01012     for (unsigned i = 0;  i < parts.size();  ++i) {
01013         etags.push_back(parts[i].etag);
01014     }
01015     string finalEtag = finishMultiPartUpload(bucket, resource, uploadId, etags);
01016     return finalEtag;
01017 }
01018 
01019 std::string
01020 S3Api::
01021 upload(const char * data,
01022        size_t bytes,
01023        const std::string & uri,
01024        CheckMethod check,
01025        const ObjectMetadata & metadata,
01026        int numInParallel)
01027 {
01028     string bucket, object;
01029     std::tie(bucket, object) = parseUri(uri);
01030     return upload(data, bytes, bucket, "/" + object, check, metadata,
01031                   numInParallel);
01032 }
01033 
01034 S3Api::ObjectInfo::
01035 ObjectInfo()
01036     : size(0), exists(false)
01037 {
01038 }
01039 
01040 S3Api::ObjectInfo::
01041 ObjectInfo(tinyxml2::XMLNode * element)
01042 {
01043     size = extract<uint64_t>(element, "Size");
01044     key  = extract<string>(element, "Key");
01045     string lastModifiedStr = extract<string>(element, "LastModified");
01046     lastModified = Date::parseIso8601(lastModifiedStr);
01047     etag = extract<string>(element, "ETag");
01048     ownerId = extract<string>(element, "Owner/ID");
01049     ownerName = extractDef<string>(element, "Owner/DisplayName", "");
01050     storageClass = extract<string>(element, "StorageClass");
01051     exists = true;
01052 }
01053 
01054 void
01055 S3Api::
01056 forEachObject(const std::string & bucket,
01057               const std::string & prefix,
01058               const OnObject & onObject,
01059               const OnSubdir & onSubdir,
01060               const std::string & delimiter,
01061               int depth) const
01062 {
01063     using namespace tinyxml2;
01064 
01065     //cerr << "forEachObject under " << prefix << endl;
01066 
01067     string marker;
01068     do {
01069         StrPairVector queryParams;
01070         if (prefix != "")
01071             queryParams.push_back({"prefix", prefix});
01072         if (delimiter != "")
01073             queryParams.push_back({"delimiter", delimiter});
01074         if (marker != "")
01075             queryParams.push_back({"marker", marker});
01076 
01077         auto listingResult = get(bucket, "/", 8192, "",
01078                                  {}, queryParams);
01079         auto listingResultXml = listingResult.bodyXml();
01080 
01081         //listingResultXml->Print();
01082 
01083         string foundPrefix
01084             = extractDef<string>(listingResult, "ListBucketResult/Prefix", "");
01085         string truncated
01086             = extract<string>(listingResult, "ListBucketResult/IsTruncated");
01087         if (truncated == "true") {
01088             marker = extract<string>(listingResult, "ListBucketResult/Marker");
01089         }
01090         else marker = "";
01091 
01092         auto foundObject
01093             = XMLHandle(*listingResultXml)
01094             .FirstChildElement("ListBucketResult")
01095             .FirstChildElement("Contents")
01096             .ToElement();
01097 
01098         for (; onObject && foundObject;
01099              foundObject = foundObject->NextSiblingElement("Contents")) {
01100             ObjectInfo info(foundObject);
01101 
01102             ExcAssertEqual(info.key.find(foundPrefix), 0);
01103             string basename(info.key, foundPrefix.length());
01104 
01105             if (!onObject(foundPrefix, basename, info, depth))
01106                 break;
01107         }
01108 
01109         auto foundDir
01110             = XMLHandle(*listingResultXml)
01111             .FirstChildElement("ListBucketResult")
01112             .FirstChildElement("CommonPrefixes")
01113             .ToElement();
01114 
01115         for (; onSubdir && foundDir;
01116              foundDir = foundDir->NextSiblingElement("CommonPrefixes")) {
01117             string dirName = extract<string>(foundDir, "Prefix");
01118 
01119             // Strip off the delimiter
01120             if (dirName.rfind(delimiter) == dirName.size() - delimiter.size()) {
01121                 dirName = string(dirName, 0, dirName.size() - delimiter.size());
01122                 ExcAssertEqual(dirName.find(prefix), 0);
01123                 dirName = string(dirName, prefix.size());
01124             }
01125             if (onSubdir(foundPrefix, dirName, depth)) {
01126                 string newPrefix = foundPrefix + dirName + "/";
01127                 //cerr << "newPrefix = " << newPrefix << endl;
01128                 forEachObject(bucket, newPrefix, onObject, onSubdir, delimiter,
01129                               depth + 1);
01130             }
01131         }
01132     } while (marker != "");
01133 
01134     //cerr << "done scanning" << endl;
01135 }
01136 
01137 S3Api::ObjectInfo
01138 S3Api::
01139 getObjectInfo(const std::string & bucket,
01140               const std::string & object) const
01141 {
01142     StrPairVector queryParams;
01143     queryParams.push_back({"prefix", object});
01144 
01145     auto listingResult = get(bucket, "/", 8192, "", {}, queryParams);
01146 
01147     if (listingResult.code_ != 200) {
01148         cerr << listingResult.bodyXmlStr() << endl;
01149         throw ML::Exception("error getting object");
01150     }
01151 
01152     auto listingResultXml = listingResult.bodyXml();
01153 
01154     auto foundObject
01155         = tinyxml2::XMLHandle(*listingResultXml)
01156         .FirstChildElement("ListBucketResult")
01157         .FirstChildElement("Contents")
01158         .ToElement();
01159 
01160     if (!foundObject)
01161         throw ML::Exception("object " + object + " not found in bucket "
01162                             + bucket);
01163 
01164     ObjectInfo info(foundObject);
01165 
01166     if(info.key != object){
01167         throw ML::Exception("object " + object + " not found in bucket "
01168                             + bucket);
01169     }
01170 
01171 
01172     return info;
01173 }
01174 
01175 S3Api::ObjectInfo
01176 S3Api::
01177 tryGetObjectInfo(const std::string & bucket,
01178                  const std::string & object) const
01179 {
01180     StrPairVector queryParams;
01181     queryParams.push_back({"prefix", object});
01182 
01183     auto listingResult = get(bucket, "/", 8192, "", {}, queryParams);
01184     if (listingResult.code_ != 200) {
01185         cerr << listingResult.bodyXmlStr() << endl;
01186         throw ML::Exception("error getting object request: %d",
01187                             listingResult.code_);
01188     }
01189     auto listingResultXml = listingResult.bodyXml();
01190 
01191     auto foundObject
01192         = tinyxml2::XMLHandle(*listingResultXml)
01193         .FirstChildElement("ListBucketResult")
01194         .FirstChildElement("Contents")
01195         .ToElement();
01196 
01197     if (!foundObject)
01198         return ObjectInfo();
01199 
01200     ObjectInfo info(foundObject);
01201 
01202     if(info.key != object){
01203         return ObjectInfo();
01204     }
01205 
01206     return info;
01207 }
01208 
01209 S3Api::ObjectInfo
01210 S3Api::
01211 getObjectInfo(const std::string & uri) const
01212 {
01213     string bucket, object;
01214     std::tie(bucket, object) = parseUri(uri);
01215     return getObjectInfo(bucket, object);
01216 }
01217 
01218 S3Api::ObjectInfo
01219 S3Api::
01220 tryGetObjectInfo(const std::string & uri) const
01221 {
01222     string bucket, object;
01223     std::tie(bucket, object) = parseUri(uri);
01224     return tryGetObjectInfo(bucket, object);
01225 }
01226 
01227 void
01228 S3Api::
01229 download(const std::string & uri,
01230          const OnChunk & onChunk,
01231          ssize_t startOffset,
01232          ssize_t endOffset) const
01233 {
01234     string bucket, object;
01235     std::tie(bucket, object) = parseUri(uri);
01236     return download(bucket, object, onChunk, startOffset, endOffset);
01237 }
01238 
01239 void
01240 S3Api::
01241 download(const std::string & bucket,
01242          const std::string & object,
01243          const OnChunk & onChunk,
01244          ssize_t startOffset,
01245          ssize_t endOffset) const
01246 {
01247 
01248     ObjectInfo info = getObjectInfo(bucket, object);
01249     if(info.storageClass == "GLACIER"){
01250         throw ML::Exception("Cannot download [" + info.key + "] because its "
01251             "storage class is [GLACIER]");
01252     }
01253 
01254     size_t chunkSize = 128 * 1024 * 1024;  // 128MB probably good
01255 
01256     struct Part {
01257         uint64_t offset;
01258         uint64_t size;
01259     };
01260 
01261     if (endOffset == -1)
01262         endOffset = info.size;
01263 
01264     //cerr << "getting " << endOffset << " bytes" << endl;
01265 
01266     vector<Part> parts;
01267 
01268     for (uint64_t offset = 0;  offset < endOffset;  offset += chunkSize) {
01269         Part part;
01270         part.offset = offset;
01271         part.size = std::min<ssize_t>(endOffset - offset, chunkSize);
01272         parts.push_back(part);
01273     }
01274 
01275     //cerr << "getting in " << parts.size() << " parts" << endl;
01276 
01277     uint64_t bytesDone = 0;
01278     Date start;
01279     bool failed = false;
01280 
01281     auto doPart = [&] (int i)
01282         {
01283             if (failed) return;
01284 
01285             Part & part = parts[i];
01286             //cerr << "part " << i << " with " << part.size << " bytes" << endl;
01287 
01288             StrPairVector headerParams;
01289             headerParams.push_back({"range",
01290                         ML::format("bytes=%zd-%zd",
01291                                    part.offset,
01292                                    part.offset + part.size - 1)});
01293 
01294             auto partResult = get(bucket, "/" + object, part.size, "", headerParams, {});
01295             if (partResult.code_ != 206) {
01296                 cerr << "error getting part " << i << ": "
01297                      << partResult.bodyXmlStr() << endl;
01298                 failed = true;
01299                 return;
01300             }
01301 
01302             ExcAssertEqual(partResult.body_.size(), part.size);
01303 
01304             onChunk(partResult.body_.c_str(),
01305                     part.size,
01306                     i,
01307                     part.offset,
01308                     info.size);
01309 
01310             ML::atomic_add(bytesDone, part.size);
01311             double seconds = Date::now().secondsSince(start);
01312             cerr << "done " << bytesDone / 1024 / 1024 << " MB in "
01313             << seconds << " s at "
01314             << bytesDone / 1024.0 / 1024 / seconds
01315             << " MB/second" << endl;
01316         };
01317 
01318     int currentPart = 0;
01319 
01320     start = Date::now();
01321 
01322     auto doPartThread = [&] ()
01323         {
01324             for (;;) {
01325                 if (currentPart >= parts.size()) break;
01326                 int partToDo = __sync_fetch_and_add(&currentPart, 1);
01327                 if (partToDo >= parts.size()) break;
01328                 doPart(partToDo);
01329             }
01330         };
01331 
01332     boost::thread_group tg;
01333     for (unsigned i = 0;  i < 16;  ++i)
01334         tg.create_thread(doPartThread);
01335 
01336     tg.join_all();
01337 
01338     if (failed)
01339         throw ML::Exception("Failed to get part");
01340 }
01341 
01346 void
01347 S3Api::
01348 downloadToFile(const std::string & uri, const std::string & outfile,
01349         ssize_t endOffset) const
01350 {
01351 
01352     auto info = getObjectInfo(uri);
01353     if (!info){
01354         throw ML::Exception("unknown s3 object");
01355     }
01356     if(endOffset == -1 || endOffset > info.size){
01357         endOffset = info.size;
01358     }
01359 
01360     ofstream myFile;
01361     myFile.open(outfile.c_str());
01362 
01363     uint64_t done = 0;
01364 
01365     auto onChunk = [&] (const char * data,
01366                             size_t size,
01367                             int chunkIndex,
01368                             uint64_t offset,
01369                             uint64_t totalSize){
01370         ExcAssertEqual(info.size, totalSize);
01371         ExcAssertLessEqual(offset + size, totalSize);
01372         myFile.seekp(offset);
01373         myFile.write(data, size);
01374         ML::atomic_add(done, size);
01375     };
01376     download(uri, onChunk, 0, endOffset);
01377 }
01378 
01379 struct StreamingDownloadSource {
01380 
01381     StreamingDownloadSource(const S3Api * owner,
01382                             const std::string & bucket,
01383                             const std::string & object)
01384     {
01385         impl.reset(new Impl());
01386         impl->owner = owner;
01387         impl->bucket = bucket;
01388         impl->object = object;
01389         impl->info = owner->getObjectInfo(bucket, object);
01390         impl->chunkSize = 1024 * 1024;  // start with 1MB and ramp up
01391 
01392         impl->start();
01393     }
01394 
01395     typedef char char_type;
01396     struct category
01397         : //input_seekable,
01398         boost::iostreams::input,
01399         boost::iostreams::device_tag,
01400         boost::iostreams::closable_tag
01401     { };
01402 
01403     struct Impl {
01404         Impl()
01405             : owner(0), offset(0), shutdown(false),
01406               readPartOffset(0), readPartDone(1)
01407         {
01408         }
01409 
01410         ~Impl()
01411         {
01412             stop();
01413         }
01414 
01415         const S3Api * owner;
01416         S3Api::ObjectInfo info;
01417         std::string bucket;
01418         std::string object;
01419         size_t offset;
01420         size_t chunkSize;
01421         size_t bytesDone;
01422         bool shutdown;
01423         boost::thread_group tg;
01424 
01425         string readPart;
01426         size_t readPartOffset;
01427 
01428         Date startDate;
01429 
01430         size_t writeOffset, readOffset;
01431         int readPartReady, readPartDone, writePartNumber, allocPartNumber;
01432 
01433 
01434         void start()
01435         {
01436             readPartOffset = offset = bytesDone = writeOffset
01437                 = writePartNumber = allocPartNumber = readOffset = 0;
01438             readPartReady = 0;
01439             readPartDone = 0;
01440             startDate = Date::now();
01441             for (unsigned i = 0;  i < 5;  ++i)
01442                 tg.create_thread(boost::bind<void>(&Impl::runThread, this));
01443         }
01444 
01445         void stop()
01446         {
01447             shutdown = true;
01448             futex_wake(writePartNumber);
01449             futex_wake(readPartReady);
01450             futex_wake(readPartDone);
01451             tg.join_all();
01452         }
01453 
01454         std::streamsize read(char_type* s, std::streamsize n)
01455         {
01456             if (readOffset == info.size)
01457                 return -1;
01458 
01459             //Date start = Date::now();
01460 
01461 #if 0
01462             cerr << "read: readPartReady = " << readPartReady
01463                  << " readPartDone = " << readPartDone
01464                  << " writePartNumber = " << writePartNumber
01465                  << " allocPartNumber = " << allocPartNumber
01466                  << " readPartOffset = " << readPartOffset
01467                  << endl;
01468 #endif
01469 
01470             //cerr << "trying to read " << n << " characters at offset "
01471             //     << readPartOffset << " of "
01472             //     << readPart.size() << endl;
01473 
01474             while (readPartDone == readPartReady) {
01475                 //cerr << "waiting for part " << readPartDone << endl;
01476                 ML::futex_wait(readPartReady, readPartDone);
01477             }
01478 
01479             ExcAssertGreaterEqual(readPartReady, readPartDone);
01480 
01481             //cerr << "ready to start reading" << endl;
01482 
01483             //cerr << "trying to read " << n << " characters at offset "
01484             //     << readPartOffset << " of "
01485             //     << readPart.size() << endl;
01486 
01487             ExcAssertGreaterEqual(readPart.size(), readPartOffset);
01488 
01489             size_t toDo = std::min<size_t>(readPart.size() - readPartOffset,
01490                                            n);
01491 
01492             //cerr << "toDo = " << toDo << endl;
01493 
01494             std::copy(readPart.c_str() + readPartOffset,
01495                       readPart.c_str() + readPartOffset + toDo,
01496                       s);
01497 
01498             readPartOffset += toDo;
01499             readOffset += toDo;
01500             if (readPartOffset == readPart.size()) {
01501                 //cerr << "finished part " << readPartDone << endl;
01502                 ++readPartDone;
01503                 ML::futex_wake(readPartDone);
01504             }
01505 
01506             //Date end = Date::now();
01507             //double elapsed = end.secondsSince(start);
01508             //if (elapsed > 0.0001)
01509             //    cerr << "read elapsed " << elapsed << endl;
01510 
01511             return toDo;
01512         }
01513 
01514         void runThread()
01515         {
01516             // Maximum chunk size is what we can do in 30 seconds
01517             size_t maxChunkSize
01518                 = owner->bandwidthToServiceMbps
01519                 * 15.0 * 1000000;
01520 
01521             while (!shutdown) {
01522                 // Go in the lottery to see which part I need to download
01523                 int partToDo = __sync_fetch_and_add(&allocPartNumber, 1);
01524 
01525                 //cerr << "partToDo = " << partToDo << endl;
01526 
01527                 // Wait until it's my turn to increment the offset
01528                 while (!shutdown) {
01529                     int currentWritePart = writePartNumber;
01530                     if (currentWritePart == partToDo) break;
01531                     ML::futex_wait(writePartNumber, currentWritePart);
01532                 }
01533                 if (shutdown) return;
01534 
01535                 //cerr << "ready" << endl;
01536 
01537                 ExcAssertEqual(writePartNumber, partToDo);
01538 
01539                 // If we're done then get out
01540                 if (writeOffset >= info.size || shutdown) return;  // done
01541 
01542                 if (partToDo && partToDo % 2 == 0 && chunkSize < maxChunkSize)
01543                     chunkSize *= 2;
01544 
01545                 size_t start = writeOffset;
01546                 size_t end = std::min<size_t>(writeOffset + chunkSize,
01547                                               info.size);
01548 
01549                 writeOffset = end;
01550 
01551                 // Finished my turn to increment.  Wake up the next thread
01552                 ++writePartNumber;
01553                 futex_wake(writePartNumber);
01554 
01555                 // Download my part
01556                 S3Api::StrPairVector headerParams;
01557                 headerParams.push_back({"range",
01558                             ML::format("bytes=%zd-%zd",
01559                                        start, end - 1)});
01560 
01561                 //cerr << "downloading" << endl;
01562 
01563                 auto partResult
01564                     = owner->get(bucket, "/" + object, end - start,
01565                                  "", headerParams, {});
01566                 if (partResult.code_ != 206) {
01567                     cerr << "error getting part "
01568                          << partResult.bodyXmlStr() << endl;
01569                     return;
01570                 }
01571 
01572                 //cerr << "done downloading" << endl;
01573 
01574                 // Wait until the reader needs my part
01575                 while (!shutdown) {
01576                     int currentReadPart = readPartDone;
01577                     if (currentReadPart == partToDo) break;
01578                     ML::futex_wait(readPartDone, currentReadPart);
01579                 }
01580                 if (shutdown) return;
01581 
01582                 //cerr << "ready for part " << partToDo << endl;
01583 
01584                 bytesDone += partResult.body_.size();
01585 
01586                 double elapsed = Date::now().secondsSince(startDate);
01587 
01588                 // Give my part to the reader
01589                 readPart = partResult.body();
01590                 readPartOffset = 0;
01591 
01592                 // Wake up the reader
01593                 ++readPartReady;
01594                 ML::futex_wake(readPartReady);
01595 
01596                 cerr << "done " << bytesDone << " at "
01597                      << bytesDone / elapsed / 1024 / 1024
01598                      << "MB/second" << endl;
01599             }
01600             cerr << "finished thread" << endl;
01601         }
01602     };
01603 
01604     std::shared_ptr<Impl> impl;
01605 
01606     std::streamsize read(char_type* s, std::streamsize n)
01607     {
01608         return impl->read(s, n);
01609     }
01610 
01611     bool is_open() const
01612     {
01613         return !!impl;
01614     }
01615 
01616     void close()
01617     {
01618         impl.reset();
01619     }
01620 };
01621 
01622 std::auto_ptr<std::streambuf>
01623 S3Api::
01624 streamingDownload(const std::string & bucket,
01625                   const std::string & object,
01626                   ssize_t startOffset,
01627                   ssize_t endOffset,
01628                   const OnChunk & onChunk) const
01629 {
01630     std::auto_ptr<std::streambuf> result;
01631     result.reset(new boost::iostreams::stream_buffer<StreamingDownloadSource>
01632                  (StreamingDownloadSource(this, bucket, object),
01633                   131072));
01634     return result;
01635 }
01636 
01637 struct StreamingUploadSource {
01638 
01639     StreamingUploadSource(const S3Api * owner,
01640                           const std::string & bucket,
01641                           const std::string & object,
01642                           const S3Api::ObjectMetadata & metadata)
01643     {
01644         impl.reset(new Impl());
01645         impl->owner = owner;
01646         impl->bucket = bucket;
01647         impl->object = object;
01648         impl->metadata = metadata;
01649         impl->chunkSize = 8 * 1024 * 1024;  // start with 8MB and ramp up
01650 
01651         impl->start();
01652     }
01653 
01654     typedef char char_type;
01655     struct category
01656         : public boost::iostreams::output,
01657                         boost::iostreams::device_tag,
01658                         boost::iostreams::closable_tag
01659     {
01660     };
01661 
01662     struct Impl {
01663         Impl()
01664             : owner(0), offset(0), chunkIndex(0), shutdown(false),
01665               chunks(16)
01666         {
01667         }
01668 
01669         ~Impl()
01670         {
01671             //cerr << "destroying streaming upload at " << object << endl;
01672             stop();
01673         }
01674 
01675         const S3Api * owner;
01676         std::string bucket;
01677         std::string object;
01678         S3Api::ObjectMetadata metadata;
01679         std::string uploadId;
01680         size_t offset;
01681         size_t chunkSize;
01682         size_t chunkIndex;
01683         bool shutdown;
01684         boost::thread_group tg;
01685 
01686         Date startDate;
01687 
01688         struct Chunk {
01689 
01690             void init(uint64_t offset, size_t capacity, int index)
01691             {
01692                 this->offset = offset;
01693                 this->size = 0;
01694                 this->capacity = capacity;
01695                 this->index = index;
01696                 this->data = new char[capacity];
01697             }
01698 
01699             size_t append(const char * input, size_t n)
01700             {
01701                 size_t todo = std::min(n, capacity - size);
01702                 std::copy(input, input + todo, data + size);
01703                 size += todo;
01704                 return todo;
01705             }
01706 
01707             char * data;
01708             size_t size;
01709             size_t capacity;
01710             int index;
01711             uint64_t offset;
01712         };
01713 
01714         Chunk current;
01715 
01716         RingBufferSWMR<Chunk> chunks;
01717 
01718         boost::mutex etagsLock;
01719         std::vector<std::string> etags;
01720         std::exception_ptr exc;
01721 
01722         void start()
01723         {
01724             auto upload = owner->obtainMultiPartUpload(bucket, "/" + object, metadata);
01725 
01726             uploadId = upload.id;
01727             //cerr << "uploadId = " << uploadId << endl;
01728 
01729             startDate = Date::now();
01730             for (unsigned i = 0;  i < 8;  ++i)
01731                 tg.create_thread(boost::bind<void>(&Impl::runThread, this));
01732             current.init(0, chunkSize, 0);
01733         }
01734 
01735         void stop()
01736         {
01737             shutdown = true;
01738             tg.join_all();
01739         }
01740 
01741         std::streamsize write(const char_type* s, std::streamsize n)
01742         {
01743             if (exc)
01744                 std::rethrow_exception(exc);
01745 
01746             size_t done = current.append(s, n);
01747             offset += done;
01748             if (done < n) {
01749                 flush();
01750                 done = current.append(s + done, n - done);
01751             }
01752 
01753             //cerr << "writing " << n << " characters returned "
01754             //     << done << endl;
01755 
01756             if (exc)
01757                 std::rethrow_exception(exc);
01758 
01759             return done;
01760         }
01761 
01762         void flush()
01763         {
01764             if (current.size == 0) return;
01765             chunks.push(current);
01766             ++chunkIndex;
01767 
01768             // Get bigger for bigger files
01769             if (chunkIndex % 5 == 0 && chunkSize < 64 * 1024 * 1024)
01770                 chunkSize *= 2;
01771 
01772             current.init(offset, chunkSize, chunkIndex);
01773         }
01774 
01775         void finish()
01776         {
01777             if (exc)
01778                 std::rethrow_exception(exc);
01779             //cerr << "pushing last chunk " << chunkIndex << endl;
01780             flush();
01781             //cerr << "waiting for everything to stop" << endl;
01782             chunks.waitUntilEmpty();
01783             //cerr << "empty" << endl;
01784             stop();
01785             //cerr << "stopped" << endl;
01786             string etag = owner->finishMultiPartUpload(bucket, "/" + object,
01787                                                        uploadId,
01788                                                        etags);
01789             //cerr << "final etag is " << etag << endl;
01790 
01791             if (exc)
01792                 std::rethrow_exception(exc);
01793 
01794             double elapsed = Date::now().secondsSince(startDate);
01795 
01796             cerr << "uploaded " << offset / 1024.0 / 1024.0
01797                  << "MB in " << elapsed << "s at "
01798                  << offset / 1024.0 / 1024.0 / elapsed
01799                  << "MB/s" << " to " << etag << endl;
01800         }
01801 
01802         void runThread()
01803         {
01804             while (!shutdown) {
01805                 Chunk chunk;
01806                 if (chunks.tryPop(chunk, 0.01)) {
01807                     if (exc)
01808                         return;
01809                     try {
01810                         //cerr << "got chunk " << chunk.index
01811                         //     << " with " << chunk.size << " bytes at index "
01812                         //     << chunk.index << endl;
01813 
01814                         // Upload the data
01815                         string md5 = md5HashToHex(chunk.data, chunk.size);
01816 
01817                         auto putResult = owner->put(bucket, "/" + object,
01818                                                     ML::format("partNumber=%d&uploadId=%s",
01819                                                                chunk.index + 1, uploadId),
01820                                                     {}, {},
01821                                                     S3Api::Content(chunk.data,
01822                                                                    chunk.size,
01823                                                                    md5));
01824                         if (putResult.code_ != 200) {
01825                             cerr << putResult.bodyXmlStr() << endl;
01826 
01827                             throw ML::Exception("put didn't work: %d", (int)putResult.code_);
01828                         }
01829                         string etag = putResult.getHeader("etag");
01830                         //cerr << "successfully uploaded part " << chunk.index
01831                         //     << " with etag " << etag << endl;
01832 
01833                         boost::unique_lock<boost::mutex> guard(etagsLock);
01834                         while (etags.size() <= chunk.index)
01835                             etags.push_back("");
01836                         etags[chunk.index] = etag;
01837                     } catch (...) {
01838                         // Capture exception to be thrown later
01839                         exc = std::current_exception();
01840                     }
01841                 }
01842             }
01843         }
01844     };
01845 
01846     std::shared_ptr<Impl> impl;
01847 
01848     std::streamsize write(const char_type* s, std::streamsize n)
01849     {
01850         return impl->write(s, n);
01851     }
01852 
01853     bool is_open() const
01854     {
01855         return !!impl;
01856     }
01857 
01858     void close()
01859     {
01860         impl->finish();
01861         impl.reset();
01862     }
01863 };
01864 
01865 std::auto_ptr<std::streambuf>
01866 S3Api::
01867 streamingUpload(const std::string & uri,
01868                 const ObjectMetadata & metadata) const
01869 {
01870     string bucket, object;
01871     std::tie(bucket, object) = parseUri(uri);
01872     return streamingUpload(bucket, object, metadata);
01873 }
01874 
01875 std::auto_ptr<std::streambuf>
01876 S3Api::
01877 streamingUpload(const std::string & bucket,
01878                 const std::string & object,
01879                 const ObjectMetadata & metadata) const
01880 {
01881     std::auto_ptr<std::streambuf> result;
01882     result.reset(new boost::iostreams::stream_buffer<StreamingUploadSource>
01883                  (StreamingUploadSource(this, bucket, object, metadata),
01884                   131072));
01885     return result;
01886 }
01887 
01888 std::pair<std::string, std::string>
01889 S3Api::
01890 parseUri(const std::string & uri)
01891 {
01892     if (uri.find("s3://") != 0)
01893         throw ML::Exception("wrong scheme (should start with s3://)");
01894     string pathPart(uri, 5);
01895     string::size_type pos = pathPart.find('/');
01896     if (pos == string::npos)
01897         throw ML::Exception("couldn't find bucket name");
01898     string bucket(pathPart, 0, pos);
01899     string object(pathPart, pos + 1);
01900 
01901     return make_pair(bucket, object);
01902 }
01903 
01904 std::auto_ptr<std::streambuf>
01905 S3Api::
01906 streamingDownload(const std::string & uri,
01907                   ssize_t startOffset,
01908                   ssize_t endOffset,
01909                   const OnChunk & onChunk) const
01910 {
01911     string bucket, object;
01912     std::tie(bucket, object) = parseUri(uri);
01913 
01914     //cerr << "bucket = " << bucket << " object = " << object << endl;
01915 
01916     return streamingDownload(bucket, object, startOffset, endOffset, onChunk);
01917 }
01918 
01919 void
01920 S3Handle::
01921 initS3(const std::string & accessKeyId,
01922        const std::string & accessKey,
01923        const std::string & uriPrefix)
01924 {
01925     s3.init(accessKeyId, accessKey);
01926     this->s3UriPrefix = uriPrefix;
01927 }
01928 
01929 size_t
01930 S3Handle::
01931 getS3Buffer(const std::string & filename, char** outBuffer){
01932     auto stats = s3.getObjectInfo(filename);
01933     if (!stats)
01934         throw ML::Exception("unknown s3 object");
01935 
01936     *outBuffer = new char[stats.size];
01937 
01938     uint64_t done = 0;
01939 
01940     auto onChunk = [&] (const char * data,
01941                         size_t size,
01942                         int chunkIndex,
01943                         uint64_t offset,
01944                         uint64_t totalSize)
01945         {
01946             ExcAssertEqual(stats.size, totalSize);
01947             ExcAssertLessEqual(offset + size, totalSize);
01948             std::copy(data, data + size, *outBuffer + offset);
01949             ML::atomic_add(done, size);
01950         };
01951 
01952     s3.download(filename, onChunk);
01953 
01954     ExcAssertEqual(done, stats.size);
01955 
01956     cerr << "done downloading " << stats.size << " bytes from "
01957          << filename << endl;
01958 
01959     return stats.size;
01960 
01961 }
01962 
01963 bool
01964 S3Api::
01965 forEachBucket(const OnBucket & onBucket) const
01966 {
01967     using namespace tinyxml2;
01968 
01969     //cerr << "forEachObject under " << prefix << endl;
01970 
01971     auto listingResult = get("", "/", 8192, "");
01972     auto listingResultXml = listingResult.bodyXml();
01973 
01974     //listingResultXml->Print();
01975 
01976     auto foundBucket
01977         = XMLHandle(*listingResultXml)
01978         .FirstChildElement("ListAllMyBucketsResult")
01979         .FirstChildElement("Buckets")
01980         .FirstChildElement("Bucket")
01981         .ToElement();
01982 
01983     for (; onBucket && foundBucket;
01984          foundBucket = foundBucket->NextSiblingElement("Bucket")) {
01985 
01986         string foundName
01987             = extract<string>(foundBucket, "Name");
01988         if (!onBucket(foundName))
01989             return false;
01990     }
01991 
01992     return true;
01993 }
01994 
01995 void
01996 S3Api::
01997 uploadRecursive(string dirSrc, string bucketDest, bool includeDir){
01998     using namespace boost::filesystem;
01999     path targetDir(dirSrc);
02000     if(!is_directory(targetDir)){
02001         throw ML::Exception("%s is not a directory", dirSrc.c_str());
02002     }
02003     recursive_directory_iterator it(targetDir), itEnd;
02004     int toTrim = includeDir ? 0 : dirSrc.length() + 1;
02005     for(; it != itEnd; it ++){
02006         if(!is_directory(*it)){
02007             string path = it->path().string();
02008             ML::File_Read_Buffer frb(path);
02009             size_t size = file_size(path);
02010             if(toTrim){
02011                 path = path.substr(toTrim);
02012             }
02013             upload(frb.start(), size, "s3://" + bucketDest + "/" + path);
02014         }
02015     }
02016 }
02017 
02018 void S3Api::setDefaultBandwidthToServiceMbps(double mbps){
02019     S3Api::defaultBandwidthToServiceMbps = mbps;
02020 }
02021 
namespace {

/** Entry of the file-local bucket registry. */
struct S3BucketInfo {
    std::string s3Bucket;        //< Name of the bucket
    std::shared_ptr<S3Api> api;  //< Api object used to access this bucket
};

// Taken by every function in this file that reads or writes s3Buckets
std::mutex s3BucketsLock;
// Registered buckets, keyed by bucket name
std::unordered_map<std::string, S3BucketInfo> s3Buckets;

} // file scope
02033 
02038 void registerS3Bucket(const std::string & bucketName,
02039                       const std::string & accessKeyId,
02040                       const std::string & accessKey,
02041                       double bandwidthToServiceMbps,
02042                       const std::string & protocol,
02043                       const std::string & serviceUri)
02044 {
02045     std::unique_lock<std::mutex> guard(s3BucketsLock);
02046     if (s3Buckets.count(bucketName)){
02047         throw BucketAlreadyRegistered(bucketName);
02048     }
02049 
02050     S3BucketInfo info;
02051     info.s3Bucket = bucketName;
02052     info.api = std::make_shared<S3Api>(accessKeyId, accessKey,
02053                                        bandwidthToServiceMbps,
02054                                        protocol, serviceUri);
02055 
02056     s3Buckets[bucketName] = info;
02057 }
02058 
02059 struct RegisterS3Handler {
02060     static std::pair<std::streambuf *, bool>
02061     getS3Handler(const std::string & scheme,
02062                  const std::string & resource,
02063                  std::ios_base::open_mode mode)
02064     {
02065         string::size_type pos = resource.find('/');
02066         if (pos == string::npos)
02067             throw ML::Exception("unable to find s3 bucket name in resource "
02068                                 + resource);
02069         string bucket(resource, 0, pos);
02070 
02071         std::shared_ptr<S3Api> api;
02072 
02073         {
02074             std::unique_lock<std::mutex> guard(s3BucketsLock);
02075             auto it = s3Buckets.find(bucket);
02076             if (it == s3Buckets.end())
02077                 throw ML::Exception("unregistered s3 bucket " + bucket);
02078             api = it->second.api;
02079         }
02080 
02081         ExcAssert(api);
02082 
02083         if (mode == ios::in) {
02084             return make_pair(api->streamingDownload("s3://" + resource)
02085                              .release(),
02086                              true);
02087         }
02088         else if (mode == ios::out) {
02089             return make_pair(api->streamingUpload("s3://" + resource)
02090                              .release(),
02091                              true);
02092         }
02093         else throw ML::Exception("no way to create s3 handler for non in/out");
02094     }
02095 
02096     RegisterS3Handler()
02097     {
02098         ML::registerUriHandler("s3", getS3Handler);
02099     }
02100 
02101 } registerS3Handler;
02102 
02103 void registerS3Buckets(const std::string & accessKeyId,
02104                        const std::string & accessKey,
02105                        double bandwidthToServiceMbps,
02106                        const std::string & protocol,
02107                        const std::string & serviceUri)
02108 {
02109     std::unique_lock<std::mutex> guard(s3BucketsLock);
02110 
02111     auto api = std::make_shared<S3Api>(accessKeyId, accessKey,
02112                                        bandwidthToServiceMbps,
02113                                        protocol, serviceUri);
02114 
02115     auto onBucket = [&] (const std::string & bucketName)
02116         {
02117             //cerr << "got bucket " << bucketName << endl;
02118 
02119             if (s3Buckets.count(bucketName)){
02120                 throw BucketAlreadyRegistered(bucketName);
02121             }
02122 
02123             S3BucketInfo info;
02124             info.s3Bucket = bucketName;
02125             info.api = api;
02126             s3Buckets[bucketName] = info;
02127 
02128             return true;
02129         };
02130 
02131     api->forEachBucket(onBucket);
02132 }
02133 
02134 std::shared_ptr<S3Api> getS3ApiForBucket(const std::string & bucketName)
02135 {
02136     std::unique_lock<std::mutex> guard(s3BucketsLock);
02137     auto it = s3Buckets.find(bucketName);
02138     if (it == s3Buckets.end())
02139         throw ML::Exception("unregistered s3 bucket " + bucketName);
02140     return it->second.api;
02141 }
02142 
02143 // Return an URI for either a file or an s3 object
02144 size_t getUriSize(const std::string & filename)
02145 {
02146     if (filename.find("s3://") == 0) {
02147         string bucket = S3Api::parseUri(filename).first;
02148         auto api = getS3ApiForBucket(bucket);
02149         return api->getObjectInfo(filename).size;
02150     }
02151     else {
02152         struct stat stats;
02153         int res = stat(filename.c_str(), &stats);
02154         if (res == -1)
02155             throw ML::Exception("error getting stats file");
02156         return stats.st_size;
02157     }
02158 }
02159 
02160 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator