RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
00001 /* s3.h -*- C++ -*- 00002 Jeremy Barnes, 3 July 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 Class to deal with doing s3. 00006 */ 00007 00008 #pragma once 00009 00010 #include <string> 00011 #include <vector> 00012 #include <map> 00013 #include "jml/arch/exception.h" 00014 #include "jml/utils/unnamed_bool.h" 00015 #include "tinyxml2/tinyxml2.h" 00016 #include "soa/service/http_endpoint.h" 00017 #include <memory> 00018 00019 namespace Datacratic { 00020 00021 struct S3Api { 00025 static std::string sign(const std::string & stringToSign, 00026 const std::string & accessKey); 00027 00032 static double defaultBandwidthToServiceMbps; 00033 00034 S3Api(); 00035 00037 S3Api(const std::string & accessKeyId, 00038 const std::string & accessKey, 00039 double bandwidthToServiceMbps = defaultBandwidthToServiceMbps, 00040 const std::string & defaultProtocol = "http", 00041 const std::string & serviceUri = "s3.amazonaws.com"); 00042 00044 void init(const std::string & accessKeyId, 00045 const std::string & accessKey, 00046 double bandwidthToServiceMbps = defaultBandwidthToServiceMbps, 00047 const std::string & defaultProtocol = "http", 00048 const std::string & serviceUri = "s3.amazonaws.com"); 00049 00050 std::string accessKeyId; 00051 std::string accessKey; 00052 std::string defaultProtocol; 00053 std::string serviceUri; 00054 double bandwidthToServiceMbps; 00055 00056 typedef std::vector<std::pair<std::string, std::string> > StrPairVector; 00057 00058 struct Content { 00059 Content() 00060 : data(0), size(0), hasContent(false) 00061 { 00062 } 00063 00064 Content(const char * data, uint64_t size, 00065 const std::string & contentType = "", 00066 const std::string & contentMd5 = "") 00067 : data(data), size(size), hasContent(true), 00068 contentType(contentType), contentMd5(contentMd5) 00069 { 00070 } 00071 00072 Content(const tinyxml2::XMLDocument & xml); 00073 00074 const char * data; 00075 uint64_t size; 00076 bool hasContent; 00077 00078 
std::string str; 00079 std::string contentType; 00080 std::string contentMd5; 00081 }; 00082 00084 struct RequestParams { 00085 00086 RequestParams() 00087 : expectedBytesToDownload(0) 00088 { 00089 } 00090 00091 std::string verb; 00092 std::string bucket; 00093 std::string resource; 00094 std::string subResource; 00095 std::string date; 00096 00097 std::string contentType; 00098 std::string contentMd5; 00099 Content content; 00100 uint64_t expectedBytesToDownload; 00101 00102 StrPairVector headers; 00103 StrPairVector queryParams; 00104 }; 00105 00107 struct Response { 00108 Response() 00109 : code_(0) 00110 { 00111 } 00112 00113 std::string body() const 00114 { 00115 if (code_ < 200 || code_ >= 300) 00116 throw ML::Exception("invalid http code returned"); 00117 return body_; 00118 } 00119 00120 std::unique_ptr<tinyxml2::XMLDocument> bodyXml() const 00121 { 00122 std::unique_ptr<tinyxml2::XMLDocument> result(new tinyxml2::XMLDocument()); 00123 result->Parse(body_.c_str()); 00124 return result; 00125 } 00126 00127 operator std::unique_ptr<tinyxml2::XMLDocument>() const 00128 { 00129 return bodyXml(); 00130 } 00131 00132 std::string bodyXmlStr() const 00133 { 00134 auto x = bodyXml(); 00135 tinyxml2::XMLPrinter printer; 00136 x->Print(&printer); 00137 return printer.CStr(); 00138 } 00139 00140 std::string getHeader(const std::string & name) const 00141 { 00142 auto it = header_.headers.find(name); 00143 if (it == header_.headers.end()) 00144 throw ML::Exception("required header " + name + " not found"); 00145 return it->second; 00146 } 00147 00148 long code_; 00149 std::string body_; 00150 HttpHeader header_; 00151 }; 00152 00153 enum Redundancy { 00154 REDUNDANCY_STANDARD, 00155 REDUNDANCY_REDUCED, 00156 REDUNDANCY_GLACIER 00157 }; 00158 00159 enum ServerSideEncryption { 00160 SSE_NONE, 00161 SSE_AES256 00162 }; 00163 00164 struct ObjectMetadata { 00165 ObjectMetadata() 00166 : redundancy(REDUNDANCY_STANDARD), 00167 serverSideEncryption(SSE_NONE) 00168 { 00169 } 
00170 00171 ObjectMetadata(const Redundancy & redundancy) 00172 : redundancy(redundancy), 00173 serverSideEncryption(SSE_NONE) 00174 { 00175 } 00176 00177 std::vector<std::pair<std::string, std::string> > 00178 getRequestHeaders() const; 00179 00180 Redundancy redundancy; 00181 ServerSideEncryption serverSideEncryption; 00182 std::string contentType; 00183 std::string contentEncoding; 00184 std::map<std::string, std::string> metadata; 00185 }; 00186 00188 struct SignedRequest : public RequestParams { 00189 RequestParams params; 00190 std::string auth; 00191 std::string uri; 00192 double bandwidthToServiceMbps; 00193 00195 Response performSync() const; 00196 }; 00197 00199 std::string signature(const RequestParams & request) const; 00200 00202 SignedRequest prepare(const RequestParams & request) const; 00203 00205 Response get(const std::string & bucket, 00206 const std::string & resource, 00207 uint64_t expectedBytesToTransfer, 00208 const std::string & subResource = "", 00209 const StrPairVector & headers = StrPairVector(), 00210 const StrPairVector & queryParams = StrPairVector()) const; 00211 00212 00214 Response post(const std::string & bucket, 00215 const std::string & resource, 00216 const std::string & subResource = "", 00217 const StrPairVector & headers = StrPairVector(), 00218 const StrPairVector & queryParams = StrPairVector(), 00219 const Content & content = Content()) const; 00220 00222 Response put(const std::string & bucket, 00223 const std::string & resource, 00224 const std::string & subResource = "", 00225 const StrPairVector & headers = StrPairVector(), 00226 const StrPairVector & queryParams = StrPairVector(), 00227 const Content & content = Content()) const; 00228 00230 Response erase(const std::string & bucket, 00231 const std::string & resource, 00232 const std::string & subResource = "", 00233 const StrPairVector & headers = StrPairVector(), 00234 const StrPairVector & queryParams = StrPairVector(), 00235 const Content & content = Content()) 
const; 00236 00237 00238 enum CheckMethod { 00239 CM_SIZE, 00240 CM_MD5_ETAG, 00241 CM_ASSUME_INVALID 00242 }; 00243 00252 std::string upload(const char * data, 00253 size_t bytes, 00254 const std::string & bucket, 00255 const std::string & resource, 00256 CheckMethod check = CM_SIZE, 00257 const ObjectMetadata & md = ObjectMetadata(), 00258 int numInParallel = -1); 00259 00260 std::string upload(const char * data, 00261 size_t bytes, 00262 const std::string & uri, 00263 CheckMethod check = CM_SIZE, 00264 const ObjectMetadata & md = ObjectMetadata(), 00265 int numInParallel = -1); 00266 00267 typedef std::function<void (const char * chunk, 00268 size_t size, 00269 int chunkIndex, 00270 uint64_t offset, 00271 uint64_t totalSize) > 00272 OnChunk; 00273 00275 static OnChunk writeToFile(const std::string & filename); 00276 00282 void download(const std::string & bucket, 00283 const std::string & object, 00284 const OnChunk & onChunk, 00285 ssize_t startOffset = 0, 00286 ssize_t endOffset = -1) const; 00287 00288 void download(const std::string & uri, 00289 const OnChunk & onChunk, 00290 ssize_t startOffset = 0, 00291 ssize_t endOffset = -1) const; 00292 00293 void downloadToFile(const std::string & uri, 00294 const std::string & outfile, 00295 ssize_t endOffset = -1) const; 00296 00297 struct StreamingDownloadStreambuf; 00298 00303 std::auto_ptr<std::streambuf> 00304 streamingDownload(const std::string & bucket, 00305 const std::string & object, 00306 ssize_t startOffset = 0, 00307 ssize_t endOffset = -1, 00308 const OnChunk & onChunk = OnChunk()) const; 00309 00314 std::auto_ptr<std::streambuf> 00315 streamingDownload(const std::string & uri, 00316 ssize_t startOffset = 0, 00317 ssize_t endOffset = -1, 00318 const OnChunk & onChunk = OnChunk()) const; 00319 00321 std::auto_ptr<std::streambuf> 00322 streamingUpload(const std::string & uri, 00323 const ObjectMetadata & md = ObjectMetadata()) const; 00324 00326 std::auto_ptr<std::streambuf> 00327 streamingUpload(const 
std::string & bucket, 00328 const std::string & object, 00329 const ObjectMetadata & md = ObjectMetadata()) const; 00330 00331 struct ObjectInfo { 00332 ObjectInfo(); 00333 ObjectInfo(tinyxml2::XMLNode * element); 00334 00335 JML_IMPLEMENT_OPERATOR_BOOL(exists); 00336 00337 std::string key; 00338 uint64_t size; 00339 bool exists; 00340 std::string etag; 00341 std::string ownerId; 00342 std::string ownerName; 00343 Date lastModified; 00344 std::string storageClass; 00345 }; 00346 00347 typedef std::function<bool (const std::string & prefix, 00348 const std::string & objectName, 00349 const ObjectInfo & info, 00350 int depth)> 00351 OnObject; 00352 00353 typedef std::function<bool (const std::string & prefix, 00354 const std::string & dirName, 00355 int depth)> 00356 OnSubdir; 00357 00361 void forEachObject(const std::string & bucket, 00362 const std::string & prefix = "", 00363 const OnObject & onObject = OnObject(), 00364 const OnSubdir & onSubdir = OnSubdir(), 00365 const std::string & delimiter = "/", 00366 int depth = 1) const; 00367 00369 ObjectInfo tryGetObjectInfo(const std::string & bucket, 00370 const std::string & object) const; 00371 00372 ObjectInfo tryGetObjectInfo(const std::string & uri) const; 00373 00374 00378 ObjectInfo getObjectInfo(const std::string & bucket, 00379 const std::string & object) const; 00380 00381 ObjectInfo getObjectInfo(const std::string & uri) const; 00382 00383 typedef std::function<bool (std::string bucket)> OnBucket; 00384 00388 bool forEachBucket(const OnBucket & bucket) const; 00389 00397 static std::string 00398 getDigestMulti(const std::string & verb, 00399 const std::string & bucket, 00400 const std::string & resource, 00401 const std::string & subResource, 00402 const std::string & contentType, 00403 const std::string & contentMd5, 00404 const std::string & date, 00405 const std::vector<std::pair<std::string, std::string> > & headers); 00406 00413 static std::string 00414 getDigest(const std::string & verb, 00415 const 
std::string & bucket, 00416 const std::string & resource, 00417 const std::string & subResource, 00418 const std::string & contentType, 00419 const std::string & contentMd5, 00420 const std::string & date, 00421 const std::map<std::string, std::string> & headers); 00422 00424 static std::pair<std::string, std::string> 00425 parseUri(const std::string & uri); 00426 00427 struct MultiPartUploadPart { 00428 MultiPartUploadPart() 00429 : partNumber(0), done(false) 00430 { 00431 } 00432 00433 int partNumber; 00434 uint64_t startOffset; 00435 uint64_t size; 00436 std::string lastModified; 00437 std::string contentMd5; 00438 std::string etag; 00439 bool done; 00440 00441 void fromXml(tinyxml2::XMLElement * element); 00442 }; 00443 00444 struct MultiPartUpload { 00445 std::string id; 00446 std::vector<MultiPartUploadPart> parts; 00447 }; 00448 00450 MultiPartUpload 00451 obtainMultiPartUpload(const std::string & bucket, 00452 const std::string & resource, 00453 const ObjectMetadata & metadata) const; 00454 00455 std::string 00456 finishMultiPartUpload(const std::string & bucket, 00457 const std::string & resource, 00458 const std::string & uploadId, 00459 const std::vector<std::string> & etags) const; 00460 00461 void uploadRecursive(std::string dirSrc, 00462 std::string bucketDest, 00463 bool includeDir); 00464 00465 //easy handle for v8 wrapping 00466 void setDefaultBandwidthToServiceMbps(double mpbs); 00467 00468 }; 00469 00471 struct S3IStream : public std::istream { 00472 S3IStream(); 00473 S3IStream(const S3Api & s3, const std::string & uri); 00474 00475 void open(const std::string & uri); 00476 void open(const std::string & bucket, 00477 const std::string & object); 00478 }; 00479 00480 struct S3Handle{ 00481 S3Api s3; 00482 std::string s3UriPrefix; 00483 00484 void initS3(const std::string & accessKeyId, 00485 const std::string & accessKey, 00486 const std::string & uriPrefix); 00487 00488 size_t getS3Buffer(const std::string & filename, char** outBuffer); 00489 }; 
00490 00495 class BucketAlreadyRegistered : public ML::Exception{ 00496 public: 00497 BucketAlreadyRegistered(const std::string & bucketName) : 00498 ML::Exception("s3 bucket %s already registered", 00499 bucketName.c_str()){} 00500 }; 00501 00502 void registerS3Bucket(const std::string & bucketName, 00503 const std::string & accessKeyId, 00504 const std::string & accessKey, 00505 double bandwidthToServiceMbps = S3Api::defaultBandwidthToServiceMbps, 00506 const std::string & protocol = "http", 00507 const std::string & serviceUri = "s3.amazonaws.com"); 00508 00514 void registerS3Buckets(const std::string & accessKeyId, 00515 const std::string & accessKey, 00516 double bandwidthToServiceMbps = S3Api::defaultBandwidthToServiceMbps, 00517 const std::string & protocol = "http", 00518 const std::string & serviceUri = "s3.amazonaws.com"); 00519 00520 std::shared_ptr<S3Api> getS3ApiForBucket(const std::string & bucketName); 00521 00522 // Return an URI for either a file or an s3 object 00523 size_t getUriSize(const std::string & filename); 00524 00525 } // namespace Datacratic