RTBKit 0.9
Open-source framework to create real-time ad bidding systems.
00001 00008 #include "soa/service/s3.h" 00009 #include "jml/utils/string_functions.h" 00010 #include "soa/types/date.h" 00011 #include "soa/types/url.h" 00012 #include "jml/arch/futex.h" 00013 #include "jml/utils/exc_assert.h" 00014 #include "jml/utils/pair_utils.h" 00015 #include "jml/utils/vector_utils.h" 00016 #include "jml/utils/filter_streams.h" 00017 #include "jml/arch/timers.h" 00018 #include "jml/utils/ring_buffer.h" 00019 #include "jml/utils/hash.h" 00020 #include "jml/utils/file_functions.h" 00021 00022 #define CRYPTOPP_ENABLE_NAMESPACE_WEAK 1 00023 #include "crypto++/sha.h" 00024 #include "crypto++/md5.h" 00025 #include "crypto++/hmac.h" 00026 #include "crypto++/base64.h" 00027 00028 #include <curlpp/cURLpp.hpp> 00029 #include <curlpp/Easy.hpp> 00030 #include <curlpp/Options.hpp> 00031 #include <curlpp/Info.hpp> 00032 #include <curlpp/Infos.hpp> 00033 00034 #include <boost/iostreams/stream_buffer.hpp> 00035 #include <exception> 00036 #include <thread> 00037 #include <unordered_map> 00038 00039 #include <boost/filesystem.hpp> 00040 00041 00042 using namespace std; 00043 using namespace ML; 00044 00045 namespace Datacratic { 00046 00047 double 00048 S3Api:: 00049 defaultBandwidthToServiceMbps = 20.0; 00050 00051 S3Api:: 00052 S3Api() 00053 { 00054 bandwidthToServiceMbps = defaultBandwidthToServiceMbps; 00055 } 00056 00057 S3Api:: 00058 S3Api(const std::string & accessKeyId, 00059 const std::string & accessKey, 00060 double bandwidthToServiceMbps, 00061 const std::string & defaultProtocol, 00062 const std::string & serviceUri) 00063 : accessKeyId(accessKeyId), 00064 accessKey(accessKey), 00065 defaultProtocol(defaultProtocol), 00066 serviceUri(serviceUri), 00067 bandwidthToServiceMbps(bandwidthToServiceMbps) 00068 { 00069 } 00070 00071 void 00072 S3Api:: 00073 init(const std::string & accessKeyId, 00074 const std::string & accessKey, 00075 double bandwidthToServiceMbps, 00076 const std::string & defaultProtocol, 00077 const std::string & serviceUri) 00078 { 
00079 this->accessKeyId = accessKeyId; 00080 this->accessKey = accessKey; 00081 this->defaultProtocol = defaultProtocol; 00082 this->serviceUri = serviceUri; 00083 this->bandwidthToServiceMbps = bandwidthToServiceMbps; 00084 } 00085 00086 S3Api::Content:: 00087 Content(const tinyxml2::XMLDocument & xml) 00088 { 00089 tinyxml2::XMLPrinter printer; 00090 const_cast<tinyxml2::XMLDocument &>(xml).Print(&printer); 00091 this->contentType = "application/xml"; 00092 this->str = printer.CStr(); 00093 this->hasContent = true; 00094 this->data = str.c_str(); 00095 this->size = str.length(); 00096 } 00097 00098 std::string 00099 S3Api:: 00100 getDigestMulti(const std::string & verb, 00101 const std::string & bucket, 00102 const std::string & resource, 00103 const std::string & subResource, 00104 const std::string & contentType, 00105 const std::string & contentMd5, 00106 const std::string & date, 00107 const std::vector<std::pair<std::string, std::string> > & headers) 00108 { 00109 map<string, string> canonHeaders; 00110 for (auto it = headers.begin(), end = headers.end(); 00111 it != end; ++it) { 00112 string key = lowercase(it->first); 00113 if (key.find("x-amz") != 0) continue; 00114 00115 string value = it->second; 00116 if (canonHeaders.count(key)) 00117 canonHeaders[key] += ","; 00118 canonHeaders[key] += value; 00119 } 00120 00121 return getDigest(verb, bucket, resource, subResource, 00122 contentType, contentMd5, date, canonHeaders); 00123 00124 } 00125 00126 /* 00127 Authorization = "AWS" + " " + AWSAccessKeyId + ":" + Signature; 00128 00129 Signature = Base64( HMAC-SHA1( YourSecretAccessKeyID, UTF-8-Encoding-Of( StringToSign ) ) ); 00130 00131 StringToSign = HTTP-Verb + "\n" + 00132 Content-MD5 + "\n" + 00133 Content-Type + "\n" + 00134 Date + "\n" + 00135 CanonicalizedAmzHeaders + 00136 CanonicalizedResource; 00137 00138 CanonicalizedResource = [ "/" + Bucket ] + 00139 <HTTP-Request-URI, from the protocol name up to the query string> + 00140 [ sub-resource, if 
present. For example "?acl", "?location", "?logging", or "?torrent"]; 00141 00142 CanonicalizedAmzHeaders = <described below> 00143 To construct the CanonicalizedAmzHeaders part of StringToSign, select all HTTP request headers that start with 'x-amz-' (using a case-insensitive comparison) and use the following process. 00144 00145 CanonicalizedAmzHeaders Process 00146 1 Convert each HTTP header name to lower-case. For example, 'X-Amz-Date' becomes 'x-amz-date'. 00147 2 Sort the collection of headers lexicographically by header name. 00148 3 Combine header fields with the same name into one "header-name:comma-separated-value-list" pair as prescribed by RFC 2616, section 4.2, without any white-space between values. For example, the two metadata headers 'x-amz-meta-username: fred' and 'x-amz-meta-username: barney' would be combined into the single header 'x-amz-meta-username: fred,barney'. 00149 4 "Unfold" long headers that span multiple lines (as allowed by RFC 2616, section 4.2) by replacing the folding white-space (including new-line) by a single space. 00150 5 Trim any white-space around the colon in the header. For example, the header 'x-amz-meta-username: fred,barney' would become 'x-amz-meta-username:fred,barney' 00151 6 Finally, append a new-line (U+000A) to each canonicalized header in the resulting list. Construct the CanonicalizedResource element by concatenating all headers in this list into a single string. 
00152 00153 00154 */ 00155 std::string 00156 S3Api:: 00157 getDigest(const std::string & verb, 00158 const std::string & bucket, 00159 const std::string & resource, 00160 const std::string & subResource, 00161 const std::string & contentType, 00162 const std::string & contentMd5, 00163 const std::string & date, 00164 const std::map<std::string, std::string> & headers) 00165 { 00166 string canonHeaderString; 00167 00168 for (auto it = headers.begin(), end = headers.end(); 00169 it != end; ++it) { 00170 string key = lowercase(it->first); 00171 if (key.find("x-amz") != 0) continue; 00172 00173 string value = it->second; 00174 00175 canonHeaderString += key + ":" + value + "\n"; 00176 } 00177 00178 //cerr << "bucket = " << bucket << " resource = " << resource << endl; 00179 00180 string canonResource 00181 = (bucket == "" ? "" : "/" + bucket) 00182 + resource 00183 + (subResource.empty() ? "" : "?") 00184 + subResource; 00185 00186 string stringToSign 00187 = verb + "\n" 00188 + contentMd5 + "\n" 00189 + contentType + "\n" 00190 + date + "\n" 00191 + canonHeaderString 00192 + canonResource; 00193 00194 return stringToSign; 00195 } 00196 00197 std::string 00198 S3Api:: 00199 sign(const std::string & stringToSign, 00200 const std::string & accessKey) 00201 { 00202 typedef CryptoPP::SHA1 Hash; 00203 00204 size_t digestLen = Hash::DIGESTSIZE; 00205 byte digest[digestLen]; 00206 CryptoPP::HMAC<Hash> hmac((byte *)accessKey.c_str(), accessKey.length()); 00207 hmac.CalculateDigest(digest, 00208 (byte *)stringToSign.c_str(), 00209 stringToSign.length()); 00210 00211 // base64 00212 char outBuf[256]; 00213 00214 CryptoPP::Base64Encoder baseEncoder; 00215 baseEncoder.Put(digest, digestLen); 00216 baseEncoder.MessageEnd(); 00217 size_t got = baseEncoder.Get((byte *)outBuf, 256); 00218 outBuf[got] = 0; 00219 00220 //cerr << "got " << got << " characters" << endl; 00221 00222 string base64digest(outBuf, outBuf + got - 1); 00223 00224 //cerr << "base64digest.size() = " << 
base64digest.size() << endl; 00225 00226 return base64digest; 00227 } 00228 00229 S3Api::Response 00230 S3Api::SignedRequest:: 00231 performSync() const 00232 { 00233 int numRetries = 7; 00234 00235 for (unsigned i = 0; i < numRetries; ++i) { 00236 string responseHeaders; 00237 string body; 00238 00239 try { 00240 responseHeaders.clear(); 00241 body.clear(); 00242 00243 curlpp::Easy myRequest; 00244 00245 using namespace curlpp::options; 00246 using namespace curlpp::infos; 00247 00248 list<string> curlHeaders; 00249 for (unsigned i = 0; i < params.headers.size(); ++i) { 00250 curlHeaders.push_back(params.headers[i].first + ": " 00251 + params.headers[i].second); 00252 } 00253 00254 curlHeaders.push_back("Date: " + params.date); 00255 curlHeaders.push_back("Authorization: " + auth); 00256 00257 //cerr << "getting " << uri << " " << params.headers << endl; 00258 00259 uint64_t totalBytesToTransfer = params.expectedBytesToDownload 00260 + params.content.size; 00261 double expectedTimeSeconds 00262 = totalBytesToTransfer 00263 / 1000000.0 00264 / bandwidthToServiceMbps; 00265 int timeout = 15 + std::max<int>(30, expectedTimeSeconds * 3); 00266 00267 #if 0 00268 cerr << "totalBytesToTransfer = " << totalBytesToTransfer << endl; 00269 cerr << "expectedTimeSeconds = " << expectedTimeSeconds << endl; 00270 cerr << "timeout = " << timeout << endl; 00271 #endif 00272 00273 #if 0 00274 if (params.verb == "GET") ; 00275 else if (params.verb == "POST") { 00276 //myRequest.setOpt<Post>(true); 00277 } 00278 else if (params.verb == "PUT") { 00279 myRequest.setOpt<Post>(true); 00280 } 00281 else throw ML::Exception("unknown verb " + params.verb); 00282 #endif 00283 //cerr << "!!!Setting params verb " << params.verb << endl; 00284 myRequest.setOpt<CustomRequest>(params.verb); 00285 00286 myRequest.setOpt<curlpp::options::Url>(uri); 00287 //myRequest.setOpt<Verbose>(true); 00288 myRequest.setOpt<ErrorBuffer>((char *)0); 00289 myRequest.setOpt<Timeout>(timeout); 00290 
myRequest.setOpt<NoSignal>(1); 00291 00292 auto onData = [&] (char * data, size_t ofs1, size_t ofs2) -> size_t 00293 { 00294 //cerr << "called onData for " << ofs1 << " " << ofs2 << endl; 00295 return 0; 00296 }; 00297 00298 auto onWriteData = [&] (char * data, size_t ofs1, size_t ofs2) -> size_t 00299 { 00300 body.append(data, ofs1 * ofs2); 00301 return ofs1 * ofs2; 00302 //cerr << "called onWrite for " << ofs1 << " " << ofs2 << endl; 00303 return 0; 00304 }; 00305 00306 auto onProgress = [&] (double p1, double p2, double p3, double p4) -> int 00307 { 00308 cerr << "progress " << p1 << " " << p2 << " " << p3 << " " 00309 << p4 << endl; 00310 return 0; 00311 }; 00312 00313 bool afterContinue = false; 00314 00315 auto onHeader = [&] (char * data, size_t ofs1, size_t ofs2) -> size_t 00316 { 00317 string headerLine(data, ofs1 * ofs2); 00318 if (headerLine.find("HTTP/1.1 100 Continue") == 0) { 00319 afterContinue = true; 00320 } 00321 else if (afterContinue) { 00322 if (headerLine == "\r\n") 00323 afterContinue = false; 00324 } 00325 else { 00326 responseHeaders.append(headerLine); 00327 //cerr << "got header data " << headerLine << endl; 00328 } 00329 return ofs1 * ofs2; 00330 }; 00331 00332 myRequest.setOpt<BoostHeaderFunction>(onHeader); 00333 myRequest.setOpt<BoostWriteFunction>(onWriteData); 00334 myRequest.setOpt<BoostProgressFunction>(onProgress); 00335 //myRequest.setOpt<Header>(true); 00336 if (params.content.data) { 00337 string s(params.content.data, params.content.size); 00338 myRequest.setOpt<PostFields>(s); 00339 } 00340 else 00341 { 00342 string s; 00343 myRequest.setOpt<PostFields>(s); 00344 } 00345 myRequest.setOpt<PostFieldSize>(params.content.size); 00346 curlHeaders.push_back(ML::format("Content-Length: %lld", 00347 params.content.size)); 00348 curlHeaders.push_back("Transfer-Encoding:"); 00349 curlHeaders.push_back("Content-Type:"); 00350 myRequest.setOpt<curlpp::options::HttpHeader>(curlHeaders); 00351 00352 myRequest.perform(); 00353 00354 
Response response; 00355 response.body_ = body; 00356 00357 curlpp::InfoGetter::get(myRequest, CURLINFO_RESPONSE_CODE, 00358 response.code_); 00359 00360 if (response.code_ == 500) { 00361 // Internal server error 00362 // Wait 10 seconds and retry 00363 cerr << "Service returned 500: " << endl; 00364 cerr << "uri is " << uri << endl; 00365 cerr << "response headers " << responseHeaders << endl; 00366 cerr << "body is " << body << endl; 00367 00368 ML::sleep(10); 00369 continue; // retry 00370 } 00371 00372 double bytesUploaded; 00373 00374 curlpp::InfoGetter::get(myRequest, CURLINFO_SIZE_UPLOAD, 00375 bytesUploaded); 00376 00377 //cerr << "uploaded " << bytesUploaded << " bytes" << endl; 00378 00379 response.header_.parse(responseHeaders); 00380 00381 return response; 00382 } catch (const curlpp::LibcurlRuntimeError & exc) { 00383 cerr << "libCurl returned an error with code " << exc.whatCode() 00384 << endl; 00385 cerr << "error is " << curl_easy_strerror(exc.whatCode()) 00386 << endl; 00387 cerr << "uri is " << uri << endl; 00388 cerr << "headers are " << responseHeaders << endl; 00389 cerr << "body contains " << body.size() << " bytes" << endl; 00390 00391 if (i < 2) 00392 cerr << "retrying" << endl; 00393 else throw; 00394 } 00395 } 00396 00397 throw ML::Exception("logic error"); 00398 } 00399 00400 std::string 00401 S3Api:: 00402 signature(const RequestParams & request) const 00403 { 00404 string digest 00405 = S3Api::getDigestMulti(request.verb, 00406 request.bucket, 00407 request.resource, request.subResource, 00408 request.contentType, request.contentMd5, 00409 request.date, request.headers); 00410 00411 //cerr << "digest = " << digest << endl; 00412 00413 return S3Api::sign(digest, accessKey); 00414 } 00415 00416 inline std::string uriEncode(const std::string & str) 00417 { 00418 return str; 00419 } 00420 00421 S3Api::SignedRequest 00422 S3Api:: 00423 prepare(const RequestParams & request) const 00424 { 00425 string protocol = defaultProtocol; 00426 
if(protocol.length() == 0){ 00427 throw ML::Exception("attempt to perform s3 request without a " 00428 "default protocol. (Could be caused by S3Api initialisation with " 00429 "the empty constructor.)"); 00430 } 00431 00432 SignedRequest result; 00433 result.params = request; 00434 result.bandwidthToServiceMbps = bandwidthToServiceMbps; 00435 00436 if (request.resource.find("//") != string::npos) 00437 throw ML::Exception("attempt to perform s3 request with double slash: " 00438 + request.resource); 00439 00440 if (request.bucket.empty()) { 00441 result.uri = protocol + "://" + serviceUri 00442 + request.resource 00443 + (request.subResource != "" ? "?" + request.subResource : ""); 00444 } 00445 else { 00446 result.uri = protocol + "://" + request.bucket + "." + serviceUri 00447 + request.resource 00448 + (request.subResource != "" ? "?" + request.subResource : ""); 00449 } 00450 00451 for (unsigned i = 0; i < request.queryParams.size(); ++i) { 00452 if (i == 0 && request.subResource == "") 00453 result.uri += "?"; 00454 else result.uri += "&"; 00455 result.uri += uriEncode(request.queryParams[i].first) 00456 + "=" + uriEncode(request.queryParams[i].second); 00457 } 00458 00459 string sig = signature(request); 00460 result.auth = "AWS " + accessKeyId + ":" + sig; 00461 00462 //cerr << "result.uri = " << result.uri << endl; 00463 //cerr << "result.auth = " << result.auth << endl; 00464 00465 return result; 00466 } 00467 00468 S3Api::Response 00469 S3Api:: 00470 get(const std::string & bucket, 00471 const std::string & resource, 00472 uint64_t expectedBytesToDownload, 00473 const std::string & subResource, 00474 const StrPairVector & headers, 00475 const StrPairVector & queryParams) const 00476 { 00477 RequestParams request; 00478 request.verb = "GET"; 00479 request.bucket = bucket; 00480 request.resource = resource; 00481 request.subResource = subResource; 00482 request.headers = headers; 00483 request.queryParams = queryParams; 00484 request.date = 
Date::now().printRfc2616(); 00485 request.expectedBytesToDownload = expectedBytesToDownload; 00486 00487 return prepare(request).performSync(); 00488 } 00489 00491 S3Api::Response 00492 S3Api:: 00493 post(const std::string & bucket, 00494 const std::string & resource, 00495 const std::string & subResource, 00496 const StrPairVector & headers, 00497 const StrPairVector & queryParams, 00498 const Content & content) const 00499 { 00500 RequestParams request; 00501 request.verb = "POST"; 00502 request.bucket = bucket; 00503 request.resource = resource; 00504 request.subResource = subResource; 00505 request.headers = headers; 00506 request.queryParams = queryParams; 00507 request.date = Date::now().printRfc2616(); 00508 request.content = content; 00509 00510 return prepare(request).performSync(); 00511 } 00512 00513 S3Api::Response 00514 S3Api:: 00515 put(const std::string & bucket, 00516 const std::string & resource, 00517 const std::string & subResource, 00518 const StrPairVector & headers, 00519 const StrPairVector & queryParams, 00520 const Content & content) const 00521 { 00522 RequestParams request; 00523 request.verb = "PUT"; 00524 request.bucket = bucket; 00525 request.resource = resource; 00526 request.subResource = subResource; 00527 request.headers = headers; 00528 request.queryParams = queryParams; 00529 request.date = Date::now().printRfc2616(); 00530 request.content = content; 00531 00532 return prepare(request).performSync(); 00533 } 00534 S3Api::Response 00535 S3Api:: 00536 erase(const std::string & bucket, 00537 const std::string & resource, 00538 const std::string & subResource, 00539 const StrPairVector & headers, 00540 const StrPairVector & queryParams, 00541 const Content & content) const 00542 { 00543 RequestParams request; 00544 request.verb = "DELETE"; 00545 request.bucket = bucket; 00546 request.resource = resource; 00547 request.subResource = subResource; 00548 request.headers = headers; 00549 request.queryParams = queryParams; 00550 
request.date = Date::now().printRfc2616(); 00551 request.content = content; 00552 00553 return prepare(request).performSync(); 00554 } 00555 00556 template<typename T> 00557 T extract(tinyxml2::XMLNode * element, const std::string & path) 00558 { 00559 if (!element) 00560 throw ML::Exception("can't extract from missing element"); 00561 //tinyxml2::XMLHandle handle(element); 00562 00563 vector<string> splitPath = ML::split(path, '/'); 00564 auto p = element; 00565 for (unsigned i = 0; i < splitPath.size(); ++i) { 00566 p = p->FirstChildElement(splitPath[i].c_str()); 00567 if (!p) { 00568 element->GetDocument()->Print(); 00569 throw ML::Exception("required key " + splitPath[i] 00570 + " not found on path " + path); 00571 } 00572 } 00573 00574 auto text = tinyxml2::XMLHandle(p).FirstChild().ToText(); 00575 00576 if (!text) { 00577 element->GetDocument()->Print(); 00578 throw ML::Exception("no text at node " + path); 00579 } 00580 return boost::lexical_cast<T>(text->Value()); 00581 } 00582 00583 template<typename T> 00584 T extractDef(tinyxml2::XMLNode * element, const std::string & path, 00585 const T & ifMissing) 00586 { 00587 if (!element) return ifMissing; 00588 00589 vector<string> splitPath = ML::split(path, '/'); 00590 auto p = element; 00591 for (unsigned i = 0; i < splitPath.size(); ++i) { 00592 p = p->FirstChildElement(splitPath[i].c_str()); 00593 if (!p) 00594 return ifMissing; 00595 } 00596 00597 auto text = tinyxml2::XMLHandle(p).FirstChild().ToText(); 00598 00599 if (!text) return ifMissing; 00600 00601 return boost::lexical_cast<T>(text->Value()); 00602 } 00603 00604 template<typename T> 00605 T extract(const std::unique_ptr<tinyxml2::XMLDocument> & doc, 00606 const std::string & path) 00607 { 00608 return extract<T>(doc.get(), path); 00609 } 00610 00611 template<typename T> 00612 T extractDef(const std::unique_ptr<tinyxml2::XMLDocument> & doc, 00613 const std::string & path, const T & def) 00614 { 00615 return extractDef<T>(doc.get(), path, def); 00616 
} 00617 00618 namespace { 00619 00620 } // file scope 00621 00622 std::vector<std::pair<std::string, std::string> > 00623 S3Api::ObjectMetadata:: 00624 getRequestHeaders() const 00625 { 00626 std::vector<std::pair<std::string, std::string> > result; 00627 if (redundancy == REDUNDANCY_REDUCED) 00628 result.push_back({"x-amz-storage-class", "REDUCED_REDUNDANCY"}); 00629 else if(redundancy == REDUNDANCY_GLACIER) 00630 result.push_back({"x-amz-storage-class", "GLACIER"}); 00631 if (serverSideEncryption == SSE_AES256) 00632 result.push_back({"x-amz-server-side-encryption", "AES256"}); 00633 if (contentType != "") 00634 result.push_back({"Content-Type", contentType}); 00635 if (contentEncoding != "") 00636 result.push_back({"Content-Encoding", contentEncoding}); 00637 for (auto md: metadata) { 00638 result.push_back({"x-amz-meta-" + md.first, md.second}); 00639 } 00640 return result; 00641 } 00642 00643 S3Api::MultiPartUpload 00644 S3Api:: 00645 obtainMultiPartUpload(const std::string & bucket, 00646 const std::string & resource, 00647 const ObjectMetadata & metadata) const 00648 { 00649 // Contains the resource without the leading slash 00650 string outputPrefix(resource, 1); 00651 00652 // Check if there is already a multipart upload in progress 00653 auto inProgress = get(bucket, "/", 8192, "uploads", {}, 00654 { { "prefix", outputPrefix } }) 00655 .bodyXml(); 00656 00657 using namespace tinyxml2; 00658 00659 XMLHandle handle(*inProgress); 00660 00661 auto upload 00662 = handle 00663 .FirstChildElement("ListMultipartUploadsResult") 00664 .FirstChildElement("Upload") 00665 .ToElement(); 00666 00667 string uploadId; 00668 vector<MultiPartUploadPart> parts; 00669 00670 uint64_t partSize = 0; 00671 uint64_t currentOffset = 0; 00672 00673 for (; upload; upload = upload->NextSiblingElement("Upload")) { 00674 XMLHandle uploadHandle(upload); 00675 00676 auto foundNode 00677 = uploadHandle 00678 .FirstChildElement("UploadId") 00679 .FirstChild() 00680 .ToText(); 00681 00682 if 
(!foundNode) 00683 throw ML::Exception("found node has no ID"); 00684 00685 // TODO: check metadata, etc 00686 00687 // Already an upload in progress 00688 uploadId = foundNode->Value(); 00689 00690 auto inProgressInfo = get(bucket, resource, 8192, 00691 "uploadId=" + uploadId) 00692 .bodyXml(); 00693 00694 //inProgressInfo->Print(); 00695 00696 XMLHandle handle(*inProgressInfo); 00697 00698 auto foundPart 00699 = handle 00700 .FirstChildElement("ListPartsResult") 00701 .FirstChildElement("Part") 00702 .ToElement(); 00703 00704 int numPartsDone = 0; 00705 uint64_t biggestPartSize = 0; 00706 for (; foundPart; 00707 foundPart = foundPart->NextSiblingElement("Part"), 00708 ++numPartsDone) { 00709 MultiPartUploadPart currentPart; 00710 currentPart.fromXml(foundPart); 00711 if (currentPart.partNumber != numPartsDone + 1) { 00712 //cerr << "missing part " << numPartsDone + 1 << endl; 00713 // from here we continue alone 00714 break; 00715 } 00716 currentPart.startOffset = currentOffset; 00717 currentOffset += currentPart.size; 00718 biggestPartSize = std::max(biggestPartSize, currentPart.size); 00719 parts.push_back(currentPart); 00720 } 00721 00722 partSize = biggestPartSize; 00723 00724 //cerr << "numPartsDone = " << numPartsDone << endl; 00725 //cerr << "currentOffset = " << currentOffset 00726 // << "dataSize = " << dataSize << endl; 00727 } 00728 00729 if (uploadId.empty()) { 00730 //cerr << "getting new ID" << endl; 00731 00732 vector<pair<string, string> > headers = metadata.getRequestHeaders(); 00733 auto result = post(bucket, resource, "uploads", headers).bodyXml(); 00734 //result->Print(); 00735 //cerr << "result = " << result << endl; 00736 00737 uploadId 00738 = extract<string>(result, "InitiateMultipartUploadResult/UploadId"); 00739 00740 //cerr << "new upload = " << uploadId << endl; 00741 } 00742 //return; 00743 00744 MultiPartUpload result; 00745 result.parts.swap(parts); 00746 result.id = uploadId; 00747 return result; 00748 } 00749 00750 std::string 
00751 S3Api:: 00752 finishMultiPartUpload(const std::string & bucket, 00753 const std::string & resource, 00754 const std::string & uploadId, 00755 const std::vector<std::string> & etags) const 00756 { 00757 using namespace tinyxml2; 00758 // Finally, send back a response to join the parts together 00759 XMLDocument joinRequest; 00760 auto r = joinRequest.InsertFirstChild(joinRequest.NewElement("CompleteMultipartUpload")); 00761 for (unsigned i = 0; i < etags.size(); ++i) { 00762 auto n = r->InsertEndChild(joinRequest.NewElement("Part")); 00763 n->InsertEndChild(joinRequest.NewElement("PartNumber")) 00764 ->InsertEndChild(joinRequest.NewText(ML::format("%d", i + 1).c_str())); 00765 n->InsertEndChild(joinRequest.NewElement("ETag")) 00766 ->InsertEndChild(joinRequest.NewText(etags[i].c_str())); 00767 } 00768 auto joinResponse 00769 = post(bucket, resource, "uploadId=" + uploadId, 00770 {}, {}, joinRequest); 00771 00772 //cerr << joinResponse.bodyXmlStr() << endl; 00773 00774 auto joinResponseXml = joinResponse.bodyXml(); 00775 00776 try { 00777 00778 string etag = extract<string>(joinResponseXml, 00779 "CompleteMultipartUploadResult/ETag"); 00780 return etag; 00781 } catch (const std::exception & exc) { 00782 cerr << "--- request is " << endl; 00783 joinRequest.Print(); 00784 cerr << "error completing multipart upload: " << exc.what() << endl; 00785 throw; 00786 } 00787 } 00788 00789 void 00790 S3Api::MultiPartUploadPart:: 00791 fromXml(tinyxml2::XMLElement * element) 00792 { 00793 partNumber = extract<int>(element, "PartNumber"); 00794 lastModified = extract<string>(element, "LastModified"); 00795 etag = extract<string>(element, "ETag"); 00796 size = extract<uint64_t>(element, "Size"); 00797 done = true; 00798 } 00799 00800 std::string 00801 S3Api:: 00802 upload(const char * data, 00803 size_t dataSize, 00804 const std::string & bucket, 00805 const std::string & resource, 00806 CheckMethod check, 00807 const ObjectMetadata & metadata, 00808 int numInParallel) 00809 
{ 00810 if (resource == "" || resource[0] != '/') 00811 throw ML::Exception("resource should start with a /"); 00812 // Contains the resource without the leading slash 00813 string outputPrefix(resource, 1); 00814 00815 //cerr << "need to upload " << dataSize << " bytes" << endl; 00816 00817 // Check if it's already there 00818 00819 if (check == CM_SIZE || check == CM_MD5_ETAG) { 00820 auto existingResource 00821 = get(bucket, "/", 8192, "", {}, 00822 { { "prefix", outputPrefix } }) 00823 .bodyXml(); 00824 00825 //cerr << "existing" << endl; 00826 //existingResource->Print(); 00827 00828 auto foundContent 00829 = tinyxml2::XMLHandle(*existingResource) 00830 .FirstChildElement("ListBucketResult") 00831 .FirstChildElement("Contents") 00832 .ToElement(); 00833 00834 if (foundContent) { 00835 uint64_t size = extract<uint64_t>(foundContent, "Size"); 00836 std::string etag = extract<string>(foundContent, "ETag"); 00837 std::string lastModified = extract<string>(foundContent, "LastModified"); 00838 00839 if (size == dataSize) { 00840 //cerr << "already uploaded" << endl; 00841 return etag; 00842 } 00843 } 00844 } 00845 00846 auto upload = obtainMultiPartUpload(bucket, resource, metadata); 00847 00848 uint64_t partSize = 0; 00849 uint64_t currentOffset = 0; 00850 00851 for (auto & part: upload.parts) { 00852 partSize = std::max(partSize, part.size); 00853 currentOffset = std::max(currentOffset, part.startOffset + part.size); 00854 } 00855 00856 if (partSize == 0) { 00857 if (dataSize < 5 * 1024 * 1024) { 00858 partSize = dataSize; 00859 } 00860 else { 00861 partSize = 8 * 1024 * 1024; 00862 while (dataSize / partSize > 150) { 00863 partSize *= 2; 00864 } 00865 } 00866 } 00867 00868 string uploadId = upload.id; 00869 vector<MultiPartUploadPart> & parts = upload.parts; 00870 00871 uint64_t offset = currentOffset; 00872 for (int i = 0; offset < dataSize; offset += partSize, ++i) { 00873 MultiPartUploadPart part; 00874 part.partNumber = parts.size() + 1; 00875 
part.startOffset = offset; 00876 part.size = min<uint64_t>(partSize, dataSize - offset); 00877 parts.push_back(part); 00878 } 00879 // we are dealing with an empty file 00880 if(parts.empty()) 00881 { 00882 MultiPartUploadPart part; 00883 part.partNumber = parts.size() + 1; 00884 part.startOffset = offset; 00885 part.size = min<uint64_t>(partSize, dataSize - offset); 00886 parts.push_back(part); 00887 } 00888 //cerr << "total parts = " << parts.size() << endl; 00889 00890 //if (!foundId) 00891 00892 uint64_t bytesDone = 0; 00893 Date start; 00894 00895 auto touchByte = [] (const char * c) 00896 { 00897 00898 __asm__ 00899 (" # [in]" 00900 : 00901 : [in] "r" (*c) 00902 : 00903 ); 00904 }; 00905 00906 auto touch = [&] (const char * start, size_t size) 00907 { 00908 for (size_t i = 0; i < size; i += 4096) { 00909 touchByte(start + i); 00910 } 00911 }; 00912 00913 int readyPart = 0; 00914 00915 auto doPart = [&] (int i) 00916 { 00917 MultiPartUploadPart & part = parts[i]; 00918 //cerr << "part " << i << " with " << part.size << " bytes" << endl; 00919 00920 // Wait until we're allowed to go 00921 for (;;) { 00922 int isReadyPart = readyPart; 00923 if (isReadyPart >= i) { 00924 break; 00925 } 00926 futex_wait(readyPart, isReadyPart); 00927 } 00928 00929 // First touch the input range 00930 touch(data + part.startOffset, 00931 part.size); 00932 00933 //cerr << "done touching " << i << endl; 00934 00935 // Now let the next one go 00936 ExcAssertEqual(readyPart, i); 00937 ++readyPart; 00938 00939 futex_wake(readyPart); 00940 00941 string md5 = md5HashToHex(data + part.startOffset, 00942 part.size); 00943 00944 if (part.done) { 00945 //cerr << "etag is " << part.etag << endl; 00946 if ('"' + md5 + '"' == part.etag) { 00947 //cerr << "part " << i << " verified done" << endl; 00948 return; 00949 } 00950 } 00951 00952 auto putResult = put(bucket, resource, 00953 ML::format("partNumber=%d&uploadId=%s", 00954 part.partNumber, uploadId), 00955 {}, {}, 00956 S3Api::Content(data 
00957 + part.startOffset, 00958 part.size, 00959 md5)); 00960 00961 //cerr << "result of part " << i << " is " 00962 //<< putResult.bodyXmlStr() << endl; 00963 00964 if (putResult.code_ != 200) { 00965 part.etag = "ERROR"; 00966 cerr << putResult.bodyXmlStr() << endl; 00967 throw ML::Exception("put didn't work: %d", (int)putResult.code_); 00968 } 00969 00970 00971 00972 ML::atomic_add(bytesDone, part.size); 00973 double seconds = Date::now().secondsSince(start); 00974 cerr << "done " << bytesDone / 1024 / 1024 << " MB in " 00975 << seconds << " s at " 00976 << bytesDone / 1024.0 / 1024 / seconds 00977 << " MB/second" << endl; 00978 00979 //cerr << putResult.header_ << endl; 00980 00981 string etag = putResult.getHeader("etag"); 00982 00983 //cerr << "etag = " << etag << endl; 00984 00985 part.etag = etag; 00986 }; 00987 00988 int currentPart = 0; 00989 00990 start = Date::now(); 00991 00992 auto doPartThread = [&] () 00993 { 00994 for (;;) { 00995 if (currentPart >= parts.size()) break; 00996 int partToDo = __sync_fetch_and_add(¤tPart, 1); 00997 if (partToDo >= parts.size()) break; 00998 doPart(partToDo); 00999 } 01000 }; 01001 01002 if (numInParallel == -1) 01003 numInParallel = 16; 01004 01005 boost::thread_group tg; 01006 for (unsigned i = 0; i < numInParallel; ++i) 01007 tg.create_thread(doPartThread); 01008 01009 tg.join_all(); 01010 01011 vector<string> etags; 01012 for (unsigned i = 0; i < parts.size(); ++i) { 01013 etags.push_back(parts[i].etag); 01014 } 01015 string finalEtag = finishMultiPartUpload(bucket, resource, uploadId, etags); 01016 return finalEtag; 01017 } 01018 01019 std::string 01020 S3Api:: 01021 upload(const char * data, 01022 size_t bytes, 01023 const std::string & uri, 01024 CheckMethod check, 01025 const ObjectMetadata & metadata, 01026 int numInParallel) 01027 { 01028 string bucket, object; 01029 std::tie(bucket, object) = parseUri(uri); 01030 return upload(data, bytes, bucket, "/" + object, check, metadata, 01031 numInParallel); 01032 } 
01033 01034 S3Api::ObjectInfo:: 01035 ObjectInfo() 01036 : size(0), exists(false) 01037 { 01038 } 01039 01040 S3Api::ObjectInfo:: 01041 ObjectInfo(tinyxml2::XMLNode * element) 01042 { 01043 size = extract<uint64_t>(element, "Size"); 01044 key = extract<string>(element, "Key"); 01045 string lastModifiedStr = extract<string>(element, "LastModified"); 01046 lastModified = Date::parseIso8601(lastModifiedStr); 01047 etag = extract<string>(element, "ETag"); 01048 ownerId = extract<string>(element, "Owner/ID"); 01049 ownerName = extractDef<string>(element, "Owner/DisplayName", ""); 01050 storageClass = extract<string>(element, "StorageClass"); 01051 exists = true; 01052 } 01053 01054 void 01055 S3Api:: 01056 forEachObject(const std::string & bucket, 01057 const std::string & prefix, 01058 const OnObject & onObject, 01059 const OnSubdir & onSubdir, 01060 const std::string & delimiter, 01061 int depth) const 01062 { 01063 using namespace tinyxml2; 01064 01065 //cerr << "forEachObject under " << prefix << endl; 01066 01067 string marker; 01068 do { 01069 StrPairVector queryParams; 01070 if (prefix != "") 01071 queryParams.push_back({"prefix", prefix}); 01072 if (delimiter != "") 01073 queryParams.push_back({"delimiter", delimiter}); 01074 if (marker != "") 01075 queryParams.push_back({"marker", marker}); 01076 01077 auto listingResult = get(bucket, "/", 8192, "", 01078 {}, queryParams); 01079 auto listingResultXml = listingResult.bodyXml(); 01080 01081 //listingResultXml->Print(); 01082 01083 string foundPrefix 01084 = extractDef<string>(listingResult, "ListBucketResult/Prefix", ""); 01085 string truncated 01086 = extract<string>(listingResult, "ListBucketResult/IsTruncated"); 01087 if (truncated == "true") { 01088 marker = extract<string>(listingResult, "ListBucketResult/Marker"); 01089 } 01090 else marker = ""; 01091 01092 auto foundObject 01093 = XMLHandle(*listingResultXml) 01094 .FirstChildElement("ListBucketResult") 01095 .FirstChildElement("Contents") 01096 
            .ToElement();

        // Walk the <Contents> entries, calling onObject for each object.
        for (; onObject && foundObject;
             foundObject = foundObject->NextSiblingElement("Contents")) {
            ObjectInfo info(foundObject);

            // Keys are reported with the full prefix; strip it off so the
            // callback receives (prefix, basename).
            ExcAssertEqual(info.key.find(foundPrefix), 0);
            string basename(info.key, foundPrefix.length());

            if (!onObject(foundPrefix, basename, info, depth))
                break;
        }

        auto foundDir
            = XMLHandle(*listingResultXml)
            .FirstChildElement("ListBucketResult")
            .FirstChildElement("CommonPrefixes")
            .ToElement();

        // Walk the <CommonPrefixes> entries ("subdirectories").
        for (; onSubdir && foundDir;
             foundDir = foundDir->NextSiblingElement("CommonPrefixes")) {
            string dirName = extract<string>(foundDir, "Prefix");

            // Strip off the delimiter
            if (dirName.rfind(delimiter) == dirName.size() - delimiter.size()) {
                dirName = string(dirName, 0, dirName.size() - delimiter.size());
                ExcAssertEqual(dirName.find(prefix), 0);
                dirName = string(dirName, prefix.size());
            }
            // Recurse one level deeper if the callback asks for it.
            if (onSubdir(foundPrefix, dirName, depth)) {
                string newPrefix = foundPrefix + dirName + "/";
                //cerr << "newPrefix = " << newPrefix << endl;
                forEachObject(bucket, newPrefix, onObject, onSubdir, delimiter,
                              depth + 1);
            }
        }
    } while (marker != "");

    //cerr << "done scanning" << endl;
}

/** Return information about the given object.  Throws if the object does
    not exist in the bucket. */
S3Api::ObjectInfo
S3Api::
getObjectInfo(const std::string & bucket,
              const std::string & object) const
{
    // There is no direct "stat" in this wrapper: list the bucket with the
    // object name as a prefix and inspect the first entry.
    StrPairVector queryParams;
    queryParams.push_back({"prefix", object});

    auto listingResult = get(bucket, "/", 8192, "", {}, queryParams);

    if (listingResult.code_ != 200) {
        cerr << listingResult.bodyXmlStr() << endl;
        throw ML::Exception("error getting object");
    }

    auto listingResultXml = listingResult.bodyXml();

    auto foundObject
        = tinyxml2::XMLHandle(*listingResultXml)
        .FirstChildElement("ListBucketResult")
        .FirstChildElement("Contents")
        .ToElement();

    if (!foundObject)
        throw ML::Exception("object " + object + " not found in bucket "
                            + bucket);

    ObjectInfo info(foundObject);

    // The listing is a prefix search: the first entry may merely start
    // with the requested name, so insist on an exact key match.
    if(info.key != object){
        throw ML::Exception("object " + object + " not found in bucket "
                            + bucket);
    }


    return info;
}

/** Like getObjectInfo, but returns a default (exists == false) ObjectInfo
    instead of throwing when the object is not found.  Still throws on a
    failed listing request. */
S3Api::ObjectInfo
S3Api::
tryGetObjectInfo(const std::string & bucket,
                 const std::string & object) const
{
    StrPairVector queryParams;
    queryParams.push_back({"prefix", object});

    auto listingResult = get(bucket, "/", 8192, "", {}, queryParams);
    if (listingResult.code_ != 200) {
        cerr << listingResult.bodyXmlStr() << endl;
        throw ML::Exception("error getting object request: %d",
                            listingResult.code_);
    }
    auto listingResultXml = listingResult.bodyXml();

    auto foundObject
        = tinyxml2::XMLHandle(*listingResultXml)
        .FirstChildElement("ListBucketResult")
        .FirstChildElement("Contents")
        .ToElement();

    if (!foundObject)
        return ObjectInfo();

    ObjectInfo info(foundObject);

    // Prefix search: require an exact key match (see getObjectInfo).
    if(info.key != object){
        return ObjectInfo();
    }

    return info;
}

/// getObjectInfo taking a full s3:// uri.
S3Api::ObjectInfo
S3Api::
getObjectInfo(const std::string & uri) const
{
    string bucket, object;
    std::tie(bucket, object) = parseUri(uri);
    return getObjectInfo(bucket, object);
}

/// tryGetObjectInfo taking a full s3:// uri.
S3Api::ObjectInfo
S3Api::
tryGetObjectInfo(const std::string & uri) const
{
    string bucket, object;
    std::tie(bucket, object) = parseUri(uri);
    return tryGetObjectInfo(bucket, object);
}

/// Download taking a full s3:// uri (see the bucket/object overload).
void
S3Api::
download(const std::string & uri,
         const OnChunk & onChunk,
         ssize_t startOffset,
         ssize_t endOffset) const
{
    string bucket, object;
    std::tie(bucket,
             object) = parseUri(uri);
    return download(bucket, object, onChunk, startOffset, endOffset);
}

/** Download an object in parallel chunks of up to 128MB, invoking onChunk
    for each chunk as it arrives (chunks may complete out of order, so
    onChunk receives the chunk's offset).

    NOTE(review): startOffset is accepted but never used below — the part
    table always starts at offset 0, so the parameter is effectively
    ignored; confirm before relying on it. */
void
S3Api::
download(const std::string & bucket,
         const std::string & object,
         const OnChunk & onChunk,
         ssize_t startOffset,
         ssize_t endOffset) const
{

    ObjectInfo info = getObjectInfo(bucket, object);
    // Objects archived to Glacier cannot be fetched with a plain GET.
    if(info.storageClass == "GLACIER"){
        throw ML::Exception("Cannot download [" + info.key + "] because its "
                            "storage class is [GLACIER]");
    }

    size_t chunkSize = 128 * 1024 * 1024;  // 128MB probably good

    struct Part {
        uint64_t offset;
        uint64_t size;
    };

    if (endOffset == -1)
        endOffset = info.size;   // -1 means "to the end of the object"

    //cerr << "getting " << endOffset << " bytes" << endl;

    // Pre-compute the byte ranges of all parts to fetch.
    vector<Part> parts;

    for (uint64_t offset = 0; offset < endOffset; offset += chunkSize) {
        Part part;
        part.offset = offset;
        part.size = std::min<ssize_t>(endOffset - offset, chunkSize);
        parts.push_back(part);
    }

    //cerr << "getting in " << parts.size() << " parts" << endl;

    uint64_t bytesDone = 0;
    Date start;
    bool failed = false;

    // Fetch a single part with an HTTP range GET and hand it to onChunk.
    auto doPart = [&] (int i)
        {
            if (failed) return;   // another part already failed; bail out

            Part & part = parts[i];
            //cerr << "part " << i << " with " << part.size << " bytes" << endl;

            StrPairVector headerParams;
            headerParams.push_back({"range",
                                    ML::format("bytes=%zd-%zd",
                                               part.offset,
                                               part.offset + part.size - 1)});

            auto partResult = get(bucket, "/" + object, part.size, "", headerParams, {});
            // 206 Partial Content is the success code for a range GET.
            if (partResult.code_ != 206) {
                cerr << "error getting part " << i << ": "
                     << partResult.bodyXmlStr() << endl;
                failed = true;
                return;
            }

            ExcAssertEqual(partResult.body_.size(), part.size);

            onChunk(partResult.body_.c_str(),
                    part.size,
                    i,
                    part.offset,
                    info.size);

            // Progress accounting; atomic since parts run concurrently.
            ML::atomic_add(bytesDone, part.size);
            double seconds = Date::now().secondsSince(start);
            cerr << "done " << bytesDone / 1024 / 1024 << " MB in "
                 << seconds << " s at "
                 << bytesDone / 1024.0 / 1024 / seconds
                 << " MB/second" << endl;
        };

    int currentPart = 0;

    start = Date::now();

    // Worker loop: threads race on currentPart (atomic fetch-and-add) to
    // claim part indexes until all parts have been taken.
    auto doPartThread = [&] ()
        {
            for (;;) {
                if (currentPart >= parts.size()) break;
                int partToDo = __sync_fetch_and_add(&currentPart, 1);
                if (partToDo >= parts.size()) break;
                doPart(partToDo);
            }
        };

    boost::thread_group tg;
    for (unsigned i = 0; i < 16; ++i)   // 16 parallel downloads
        tg.create_thread(doPartThread);

    tg.join_all();

    if (failed)
        throw ML::Exception("Failed to get part");
}

/** Download the object at the given s3:// uri into the local file
    outfile (created or truncated).  endOffset of -1, or one past the
    object size, means "to the end of the object". */
void
S3Api::
downloadToFile(const std::string & uri, const std::string & outfile,
               ssize_t endOffset) const
{

    auto info = getObjectInfo(uri);
    if (!info){
        throw ML::Exception("unknown s3 object");
    }
    if(endOffset == -1 || endOffset > info.size){
        endOffset = info.size;
    }

    ofstream myFile;
    myFile.open(outfile.c_str());

    uint64_t done = 0;

    // Chunks may arrive out of order, so seek to each chunk's offset
    // before writing it.
    auto onChunk = [&] (const char * data,
                        size_t size,
                        int chunkIndex,
                        uint64_t offset,
                        uint64_t totalSize){
        ExcAssertEqual(info.size, totalSize);
        ExcAssertLessEqual(offset + size, totalSize);
        myFile.seekp(offset);
        myFile.write(data, size);
        ML::atomic_add(done, size);
    };
    download(uri, onChunk, 0, endOffset);
}

/** Boost.Iostreams source device that streams an S3 object down, using a
    small pool of threads prefetching sequential chunks ahead of the
    reader. */
struct StreamingDownloadSource {

    StreamingDownloadSource(const S3Api * owner,
                            const std::string & bucket,
                            const std::string & object)
    {
        impl.reset(new Impl());
        impl->owner = owner;
        impl->bucket = bucket;
        impl->object = object;
        impl->info
            = owner->getObjectInfo(bucket, object);
        impl->chunkSize = 1024 * 1024;  // start with 1MB and ramp up

        impl->start();
    }

    typedef char char_type;
    struct category
        : //input_seekable,
        boost::iostreams::input,
        boost::iostreams::device_tag,
        boost::iostreams::closable_tag
    { };

    struct Impl {
        Impl()
            : owner(0), offset(0), shutdown(false),
              readPartOffset(0), readPartDone(1)
        {
        }

        ~Impl()
        {
            stop();
        }

        const S3Api * owner;       ///< api used to issue the GETs (not owned)
        S3Api::ObjectInfo info;    ///< size etc. of the object being read
        std::string bucket;
        std::string object;
        size_t offset;
        size_t chunkSize;          ///< current download chunk size (ramps up)
        size_t bytesDone;          ///< bytes downloaded so far (statistics)
        bool shutdown;             ///< tells worker threads to exit
        boost::thread_group tg;    ///< downloader threads

        string readPart;           ///< chunk currently consumed by read()
        size_t readPartOffset;     ///< consumption offset within readPart

        Date startDate;

        size_t writeOffset, readOffset;
        // Futex-signalled counters coordinating which part is being
        // downloaded (writePartNumber/allocPartNumber) and which part the
        // reader is consuming (readPartReady/readPartDone).
        int readPartReady, readPartDone, writePartNumber, allocPartNumber;


        void start()
        {
            readPartOffset = offset = bytesDone = writeOffset
                = writePartNumber = allocPartNumber = readOffset = 0;
            readPartReady = 0;
            readPartDone = 0;
            startDate = Date::now();
            // Five prefetch threads download chunks ahead of the reader.
            for (unsigned i = 0; i < 5; ++i)
                tg.create_thread(boost::bind<void>(&Impl::runThread, this));
        }

        void stop()
        {
            shutdown = true;
            // Wake any thread blocked on one of the futexes so it can
            // observe the shutdown flag and exit.
            futex_wake(writePartNumber);
            futex_wake(readPartReady);
            futex_wake(readPartDone);
            tg.join_all();
        }

        // Copy up to n bytes of the current chunk into s; returns the
        // number of bytes copied, or -1 at end of stream.
        std::streamsize read(char_type* s, std::streamsize n)
        {
            if (readOffset == info.size)
                return -1;

            //Date start = Date::now();

#if 0
            cerr << "read: readPartReady = " << readPartReady
                 << " readPartDone = " << readPartDone
                 << " writePartNumber = " << writePartNumber
                 << " allocPartNumber = " << allocPartNumber
                 << " readPartOffset = " << readPartOffset
                 << endl;
#endif

            //cerr << "trying to read " << n << " characters at offset "
            //     << readPartOffset << " of "
            //     << readPart.size() << endl;

            // Wait until a downloader thread publishes the next part.
            while (readPartDone == readPartReady) {
                //cerr << "waiting for part " << readPartDone << endl;
                ML::futex_wait(readPartReady, readPartDone);
            }

            ExcAssertGreaterEqual(readPartReady, readPartDone);

            //cerr << "ready to start reading" << endl;

            //cerr << "trying to read " << n << " characters at offset "
            //     << readPartOffset << " of "
            //     << readPart.size() << endl;

            ExcAssertGreaterEqual(readPart.size(), readPartOffset);

            size_t toDo = std::min<size_t>(readPart.size() - readPartOffset,
                                           n);

            //cerr << "toDo = " << toDo << endl;

            std::copy(readPart.c_str() + readPartOffset,
                      readPart.c_str() + readPartOffset + toDo,
                      s);

            readPartOffset += toDo;
            readOffset += toDo;
            // Finished this part: signal the downloaders that the reader
            // is ready for the next one.
            if (readPartOffset == readPart.size()) {
                //cerr << "finished part " << readPartDone << endl;
                ++readPartDone;
                ML::futex_wake(readPartDone);
            }

            //Date end = Date::now();
            //double elapsed = end.secondsSince(start);
            //if (elapsed > 0.0001)
            //    cerr << "read elapsed " << elapsed << endl;

            return toDo;
        }

        // Downloader thread: repeatedly claims the next part number,
        // range-GETs it, then hands it to the reader strictly in order.
        void runThread()
        {
            // Maximum chunk size is what we can do in 30 seconds
            size_t maxChunkSize
                = owner->bandwidthToServiceMbps
                * 15.0 * 1000000;

            while (!shutdown) {
                // Go in the lottery to see which part I need to download
                int partToDo = __sync_fetch_and_add(&allocPartNumber, 1);

                //cerr << "partToDo = " << partToDo << endl;

                // Wait until it's my turn to increment the offset
                while (!shutdown) {
                    int currentWritePart = writePartNumber;
                    if (currentWritePart == partToDo) break;
                    ML::futex_wait(writePartNumber, currentWritePart);
                }
                if (shutdown) return;

                //cerr << "ready" << endl;

                ExcAssertEqual(writePartNumber, partToDo);

                // If we're done then get out
                if (writeOffset >= info.size || shutdown) return;  // done

                // Ramp the chunk size up every second part, capped at
                // maxChunkSize.
                if (partToDo && partToDo % 2 == 0 && chunkSize < maxChunkSize)
                    chunkSize *= 2;

                size_t start = writeOffset;
                size_t end = std::min<size_t>(writeOffset + chunkSize,
                                              info.size);

                writeOffset = end;

                // Finished my turn to increment.  Wake up the next thread
                ++writePartNumber;
                futex_wake(writePartNumber);

                // Download my part
                S3Api::StrPairVector headerParams;
                headerParams.push_back({"range",
                                        ML::format("bytes=%zd-%zd",
                                                   start, end - 1)});

                //cerr << "downloading" << endl;

                auto partResult
                    = owner->get(bucket, "/" + object, end - start,
                                 "", headerParams, {});
                if (partResult.code_ != 206) {
                    cerr << "error getting part "
                         << partResult.bodyXmlStr() << endl;
                    return;
                }

                //cerr << "done downloading" << endl;

                // Wait until the reader needs my part
                while (!shutdown) {
                    int currentReadPart = readPartDone;
                    if (currentReadPart == partToDo) break;
                    ML::futex_wait(readPartDone, currentReadPart);
                }
                if (shutdown) return;

                //cerr << "ready for part " << partToDo << endl;

                bytesDone += partResult.body_.size();

                double elapsed = Date::now().secondsSince(startDate);

                // Give my part to the reader
                readPart = partResult.body();
                readPartOffset = 0;

                // Wake up the reader
                ++readPartReady;
                ML::futex_wake(readPartReady);

                cerr << "done " << bytesDone << " at "
                     << bytesDone / elapsed / 1024 / 1024
                     << "MB/second" << endl;
            }
            cerr << "finished thread" << endl;
        }
    };

    std::shared_ptr<Impl> impl;

    std::streamsize read(char_type* s,
                         std::streamsize n)
    {
        return impl->read(s, n);
    }

    bool is_open() const
    {
        return !!impl;
    }

    void close()
    {
        impl.reset();
    }
};

/** Return a streambuf that streams the given object down; ownership of
    the streambuf passes to the caller.  Note that startOffset, endOffset
    and onChunk are not used by this implementation. */
std::auto_ptr<std::streambuf>
S3Api::
streamingDownload(const std::string & bucket,
                  const std::string & object,
                  ssize_t startOffset,
                  ssize_t endOffset,
                  const OnChunk & onChunk) const
{
    std::auto_ptr<std::streambuf> result;
    result.reset(new boost::iostreams::stream_buffer<StreamingDownloadSource>
                 (StreamingDownloadSource(this, bucket, object),
                  131072));
    return result;
}

/** Boost.Iostreams sink device that streams data written to it up to S3
    as a multipart upload, using a pool of threads to push chunks. */
struct StreamingUploadSource {

    StreamingUploadSource(const S3Api * owner,
                          const std::string & bucket,
                          const std::string & object,
                          const S3Api::ObjectMetadata & metadata)
    {
        impl.reset(new Impl());
        impl->owner = owner;
        impl->bucket = bucket;
        impl->object = object;
        impl->metadata = metadata;
        impl->chunkSize = 8 * 1024 * 1024;  // start with 8MB and ramp up

        impl->start();
    }

    typedef char char_type;
    struct category
        : public boost::iostreams::output,
          boost::iostreams::device_tag,
          boost::iostreams::closable_tag
    {
    };

    struct Impl {
        Impl()
            : owner(0), offset(0), chunkIndex(0), shutdown(false),
              chunks(16)
        {
        }

        ~Impl()
        {
            //cerr << "destroying streaming upload at " << object << endl;
            stop();
        }

        const S3Api * owner;       ///< api used for the puts (not owned)
        std::string bucket;
        std::string object;
        S3Api::ObjectMetadata metadata;
        std::string uploadId;      ///< multipart upload id returned by S3
        size_t offset;             ///< total bytes written so far
        size_t chunkSize;          ///< current chunk size (ramps up)
        size_t chunkIndex;         ///< index of the chunk being filled
        bool shutdown;             ///< tells uploader threads to exit
        boost::thread_group tg;    ///< uploader threads

        Date startDate;

        /** A buffered chunk of data waiting to be uploaded as one part.
            NOTE(review): data is allocated with new[] in init() and no
            delete[] is visible anywhere in this struct or its users —
            each chunk's buffer appears to be leaked; confirm. */
        struct Chunk {

            void init(uint64_t offset, size_t capacity, int index)
            {
                this->offset = offset;
                this->size = 0;
                this->capacity = capacity;
                this->index = index;
                this->data = new char[capacity];
            }

            // Append up to n bytes to the chunk; returns how many bytes
            // were actually taken (short when the chunk fills up).
            size_t append(const char * input, size_t n)
            {
                size_t todo = std::min(n, capacity - size);
                std::copy(input, input + todo, data + size);
                size += todo;
                return todo;
            }

            char * data;
            size_t size;
            size_t capacity;
            int index;
            uint64_t offset;
        };

        Chunk current;                   ///< chunk currently being filled

        RingBufferSWMR<Chunk> chunks;    ///< filled chunks awaiting upload

        boost::mutex etagsLock;          ///< guards etags
        std::vector<std::string> etags;  ///< per-part etags, by chunk index
        std::exception_ptr exc;          ///< first uploader-thread error

        void start()
        {
            // Open the multipart upload, then start the uploader threads.
            auto upload = owner->obtainMultiPartUpload(bucket, "/" + object, metadata);

            uploadId = upload.id;
            //cerr << "uploadId = " << uploadId << endl;

            startDate = Date::now();
            for (unsigned i = 0; i < 8; ++i)
                tg.create_thread(boost::bind<void>(&Impl::runThread, this));
            current.init(0, chunkSize, 0);
        }

        void stop()
        {
            shutdown = true;
            tg.join_all();
        }

        // Buffer the bytes, flushing a full chunk onto the upload queue;
        // rethrows any error captured by an uploader thread.
        std::streamsize write(const char_type* s, std::streamsize n)
        {
            if (exc)
                std::rethrow_exception(exc);

            size_t done = current.append(s, n);
            offset += done;
            if (done < n) {
                flush();
                done = current.append(s + done, n - done);
            }

            //cerr << "writing " << n << " characters returned "
            //     << done << endl;

            if (exc)
                std::rethrow_exception(exc);

            return done;
        }

        // Queue the current chunk for upload and start filling a new one.
        void flush()
        {
            if (current.size == 0) return;
            chunks.push(current);
            ++chunkIndex;

            // Get bigger for bigger files
            if (chunkIndex % 5 == 0 && chunkSize < 64 * 1024 * 1024)
                chunkSize *= 2;

            current.init(offset, chunkSize, chunkIndex);
        }

        // Flush the tail, wait for all queued uploads to drain, and
        // complete the multipart upload with the collected etags.
        void finish()
        {
            if (exc)
                std::rethrow_exception(exc);
            //cerr << "pushing last chunk " << chunkIndex << endl;
            flush();
            //cerr << "waiting for everything to stop" << endl;
            chunks.waitUntilEmpty();
            //cerr << "empty" << endl;
            stop();
            //cerr << "stopped" << endl;
            string etag = owner->finishMultiPartUpload(bucket, "/" + object,
                                                       uploadId,
                                                       etags);
            //cerr << "final etag is " << etag << endl;

            if (exc)
                std::rethrow_exception(exc);

            double elapsed = Date::now().secondsSince(startDate);

            cerr << "uploaded " << offset / 1024.0 / 1024.0
                 << "MB in " << elapsed << "s at "
                 << offset / 1024.0 / 1024.0 / elapsed
                 << "MB/s" << " to " << etag << endl;
        }

        // Uploader thread: pop chunks off the ring buffer and PUT each as
        // a numbered multipart part, recording its etag.  Errors are
        // captured into exc for the writer thread to rethrow.
        void runThread()
        {
            while (!shutdown) {
                Chunk chunk;
                if (chunks.tryPop(chunk, 0.01)) {
                    if (exc)
                        return;
                    try {
                        //cerr << "got chunk " << chunk.index
                        //     << " with " << chunk.size << " bytes at index "
                        //     << chunk.index << endl;

                        // Upload the data
                        string md5 = md5HashToHex(chunk.data, chunk.size);

                        // NOTE(review): uploadId is a std::string passed
                        // to a %s format — confirm ML::format handles
                        // std::string arguments.
                        auto putResult = owner->put(bucket, "/" + object,
                                                    ML::format("partNumber=%d&uploadId=%s",
                                                               chunk.index + 1, uploadId),
                                                    {}, {},
                                                    S3Api::Content(chunk.data,
                                                                   chunk.size,
                                                                   md5));
                        if (putResult.code_ != 200) {
                            cerr << putResult.bodyXmlStr() << endl;

                            throw ML::Exception("put didn't work: %d", (int)putResult.code_);
                        }
                        string etag = putResult.getHeader("etag");
                        //cerr << "successfully uploaded part " << chunk.index
                        //     << " with etag " << etag << endl;

                        // Parts can complete out of order: grow the etags
                        // vector as needed, under the lock.
                        boost::unique_lock<boost::mutex> guard(etagsLock);
                        while (etags.size() <= chunk.index)
                            etags.push_back("");
                        etags[chunk.index] = etag;
                    } catch (...)
{ 01838 // Capture exception to be thrown later 01839 exc = std::current_exception(); 01840 } 01841 } 01842 } 01843 } 01844 }; 01845 01846 std::shared_ptr<Impl> impl; 01847 01848 std::streamsize write(const char_type* s, std::streamsize n) 01849 { 01850 return impl->write(s, n); 01851 } 01852 01853 bool is_open() const 01854 { 01855 return !!impl; 01856 } 01857 01858 void close() 01859 { 01860 impl->finish(); 01861 impl.reset(); 01862 } 01863 }; 01864 01865 std::auto_ptr<std::streambuf> 01866 S3Api:: 01867 streamingUpload(const std::string & uri, 01868 const ObjectMetadata & metadata) const 01869 { 01870 string bucket, object; 01871 std::tie(bucket, object) = parseUri(uri); 01872 return streamingUpload(bucket, object, metadata); 01873 } 01874 01875 std::auto_ptr<std::streambuf> 01876 S3Api:: 01877 streamingUpload(const std::string & bucket, 01878 const std::string & object, 01879 const ObjectMetadata & metadata) const 01880 { 01881 std::auto_ptr<std::streambuf> result; 01882 result.reset(new boost::iostreams::stream_buffer<StreamingUploadSource> 01883 (StreamingUploadSource(this, bucket, object, metadata), 01884 131072)); 01885 return result; 01886 } 01887 01888 std::pair<std::string, std::string> 01889 S3Api:: 01890 parseUri(const std::string & uri) 01891 { 01892 if (uri.find("s3://") != 0) 01893 throw ML::Exception("wrong scheme (should start with s3://)"); 01894 string pathPart(uri, 5); 01895 string::size_type pos = pathPart.find('/'); 01896 if (pos == string::npos) 01897 throw ML::Exception("couldn't find bucket name"); 01898 string bucket(pathPart, 0, pos); 01899 string object(pathPart, pos + 1); 01900 01901 return make_pair(bucket, object); 01902 } 01903 01904 std::auto_ptr<std::streambuf> 01905 S3Api:: 01906 streamingDownload(const std::string & uri, 01907 ssize_t startOffset, 01908 ssize_t endOffset, 01909 const OnChunk & onChunk) const 01910 { 01911 string bucket, object; 01912 std::tie(bucket, object) = parseUri(uri); 01913 01914 //cerr << "bucket = " << 
bucket << " object = " << object << endl; 01915 01916 return streamingDownload(bucket, object, startOffset, endOffset, onChunk); 01917 } 01918 01919 void 01920 S3Handle:: 01921 initS3(const std::string & accessKeyId, 01922 const std::string & accessKey, 01923 const std::string & uriPrefix) 01924 { 01925 s3.init(accessKeyId, accessKey); 01926 this->s3UriPrefix = uriPrefix; 01927 } 01928 01929 size_t 01930 S3Handle:: 01931 getS3Buffer(const std::string & filename, char** outBuffer){ 01932 auto stats = s3.getObjectInfo(filename); 01933 if (!stats) 01934 throw ML::Exception("unknown s3 object"); 01935 01936 *outBuffer = new char[stats.size]; 01937 01938 uint64_t done = 0; 01939 01940 auto onChunk = [&] (const char * data, 01941 size_t size, 01942 int chunkIndex, 01943 uint64_t offset, 01944 uint64_t totalSize) 01945 { 01946 ExcAssertEqual(stats.size, totalSize); 01947 ExcAssertLessEqual(offset + size, totalSize); 01948 std::copy(data, data + size, *outBuffer + offset); 01949 ML::atomic_add(done, size); 01950 }; 01951 01952 s3.download(filename, onChunk); 01953 01954 ExcAssertEqual(done, stats.size); 01955 01956 cerr << "done downloading " << stats.size << " bytes from " 01957 << filename << endl; 01958 01959 return stats.size; 01960 01961 } 01962 01963 bool 01964 S3Api:: 01965 forEachBucket(const OnBucket & onBucket) const 01966 { 01967 using namespace tinyxml2; 01968 01969 //cerr << "forEachObject under " << prefix << endl; 01970 01971 auto listingResult = get("", "/", 8192, ""); 01972 auto listingResultXml = listingResult.bodyXml(); 01973 01974 //listingResultXml->Print(); 01975 01976 auto foundBucket 01977 = XMLHandle(*listingResultXml) 01978 .FirstChildElement("ListAllMyBucketsResult") 01979 .FirstChildElement("Buckets") 01980 .FirstChildElement("Bucket") 01981 .ToElement(); 01982 01983 for (; onBucket && foundBucket; 01984 foundBucket = foundBucket->NextSiblingElement("Bucket")) { 01985 01986 string foundName 01987 = extract<string>(foundBucket, "Name"); 01988 if 
            (!onBucket(foundName))
            return false;
    }

    return true;
}

/** Upload every regular file under dirSrc to the given bucket.  When
    includeDir is true the full local path is kept as the object name;
    otherwise the leading "dirSrc/" is stripped off. */
void
S3Api::
uploadRecursive(string dirSrc, string bucketDest, bool includeDir){
    using namespace boost::filesystem;
    path targetDir(dirSrc);
    if(!is_directory(targetDir)){
        throw ML::Exception("%s is not a directory", dirSrc.c_str());
    }
    recursive_directory_iterator it(targetDir), itEnd;
    // Number of leading characters to strip from each path ("dirSrc/").
    int toTrim = includeDir ? 0 : dirSrc.length() + 1;
    for(; it != itEnd; it ++){
        if(!is_directory(*it)){
            string path = it->path().string();
            // Read the file contents and upload them in one shot.
            ML::File_Read_Buffer frb(path);
            size_t size = file_size(path);
            if(toTrim){
                path = path.substr(toTrim);
            }
            upload(frb.start(), size, "s3://" + bucketDest + "/" + path);
        }
    }
}

/// Change the process-wide default bandwidth estimate used by new S3Api objects.
void S3Api::setDefaultBandwidthToServiceMbps(double mbps){
    S3Api::defaultBandwidthToServiceMbps = mbps;
}

namespace {

/// Registry entry tying a bucket name to the api object used to access it.
struct S3BucketInfo {
    std::string s3Bucket;
    std::shared_ptr<S3Api> api;  //< Used to access this uri
};

std::mutex s3BucketsLock;                                 // guards s3Buckets
std::unordered_map<std::string, S3BucketInfo> s3Buckets;  // by bucket name

} // file scope

/** Register a single S3 bucket with its own credentials so that s3://
    uris referring to it can be opened.  Throws BucketAlreadyRegistered
    if the name is already registered. */
void registerS3Bucket(const std::string & bucketName,
                      const std::string & accessKeyId,
                      const std::string & accessKey,
                      double bandwidthToServiceMbps,
                      const std::string & protocol,
                      const std::string & serviceUri)
{
    std::unique_lock<std::mutex> guard(s3BucketsLock);
    if (s3Buckets.count(bucketName)){
        throw BucketAlreadyRegistered(bucketName);
    }

    S3BucketInfo info;
    info.s3Bucket = bucketName;
    info.api = std::make_shared<S3Api>(accessKeyId, accessKey,
                                       bandwidthToServiceMbps,
                                       protocol, serviceUri);

    s3Buckets[bucketName] = info;
}

/** Hooks the "s3" uri scheme into the uri handler registry so that s3://
    paths can be opened as filter streams. */
struct RegisterS3Handler {
    static std::pair<std::streambuf *, bool>
    getS3Handler(const
                 std::string & scheme,
                 const std::string & resource,
                 std::ios_base::open_mode mode)
    {
        // resource is "bucket/path/to/object"
        string::size_type pos = resource.find('/');
        if (pos == string::npos)
            throw ML::Exception("unable to find s3 bucket name in resource "
                                + resource);
        string bucket(resource, 0, pos);

        std::shared_ptr<S3Api> api;

        // Look the bucket up in the registry under the lock, but release
        // the lock before doing any network work.
        {
            std::unique_lock<std::mutex> guard(s3BucketsLock);
            auto it = s3Buckets.find(bucket);
            if (it == s3Buckets.end())
                throw ML::Exception("unregistered s3 bucket " + bucket);
            api = it->second.api;
        }

        ExcAssert(api);

        // NOTE(review): exact equality on mode means combined flags such
        // as (in | binary) are rejected — confirm that is intended.
        if (mode == ios::in) {
            return make_pair(api->streamingDownload("s3://" + resource)
                             .release(),
                             true);
        }
        else if (mode == ios::out) {
            return make_pair(api->streamingUpload("s3://" + resource)
                             .release(),
                             true);
        }
        else throw ML::Exception("no way to create s3 handler for non in/out");
    }

    RegisterS3Handler()
    {
        ML::registerUriHandler("s3", getS3Handler);
    }

} registerS3Handler;

/** Register every bucket accessible with the given credentials, sharing
    one S3Api object between them.  Throws BucketAlreadyRegistered if any
    of them is already registered. */
void registerS3Buckets(const std::string & accessKeyId,
                       const std::string & accessKey,
                       double bandwidthToServiceMbps,
                       const std::string & protocol,
                       const std::string & serviceUri)
{
    std::unique_lock<std::mutex> guard(s3BucketsLock);

    auto api = std::make_shared<S3Api>(accessKeyId, accessKey,
                                       bandwidthToServiceMbps,
                                       protocol, serviceUri);

    auto onBucket = [&] (const std::string & bucketName)
        {
            //cerr << "got bucket " << bucketName << endl;

            if (s3Buckets.count(bucketName)){
                throw BucketAlreadyRegistered(bucketName);
            }

            S3BucketInfo info;
            info.s3Bucket = bucketName;
            info.api = api;
            s3Buckets[bucketName] = info;

            return true;
        };

    api->forEachBucket(onBucket);
}

std::shared_ptr<S3Api> getS3ApiForBucket(const std::string & bucketName) 02135 { 02136 std::unique_lock<std::mutex> guard(s3BucketsLock); 02137 auto it = s3Buckets.find(bucketName); 02138 if (it == s3Buckets.end()) 02139 throw ML::Exception("unregistered s3 bucket " + bucketName); 02140 return it->second.api; 02141 } 02142 02143 // Return an URI for either a file or an s3 object 02144 size_t getUriSize(const std::string & filename) 02145 { 02146 if (filename.find("s3://") == 0) { 02147 string bucket = S3Api::parseUri(filename).first; 02148 auto api = getS3ApiForBucket(bucket); 02149 return api->getObjectInfo(filename).size; 02150 } 02151 else { 02152 struct stat stats; 02153 int res = stat(filename.c_str(), &stats); 02154 if (res == -1) 02155 throw ML::Exception("error getting stats file"); 02156 return stats.st_size; 02157 } 02158 } 02159 02160 } // namespace Datacratic