RTBKit  0.9
Open-source framework to create real-time ad bidding systems.
soa/service/sftp.cc
00001 /* sftp.cc
00002    Jeremy Barnes, 21 June 2012
00003    Copyright (c) 2012 Datacratic.  All rights reserved.
00004 
00005    sftp connection.
00006 */
00007 
#include "soa/service/sftp.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <fstream>
#include <vector>
#include "jml/arch/exception.h"
#include "jml/arch/format.h"
#include "soa/types/date.h"
00020 
00021 
00022 using namespace std;
00023 using namespace ML;
00024 
00025 
00026 namespace Datacratic {
00027 
00028 
00029 /*****************************************************************************/
00030 /* SOCKET CONNECTION                                                         */
00031 /*****************************************************************************/
00032 
00033 SocketConnection::
00034 SocketConnection()
00035     : sock(-1)
00036 {
00037 }
00038 
00039 SocketConnection::
00040 ~SocketConnection()
00041 {
00042     close();
00043 }
00044 
/** Resolve hostname/port with getaddrinfo() and open a stream socket to the
    first returned address that accepts the connection.  On success the
    connected descriptor is stored in `sock`.

    Progress and per-address failures are written to stderr.

    Throws ML::Exception if name resolution fails, or if none of the
    resolved addresses could be connected to.  In the latter case `sock`
    is left untouched, so it never refers to an already-closed descriptor.
*/
void
SocketConnection::
connect(const std::string & hostname,
        const std::string & port)
{
    struct addrinfo hints;
    struct addrinfo *result = 0;

    /* Obtain address(es) matching host/port */
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
    hints.ai_socktype = SOCK_STREAM; /* Stream socket */
    hints.ai_flags = AI_CANONNAME;
    hints.ai_protocol = 0;           /* Any protocol */

    int res = getaddrinfo(hostname.c_str(), port.c_str(), &hints, &result);
    if (res != 0)
        throw ML::Exception("getaddrinfo: %s", gai_strerror(res));

    cerr << "res = " << res << endl;
    cerr << "result = " << result << endl;

    /* getaddrinfo() returns a list of address structures.
       Try each address until we successfully connect(2).
       If socket(2) (or connect(2)) fails, we (close the socket
       and) try the next address. */

    bool connected = false;

    for (struct addrinfo * rp = result; rp; rp = rp->ai_next) {
        if (rp->ai_canonname)
            cerr << "trying " << rp->ai_canonname << endl;
        else cerr << "trying null" << endl;

        /* Work on a local descriptor; only publish it to `sock` once the
           connection has actually been established. */
        int fd = socket(rp->ai_family, rp->ai_socktype,
                        rp->ai_protocol);
        if (fd == -1) {
            cerr << "couldn't create connection socket: "
                 << strerror(errno) << endl;
            continue;
        }

        if (::connect(fd, rp->ai_addr, rp->ai_addrlen) != -1) {
            cerr << "connected" << endl;
            sock = fd;
            connected = true;
            break;                  /* Success */
        }

        cerr << "couldn't connect: " << strerror(errno) << endl;

        ::close(fd);
    }

    /* Release the address list on every path, including the failure path
       that throws just below. */
    freeaddrinfo(result);

    if (!connected)
        throw ML::Exception("couldn't connect anywhere");
}
00100 
00101 void
00102 SocketConnection::
00103 close()
00104 {
00105     ::close(sock);
00106 }
00107 
00108 
00109 /*****************************************************************************/
00110 /* SSH CONNECTION                                                            */
00111 /*****************************************************************************/
00112 
00113 SshConnection::
00114 SshConnection()
00115     : session(0)
00116 {
00117 }
00118 
00119 SshConnection::
00120 ~SshConnection()
00121 {
00122     close();
00123 }
00124 
00125 void
00126 SshConnection::
00127 connect(const std::string & hostname,
00128         const std::string & port)
00129 {
00130     SocketConnection::connect(hostname, port);
00131 
00132     /* Create a session instance
00133      */ 
00134     session = libssh2_session_init();
00135 
00136     if(!session)
00137         throw ML::Exception("couldn't get libssh2 session");
00138  
00139     /* ... start it up. This will trade welcome banners, exchange keys,
00140      * and setup crypto, compression, and MAC layers
00141      */ 
00142     int rc = libssh2_session_handshake(session, sock);
00143 
00144     if(rc) {
00145         throw ML::Exception("error establishing session");
00146     }
00147  
00148     /* At this point we havn't yet authenticated.  The first thing to do
00149      * is check the hostkey's fingerprint against our known hosts Your app
00150      * may have it hard coded, may go to a file, may present it to the
00151      * user, that's your call
00152      */ 
00153     const char * fingerprint
00154         = libssh2_hostkey_hash(session, LIBSSH2_HOSTKEY_HASH_SHA1);
00155 
00156     printf("Fingerprint: ");
00157     for(int i = 0; i < 20; i++) {
00158         printf("%02X ", (unsigned char)fingerprint[i]);
00159     }
00160     printf("\n");
00161 }
00162 
00163 void
00164 SshConnection::
00165 passwordAuth(const std::string & username,
00166                   const std::string & password)
00167 {
00168     /* We could authenticate via password */ 
00169     if (libssh2_userauth_password(session,
00170                                   username.c_str(),
00171                                   password.c_str())) {
00172 
00173         throw ML::Exception("password authentication failed: " + lastError());
00174     }
00175 }
00176 
00177 void
00178 SshConnection::
00179 publicKeyAuth(const std::string & username,
00180               const std::string & publicKeyFile,
00181               const std::string & privateKeyFile)
00182 {
00183 /* Or by public key */ 
00184     if (libssh2_userauth_publickey_fromfile(session, username.c_str(),
00185                                             publicKeyFile.c_str(),
00186                                             privateKeyFile.c_str(),
00187                                             "")) {
00188         throw ML::Exception("public key authentication failed: " + lastError());
00189     }
00190 }
00191  
00192 void
00193 SshConnection::
00194 setBlocking()
00195 {
00196     /* Since we have not set non-blocking, tell libssh2 we are blocking */ 
00197     libssh2_session_set_blocking(session, 1);
00198 }
00199 
00200 std::string
00201 SshConnection::
00202 lastError() const
00203 {
00204     char * errmsg = 0;
00205     int res = libssh2_session_last_error(session, &errmsg, 0, 0);
00206     if (res)
00207         cerr << "error getting error: " << res << endl;
00208     return errmsg;
00209 }
00210 
00211 void
00212 SshConnection::
00213 close()
00214 {
00215     if (session) {
00216         libssh2_session_disconnect(session, "Normal Shutdown");
00217         libssh2_session_free(session);
00218     }
00219     session = 0;
00220 
00221     SocketConnection::close();
00222 }
00223 
00224 
00225 /*****************************************************************************/
00226 /* ATTRIBUTES                                                                */
00227 /*****************************************************************************/
00228 
00229 
00230 
00231 /*****************************************************************************/
00232 /* DIRECTORY                                                                 */
00233 /*****************************************************************************/
00234 
00235 SftpConnection::Directory::
00236 Directory(const std::string & path,
00237           LIBSSH2_SFTP_HANDLE * handle,
00238           SftpConnection * owner)
00239     : path(path), handle(handle), owner(owner)
00240 {
00241 }
00242 
00243 SftpConnection::Directory::
00244 ~Directory()
00245 {
00246     libssh2_sftp_close(handle);
00247 }
00248 
00249 void
00250 SftpConnection::Directory::
00251 ls() const
00252 {
00253     do {
00254         char mem[512];
00255         char longentry[512];
00256         LIBSSH2_SFTP_ATTRIBUTES attrs;
00257  
00258         /* loop until we fail */ 
00259         int rc = libssh2_sftp_readdir_ex(handle, mem, sizeof(mem),
00260 
00261                                          longentry, sizeof(longentry),
00262                                          &attrs);
00263         if(rc > 0) {
00264             /* rc is the length of the file name in the mem
00265                buffer */ 
00266  
00267             if (longentry[0] != '\0') {
00268                 printf("%s\n", longentry);
00269             } else {
00270                 if(attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) {
00271                     /* this should check what permissions it
00272                        is and print the output accordingly */ 
00273                     printf("--fix----- ");
00274                 }
00275                 else {
00276                     printf("---------- ");
00277                 }
00278  
00279                 if(attrs.flags & LIBSSH2_SFTP_ATTR_UIDGID) {
00280                     printf("%4ld %4ld ", attrs.uid, attrs.gid);
00281                 }
00282                 else {
00283                     printf("   -    - ");
00284                 }
00285  
00286                 if(attrs.flags & LIBSSH2_SFTP_ATTR_SIZE) {
00287                     printf("%8lld ", (unsigned long long)attrs.filesize);
00288                 }
00289                     
00290                 printf("%s\n", mem);
00291             }
00292         }
00293         else
00294             break;
00295  
00296     } while (1);
00297 }
00298 
00299 void
00300 SftpConnection::Directory::
00301 forEachFile(const OnFile & onFile) const
00302 {
00303     do {
00304         char mem[512];
00305         char longentry[512];
00306         Attributes attrs;
00307  
00308         /* loop until we fail */ 
00309         int rc = libssh2_sftp_readdir_ex(handle,
00310                                          mem, sizeof(mem),
00311                                          longentry, sizeof(longentry),
00312                                          &attrs);
00313 
00314         if(rc > 0) {
00315             /* rc is the length of the file name in the mem
00316                buffer */ 
00317             string filename(mem, mem + rc);
00318             onFile(filename, attrs);
00319         }
00320         else
00321             break;
00322  
00323     } while (1);
00324 }
00325 
00326 
00327 /*****************************************************************************/
00328 /* FILE                                                                      */
00329 /*****************************************************************************/
00330 
00331 SftpConnection::File::
00332 File(const std::string & path,
00333      LIBSSH2_SFTP_HANDLE * handle,
00334      SftpConnection * owner)
00335     : path(path), handle(handle), owner(owner)
00336 {
00337 }
00338 
00339 SftpConnection::File::
00340 ~File()
00341 {
00342     libssh2_sftp_close(handle);
00343 }
00344 
00345 SftpConnection::Attributes
00346 SftpConnection::File::
00347 getAttr() const
00348 {
00349     Attributes result;
00350     int res = libssh2_sftp_fstat_ex(handle, &result, 0);
00351     if (res == -1)
00352         throw ML::Exception("getAttr(): " + owner->lastError());
00353     return result;
00354 }
00355 
00356 uint64_t
00357 SftpConnection::File::
00358 size() const
00359 {
00360     return getAttr().filesize;
00361 }
00362 
00363 void
00364 SftpConnection::File::
00365 downloadTo(const std::string & filename) const
00366 {
00367     uint64_t bytesToRead = size();
00368 
00369     uint64_t done = 0;
00370     std::ofstream stream(filename.c_str());
00371 
00372     size_t bufSize = 1024 * 1024;
00373 
00374     char * buf = new char[bufSize];
00375             
00376     Date start = Date::now();
00377 
00378     for (;;) {
00379         ssize_t numRead = libssh2_sftp_read(handle, buf, bufSize);
00380         //cerr << "read " << numRead << " bytes" << endl;
00381         if (numRead < 0) {
00382             throw ML::Exception("read(): " + owner->lastError());
00383         }
00384         if (numRead == 0) break;
00385 
00386         stream.write(buf, numRead);
00387         uint64_t doneBefore = done;
00388         done += numRead;
00389 
00390         if (doneBefore / 10000000 != done / 10000000) {
00391             double elapsed = Date::now().secondsSince(start);
00392             double rate = done / elapsed;
00393             cerr << "done " << done << " of "
00394                  << bytesToRead << " at "
00395                  << rate / 1024.0
00396                  << "k/sec window " << numRead
00397                  << " time left "
00398                  << (bytesToRead - done) / rate
00399                  << "s" << endl;
00400         }
00401     }
00402 
00403     delete[] buf;
00404 }
00405 
00406 
00407 /*****************************************************************************/
00408 /* SFTP CONNECTION                                                           */
00409 /*****************************************************************************/
00410 
00411 SftpConnection::
00412 SftpConnection()
00413     : sftp_session(0)
00414 {
00415 }
00416 
00417 SftpConnection::
00418 ~SftpConnection()
00419 {
00420     close();
00421 }
00422 
00423 void
00424 SftpConnection::
00425 connectPasswordAuth(const std::string & hostname,
00426                     const std::string & username,
00427                     const std::string & password,
00428                     const std::string & port)
00429 {
00430     SshConnection::connect(hostname, port);
00431     SshConnection::passwordAuth(username, password);
00432 
00433     sftp_session = libssh2_sftp_init(session);
00434  
00435     if (!sftp_session) {
00436         throw ML::Exception("can't initialize SFTP session: "
00437                             + lastError());
00438     }
00439 
00440 }
00441 
00442 void
00443 SftpConnection::
00444 connectPublicKeyAuth(const std::string & hostname,
00445                               const std::string & username,
00446                               const std::string & publicKeyFile,
00447                               const std::string & privateKeyFile,
00448                               const std::string & port)
00449 {
00450     SshConnection::connect(hostname, port);
00451     SshConnection::publicKeyAuth(username, publicKeyFile, privateKeyFile);
00452 
00453     sftp_session = libssh2_sftp_init(session);
00454  
00455     if (!sftp_session) {
00456         throw ML::Exception("can't initialize SFTP session: "
00457                             + lastError());
00458     }
00459 
00460 }
00461 
00462 SftpConnection::Directory
00463 SftpConnection::
00464 getDirectory(const std::string & path)
00465 {
00466     LIBSSH2_SFTP_HANDLE * handle
00467         = libssh2_sftp_opendir(sftp_session, path.c_str());
00468         
00469     if (!handle) {
00470         throw ML::Exception("couldn't open path: " + lastError());
00471     }
00472 
00473     return Directory(path, handle, this);
00474 }
00475 
00476 SftpConnection::File
00477 SftpConnection::
00478 openFile(const std::string & path)
00479 {
00480     LIBSSH2_SFTP_HANDLE * handle
00481         = libssh2_sftp_open_ex(sftp_session, path.c_str(),
00482                                path.length(), LIBSSH2_FXF_READ, 0,
00483                                LIBSSH2_SFTP_OPENFILE);
00484         
00485     if (!handle) {
00486         throw ML::Exception("couldn't open path: " + lastError());
00487     }
00488 
00489     return File(path, handle, this);
00490 }
00491 
00492 void
00493 SftpConnection::
00494 close()
00495 {
00496     if (sftp_session) {
00497         libssh2_sftp_shutdown(sftp_session);
00498         sftp_session = 0;
00499     }
00500 
00501     SshConnection::close();
00502 }
00503 
00504 void
00505 SftpConnection::
00506 uploadFile(const char * start,
00507            size_t size,
00508            const std::string & path)
00509 {
00510     /* Request a file via SFTP */ 
00511     LIBSSH2_SFTP_HANDLE * handle =
00512         libssh2_sftp_open(sftp_session, path.c_str(),
00513                           LIBSSH2_FXF_WRITE|LIBSSH2_FXF_CREAT|LIBSSH2_FXF_TRUNC,
00514                           LIBSSH2_SFTP_S_IRUSR|LIBSSH2_SFTP_S_IWUSR|
00515                           LIBSSH2_SFTP_S_IRGRP|LIBSSH2_SFTP_S_IROTH);
00516     
00517     if (!handle) {
00518         throw ML::Exception("couldn't open path: " + lastError());
00519     }
00520 
00521     Date started = Date::now();
00522 
00523     uint64_t offset = 0;
00524     uint64_t lastPrint = 0;
00525     Date lastTime = started;
00526 
00527     for (; offset < size; ) {
00528         /* write data in a loop until we block */ 
00529         size_t toSend = std::min<size_t>(size - offset,
00530                                          1024 * 1024);
00531 
00532         ssize_t rc = libssh2_sftp_write(handle,
00533                                         start + offset,
00534                                         toSend);
00535         
00536         if (rc == -1)
00537             throw ML::Exception("couldn't upload file: " + lastError());
00538 
00539         offset += rc;
00540         
00541         if (offset > lastPrint + 5 * 1024 * 1024 || offset == size) {
00542             Date now = Date::now();
00543 
00544             double mb = 1024 * 1024;
00545 
00546             double doneMb = offset / mb;
00547             double totalMb = size / mb;
00548             double elapsedOverall = now.secondsSince(started);
00549             double mbSecOverall = doneMb / elapsedOverall;
00550             double elapsedSince = now.secondsSince(lastTime);
00551             double mbSecInst = (offset - lastPrint) / mb / elapsedSince;
00552 
00553             cerr << ML::format("done %.2fMB of %.2fMB (%.2f%%) at %.2fMB/sec inst and %.2fMB/sec overall",
00554                                doneMb, totalMb,
00555                                100.0 * doneMb / totalMb,
00556                                mbSecInst,
00557                                mbSecOverall)
00558                  << endl;
00559                                
00560 
00561             lastPrint = offset;
00562             lastTime = now;
00563         }
00564         //cerr << "at " << offset / 1024.0 / 1024.0
00565         //     << " of " << size << endl;
00566     }
00567  
00568     libssh2_sftp_close(handle);
00569 }
00570 
00571 } // namespace Datacratic
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator