RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/s3.h
00001 /* s3.h                                                            -*- C++ -*-
00002    Jeremy Barnes, 3 July 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    Client class for talking to the Amazon S3 storage service.
00006 */
00007 
00008 #pragma once
00009 
#include <sys/types.h>   // ssize_t

#include <cstdint>       // uint64_t
#include <functional>    // std::function
#include <istream>       // std::istream (S3IStream base)
#include <map>
#include <memory>
#include <streambuf>     // std::streambuf
#include <string>
#include <utility>       // std::pair
#include <vector>

#include "jml/arch/exception.h"
#include "jml/utils/unnamed_bool.h"
#include "tinyxml2/tinyxml2.h"
#include "soa/service/http_endpoint.h"
00018 
00019 namespace Datacratic {
00020 
00021 struct S3Api {
00025     static std::string sign(const std::string & stringToSign,
00026                             const std::string & accessKey);
00027 
00032     static double defaultBandwidthToServiceMbps;
00033 
00034     S3Api();
00035 
00037     S3Api(const std::string & accessKeyId,
00038           const std::string & accessKey,
00039           double bandwidthToServiceMbps = defaultBandwidthToServiceMbps,
00040           const std::string & defaultProtocol = "http",
00041           const std::string & serviceUri = "s3.amazonaws.com");
00042 
00044     void init(const std::string & accessKeyId,
00045               const std::string & accessKey,
00046               double bandwidthToServiceMbps = defaultBandwidthToServiceMbps,
00047               const std::string & defaultProtocol = "http",
00048               const std::string & serviceUri = "s3.amazonaws.com");
00049 
00050     std::string accessKeyId;
00051     std::string accessKey;
00052     std::string defaultProtocol;
00053     std::string serviceUri;
00054     double bandwidthToServiceMbps;
00055 
00056     typedef std::vector<std::pair<std::string, std::string> > StrPairVector;
00057 
00058     struct Content {
00059         Content()
00060             : data(0), size(0), hasContent(false)
00061         {
00062         }
00063 
00064         Content(const char * data, uint64_t size,
00065                 const std::string & contentType = "",
00066                 const std::string & contentMd5 = "")
00067             : data(data), size(size), hasContent(true),
00068               contentType(contentType), contentMd5(contentMd5)
00069         {
00070         }
00071 
00072         Content(const tinyxml2::XMLDocument & xml);
00073 
00074         const char * data;
00075         uint64_t size;
00076         bool hasContent;
00077 
00078         std::string str;
00079         std::string contentType;
00080         std::string contentMd5;
00081     };
00082 
00084     struct RequestParams {
00085 
00086         RequestParams()
00087             : expectedBytesToDownload(0)
00088         {
00089         }
00090 
00091         std::string verb;
00092         std::string bucket;
00093         std::string resource;
00094         std::string subResource;
00095         std::string date;
00096 
00097         std::string contentType;
00098         std::string contentMd5;
00099         Content content;
00100         uint64_t expectedBytesToDownload;
00101 
00102         StrPairVector headers;
00103         StrPairVector queryParams;
00104     };
00105 
00107     struct Response {
00108         Response()
00109             : code_(0)
00110         {
00111         }
00112 
00113         std::string body() const
00114         {
00115             if (code_ < 200 || code_ >= 300)
00116                 throw ML::Exception("invalid http code returned");
00117             return body_;
00118         }
00119 
00120         std::unique_ptr<tinyxml2::XMLDocument> bodyXml() const
00121         {
00122             std::unique_ptr<tinyxml2::XMLDocument> result(new tinyxml2::XMLDocument());
00123             result->Parse(body_.c_str());
00124             return result;
00125         }
00126 
00127         operator std::unique_ptr<tinyxml2::XMLDocument>() const
00128         {
00129             return bodyXml();
00130         }
00131 
00132         std::string bodyXmlStr() const
00133         {
00134             auto x = bodyXml();
00135             tinyxml2::XMLPrinter printer;
00136             x->Print(&printer);
00137             return printer.CStr();
00138         }
00139 
00140         std::string getHeader(const std::string & name) const
00141         {
00142             auto it = header_.headers.find(name);
00143             if (it == header_.headers.end())
00144                 throw ML::Exception("required header " + name + " not found");
00145             return it->second;
00146         }
00147 
00148         long code_;
00149         std::string body_;
00150         HttpHeader header_;
00151     };
00152 
00153     enum Redundancy {
00154         REDUNDANCY_STANDARD,
00155         REDUNDANCY_REDUCED,
00156         REDUNDANCY_GLACIER
00157     };
00158 
00159     enum ServerSideEncryption {
00160         SSE_NONE,
00161         SSE_AES256
00162     };
00163 
00164     struct ObjectMetadata {
00165         ObjectMetadata()
00166             : redundancy(REDUNDANCY_STANDARD),
00167               serverSideEncryption(SSE_NONE)
00168         {
00169         }
00170 
00171         ObjectMetadata(const Redundancy & redundancy)
00172             : redundancy(redundancy),
00173               serverSideEncryption(SSE_NONE)
00174         {
00175         }
00176 
00177         std::vector<std::pair<std::string, std::string> >
00178         getRequestHeaders() const;
00179 
00180         Redundancy redundancy;
00181         ServerSideEncryption serverSideEncryption;
00182         std::string contentType;
00183         std::string contentEncoding;
00184         std::map<std::string, std::string> metadata;
00185     };
00186 
00188     struct SignedRequest : public RequestParams {
00189         RequestParams params;
00190         std::string auth;
00191         std::string uri;
00192         double bandwidthToServiceMbps;
00193 
00195         Response performSync() const;
00196     };
00197 
00199     std::string signature(const RequestParams & request) const;
00200 
00202     SignedRequest prepare(const RequestParams & request) const;
00203 
00205     Response get(const std::string & bucket,
00206                  const std::string & resource,
00207                  uint64_t expectedBytesToTransfer,
00208                  const std::string & subResource = "",
00209                  const StrPairVector & headers = StrPairVector(),
00210                  const StrPairVector & queryParams = StrPairVector()) const;
00211 
00212 
00214     Response post(const std::string & bucket,
00215                   const std::string & resource,
00216                   const std::string & subResource = "",
00217                   const StrPairVector & headers = StrPairVector(),
00218                   const StrPairVector & queryParams = StrPairVector(),
00219                   const Content & content = Content()) const;
00220 
00222     Response put(const std::string & bucket,
00223                  const std::string & resource,
00224                  const std::string & subResource = "",
00225                  const StrPairVector & headers = StrPairVector(),
00226                  const StrPairVector & queryParams = StrPairVector(),
00227                  const Content & content = Content()) const;
00228 
00230      Response erase(const std::string & bucket,
00231                     const std::string & resource,
00232                     const std::string & subResource = "",
00233                     const StrPairVector & headers = StrPairVector(),
00234                     const StrPairVector & queryParams = StrPairVector(),
00235                     const Content & content = Content()) const;
00236 
00237 
00238     enum CheckMethod {
00239         CM_SIZE,     
00240         CM_MD5_ETAG, 
00241         CM_ASSUME_INVALID  
00242     };
00243 
00252     std::string upload(const char * data,
00253                        size_t bytes,
00254                        const std::string & bucket,
00255                        const std::string & resource,
00256                        CheckMethod check = CM_SIZE,
00257                        const ObjectMetadata & md = ObjectMetadata(),
00258                        int numInParallel = -1);
00259 
00260     std::string upload(const char * data,
00261                        size_t bytes,
00262                        const std::string & uri,
00263                        CheckMethod check = CM_SIZE,
00264                        const ObjectMetadata & md = ObjectMetadata(),
00265                        int numInParallel = -1);
00266 
00267     typedef std::function<void (const char * chunk,
00268                                 size_t size,
00269                                 int chunkIndex,
00270                                 uint64_t offset,
00271                                 uint64_t totalSize) >
00272         OnChunk;
00273 
00275     static OnChunk writeToFile(const std::string & filename);
00276 
00282     void download(const std::string & bucket,
00283                   const std::string & object,
00284                   const OnChunk & onChunk,
00285                   ssize_t startOffset = 0,
00286                   ssize_t endOffset = -1) const;
00287 
00288     void download(const std::string & uri,
00289                   const OnChunk & onChunk,
00290                   ssize_t startOffset = 0,
00291                   ssize_t endOffset = -1) const;
00292 
00293     void downloadToFile(const std::string & uri,
00294                   const std::string & outfile,
00295                   ssize_t endOffset = -1) const;
00296 
00297     struct StreamingDownloadStreambuf;
00298 
00303     std::auto_ptr<std::streambuf>
00304     streamingDownload(const std::string & bucket,
00305                       const std::string & object,
00306                       ssize_t startOffset = 0,
00307                       ssize_t endOffset = -1,
00308                       const OnChunk & onChunk = OnChunk()) const;
00309 
00314     std::auto_ptr<std::streambuf>
00315     streamingDownload(const std::string & uri,
00316                       ssize_t startOffset = 0,
00317                       ssize_t endOffset = -1,
00318                       const OnChunk & onChunk = OnChunk()) const;
00319 
00321     std::auto_ptr<std::streambuf>
00322     streamingUpload(const std::string & uri,
00323                     const ObjectMetadata & md = ObjectMetadata()) const;
00324 
00326     std::auto_ptr<std::streambuf>
00327     streamingUpload(const std::string & bucket,
00328                     const std::string & object,
00329                     const ObjectMetadata & md = ObjectMetadata()) const;
00330 
00331     struct ObjectInfo {
00332         ObjectInfo();
00333         ObjectInfo(tinyxml2::XMLNode * element);
00334 
00335         JML_IMPLEMENT_OPERATOR_BOOL(exists);
00336 
00337         std::string key;
00338         uint64_t size;
00339         bool exists;
00340         std::string etag;
00341         std::string ownerId;
00342         std::string ownerName;
00343         Date lastModified;
00344         std::string storageClass;
00345     };
00346 
00347     typedef std::function<bool (const std::string & prefix,
00348                                 const std::string & objectName,
00349                                 const ObjectInfo & info,
00350                                 int depth)>
00351         OnObject;
00352 
00353     typedef std::function<bool (const std::string & prefix,
00354                                 const std::string & dirName,
00355                                 int depth)>
00356         OnSubdir;
00357 
00361     void forEachObject(const std::string & bucket,
00362                        const std::string & prefix = "",
00363                        const OnObject & onObject = OnObject(),
00364                        const OnSubdir & onSubdir = OnSubdir(),
00365                        const std::string & delimiter = "/",
00366                        int depth = 1) const;
00367 
00369     ObjectInfo tryGetObjectInfo(const std::string & bucket,
00370                                 const std::string & object) const;
00371 
00372     ObjectInfo tryGetObjectInfo(const std::string & uri) const;
00373 
00374 
00378     ObjectInfo getObjectInfo(const std::string & bucket,
00379                              const std::string & object) const;
00380 
00381     ObjectInfo getObjectInfo(const std::string & uri) const;
00382 
00383     typedef std::function<bool (std::string bucket)> OnBucket;
00384 
00388     bool forEachBucket(const OnBucket & bucket) const;
00389 
00397     static std::string
00398     getDigestMulti(const std::string & verb,
00399                    const std::string & bucket,
00400                    const std::string & resource,
00401                    const std::string & subResource,
00402                    const std::string & contentType,
00403                    const std::string & contentMd5,
00404                    const std::string & date,
00405                    const std::vector<std::pair<std::string, std::string> > & headers);
00406 
00413     static std::string
00414     getDigest(const std::string & verb,
00415               const std::string & bucket,
00416               const std::string & resource,
00417               const std::string & subResource,
00418               const std::string & contentType,
00419               const std::string & contentMd5,
00420               const std::string & date,
00421               const std::map<std::string, std::string> & headers);
00422 
00424     static std::pair<std::string, std::string>
00425     parseUri(const std::string & uri);
00426 
00427     struct MultiPartUploadPart {
00428         MultiPartUploadPart()
00429             : partNumber(0), done(false)
00430         {
00431         }
00432 
00433         int partNumber;
00434         uint64_t startOffset;
00435         uint64_t size;
00436         std::string lastModified;
00437         std::string contentMd5;
00438         std::string etag;
00439         bool done;
00440 
00441         void fromXml(tinyxml2::XMLElement * element);
00442     };
00443 
00444     struct MultiPartUpload {
00445         std::string id;
00446         std::vector<MultiPartUploadPart> parts;
00447     };
00448 
00450     MultiPartUpload
00451     obtainMultiPartUpload(const std::string & bucket,
00452                           const std::string & resource,
00453                           const ObjectMetadata & metadata) const;
00454 
00455     std::string
00456     finishMultiPartUpload(const std::string & bucket,
00457                           const std::string & resource,
00458                           const std::string & uploadId,
00459                           const std::vector<std::string> & etags) const;
00460 
00461     void uploadRecursive(std::string dirSrc,
00462                          std::string bucketDest,
00463                          bool includeDir);
00464 
00465     //easy handle for v8 wrapping
00466     void setDefaultBandwidthToServiceMbps(double mpbs);
00467 
00468 };
00469 
00471 struct S3IStream : public std::istream {
00472     S3IStream();
00473     S3IStream(const S3Api & s3, const std::string & uri);
00474 
00475     void open(const std::string & uri);
00476     void open(const std::string & bucket,
00477               const std::string & object);
00478 };
00479 
00480 struct S3Handle{
00481     S3Api s3;
00482     std::string s3UriPrefix;
00483 
00484     void initS3(const std::string & accessKeyId,
00485                 const std::string & accessKey,
00486                 const std::string & uriPrefix);
00487 
00488     size_t getS3Buffer(const std::string & filename, char** outBuffer);
00489 };
00490 
00495 class BucketAlreadyRegistered : public ML::Exception{
00496     public:
00497         BucketAlreadyRegistered(const std::string & bucketName) : 
00498                 ML::Exception("s3 bucket %s already registered",
00499                               bucketName.c_str()){}
00500 };
00501 
00502 void registerS3Bucket(const std::string & bucketName,
00503                       const std::string & accessKeyId,
00504                       const std::string & accessKey,
00505                       double bandwidthToServiceMbps = S3Api::defaultBandwidthToServiceMbps,
00506                       const std::string & protocol = "http",
00507                       const std::string & serviceUri = "s3.amazonaws.com");
00508 
00514 void registerS3Buckets(const std::string & accessKeyId,
00515                        const std::string & accessKey,
00516                        double bandwidthToServiceMbps = S3Api::defaultBandwidthToServiceMbps,
00517                        const std::string & protocol = "http",
00518                        const std::string & serviceUri = "s3.amazonaws.com");
00519 
00520 std::shared_ptr<S3Api> getS3ApiForBucket(const std::string & bucketName);
00521 
// Return the size of the resource named by filename, which may be either
// a local file or an s3 object.  (Presumably the size is in bytes --
// confirm against the implementation.)
size_t
getUriSize(const std::string& filename);
00524 
00525 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator