RTBKit
0.9
Open-source framework to create real-time ad bidding systems.
|
00001 /* sftp.cc 00002 Jeremy Barnes, 21 June 2012 00003 Copyright (c) 2012 Datacratic. All rights reserved. 00004 00005 sftp connection. 00006 */ 00007 00008 #include "soa/service/sftp.h" 00009 #include <sys/types.h> 00010 #include <sys/socket.h> 00011 #include <netdb.h> 00012 #include <stdio.h> 00013 #include <stdlib.h> 00014 #include <unistd.h> 00015 #include <string.h> 00016 #include "jml/arch/exception.h" 00017 #include "jml/arch/format.h" 00018 #include "soa/types/date.h" 00019 #include <fstream> 00020 00021 00022 using namespace std; 00023 using namespace ML; 00024 00025 00026 namespace Datacratic { 00027 00028 00029 /*****************************************************************************/ 00030 /* SOCKET CONNECTION */ 00031 /*****************************************************************************/ 00032 00033 SocketConnection:: 00034 SocketConnection() 00035 : sock(-1) 00036 { 00037 } 00038 00039 SocketConnection:: 00040 ~SocketConnection() 00041 { 00042 close(); 00043 } 00044 00045 void 00046 SocketConnection:: 00047 connect(const std::string & hostname, 00048 const std::string & port) 00049 { 00050 struct addrinfo hints; 00051 struct addrinfo *result, *rp; 00052 00053 /* Obtain address(es) matching host/port */ 00054 memset(&hints, 0, sizeof(struct addrinfo)); 00055 hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ 00056 hints.ai_socktype = SOCK_STREAM; /* Datagram socket */ 00057 hints.ai_flags = AI_CANONNAME; 00058 hints.ai_protocol = 0; /* Any protocol */ 00059 00060 int res = getaddrinfo(hostname.c_str(), port.c_str(), &hints, &result); 00061 if (res != 0) 00062 throw ML::Exception("getaddrinfo: %s", gai_strerror(res)); 00063 00064 cerr << "res = " << res << endl; 00065 cerr << "result = " << result << endl; 00066 00067 /* getaddrinfo() returns a list of address structures. 00068 Try each address until we successfully connect(2). 00069 If socket(2) (or connect(2)) fails, we (close the socket 00070 and) try the next address. */ 00071 00072 for (rp = result; rp; rp = rp->ai_next) { 00073 if (rp->ai_canonname) 00074 cerr << "trying " << rp->ai_canonname << endl; 00075 else cerr << "trying null" << endl; 00076 00077 sock = socket(rp->ai_family, rp->ai_socktype, 00078 rp->ai_protocol); 00079 if (sock == -1) { 00080 cerr << "couldn't create connection socket: " 00081 << strerror(errno) << endl; 00082 continue; 00083 } 00084 00085 if (::connect(sock, rp->ai_addr, rp->ai_addrlen) != -1) { 00086 cerr << "connected" << endl; 00087 break; /* Success */ 00088 } 00089 00090 cerr << "couldn't connect: " << strerror(errno) << endl; 00091 00092 ::close(sock); 00093 } 00094 00095 if (!rp) 00096 throw ML::Exception("couldn't connect anywhere"); 00097 00098 freeaddrinfo(result); /* No longer needed */ 00099 } 00100 00101 void 00102 SocketConnection:: 00103 close() 00104 { 00105 ::close(sock); 00106 } 00107 00108 00109 /*****************************************************************************/ 00110 /* SSH CONNECTION */ 00111 /*****************************************************************************/ 00112 00113 SshConnection:: 00114 SshConnection() 00115 : session(0) 00116 { 00117 } 00118 00119 SshConnection:: 00120 ~SshConnection() 00121 { 00122 close(); 00123 } 00124 00125 void 00126 SshConnection:: 00127 connect(const std::string & hostname, 00128 const std::string & port) 00129 { 00130 SocketConnection::connect(hostname, port); 00131 00132 /* Create a session instance 00133 */ 00134 session = libssh2_session_init(); 00135 00136 if(!session) 00137 throw ML::Exception("couldn't get libssh2 session"); 00138 00139 /* ... start it up. This will trade welcome banners, exchange keys, 00140 * and setup crypto, compression, and MAC layers 00141 */ 00142 int rc = libssh2_session_handshake(session, sock); 00143 00144 if(rc) { 00145 throw ML::Exception("error establishing session"); 00146 } 00147 00148 /* At this point we havn't yet authenticated. The first thing to do 00149 * is check the hostkey's fingerprint against our known hosts Your app 00150 * may have it hard coded, may go to a file, may present it to the 00151 * user, that's your call 00152 */ 00153 const char * fingerprint 00154 = libssh2_hostkey_hash(session, LIBSSH2_HOSTKEY_HASH_SHA1); 00155 00156 printf("Fingerprint: "); 00157 for(int i = 0; i < 20; i++) { 00158 printf("%02X ", (unsigned char)fingerprint[i]); 00159 } 00160 printf("\n"); 00161 } 00162 00163 void 00164 SshConnection:: 00165 passwordAuth(const std::string & username, 00166 const std::string & password) 00167 { 00168 /* We could authenticate via password */ 00169 if (libssh2_userauth_password(session, 00170 username.c_str(), 00171 password.c_str())) { 00172 00173 throw ML::Exception("password authentication failed: " + lastError()); 00174 } 00175 } 00176 00177 void 00178 SshConnection:: 00179 publicKeyAuth(const std::string & username, 00180 const std::string & publicKeyFile, 00181 const std::string & privateKeyFile) 00182 { 00183 /* Or by public key */ 00184 if (libssh2_userauth_publickey_fromfile(session, username.c_str(), 00185 publicKeyFile.c_str(), 00186 privateKeyFile.c_str(), 00187 "")) { 00188 throw ML::Exception("public key authentication failed: " + lastError()); 00189 } 00190 } 00191 00192 void 00193 SshConnection:: 00194 setBlocking() 00195 { 00196 /* Since we have not set non-blocking, tell libssh2 we are blocking */ 00197 libssh2_session_set_blocking(session, 1); 00198 } 00199 00200 std::string 00201 SshConnection:: 00202 lastError() const 00203 { 00204 char * errmsg = 0; 00205 int res = libssh2_session_last_error(session, &errmsg, 0, 0); 00206 if (res) 00207 cerr << "error getting error: " << res << endl; 00208 return errmsg; 00209 } 00210 00211 void 00212 SshConnection:: 00213 close() 00214 { 00215 if (session) { 00216 libssh2_session_disconnect(session, "Normal Shutdown"); 00217 libssh2_session_free(session); 00218 } 00219 session = 0; 00220 00221 SocketConnection::close(); 00222 } 00223 00224 00225 /*****************************************************************************/ 00226 /* ATTRIBUTES */ 00227 /*****************************************************************************/ 00228 00229 00230 00231 /*****************************************************************************/ 00232 /* DIRECTORY */ 00233 /*****************************************************************************/ 00234 00235 SftpConnection::Directory:: 00236 Directory(const std::string & path, 00237 LIBSSH2_SFTP_HANDLE * handle, 00238 SftpConnection * owner) 00239 : path(path), handle(handle), owner(owner) 00240 { 00241 } 00242 00243 SftpConnection::Directory:: 00244 ~Directory() 00245 { 00246 libssh2_sftp_close(handle); 00247 } 00248 00249 void 00250 SftpConnection::Directory:: 00251 ls() const 00252 { 00253 do { 00254 char mem[512]; 00255 char longentry[512]; 00256 LIBSSH2_SFTP_ATTRIBUTES attrs; 00257 00258 /* loop until we fail */ 00259 int rc = libssh2_sftp_readdir_ex(handle, mem, sizeof(mem), 00260 00261 longentry, sizeof(longentry), 00262 &attrs); 00263 if(rc > 0) { 00264 /* rc is the length of the file name in the mem 00265 buffer */ 00266 00267 if (longentry[0] != '\0') { 00268 printf("%s\n", longentry); 00269 } else { 00270 if(attrs.flags & LIBSSH2_SFTP_ATTR_PERMISSIONS) { 00271 /* this should check what permissions it 00272 is and print the output accordingly */ 00273 printf("--fix----- "); 00274 } 00275 else { 00276 printf("---------- "); 00277 } 00278 00279 if(attrs.flags & LIBSSH2_SFTP_ATTR_UIDGID) { 00280 printf("%4ld %4ld ", attrs.uid, attrs.gid); 00281 } 00282 else { 00283 printf(" - - "); 00284 } 00285 00286 if(attrs.flags & LIBSSH2_SFTP_ATTR_SIZE) { 00287 printf("%8lld ", (unsigned long long)attrs.filesize); 00288 } 00289 00290 printf("%s\n", mem); 00291 } 00292 } 00293 else 00294 break; 00295 00296 } while (1); 00297 } 00298 00299 void 00300 SftpConnection::Directory:: 00301 forEachFile(const OnFile & onFile) const 00302 { 00303 do { 00304 char mem[512]; 00305 char longentry[512]; 00306 Attributes attrs; 00307 00308 /* loop until we fail */ 00309 int rc = libssh2_sftp_readdir_ex(handle, 00310 mem, sizeof(mem), 00311 longentry, sizeof(longentry), 00312 &attrs); 00313 00314 if(rc > 0) { 00315 /* rc is the length of the file name in the mem 00316 buffer */ 00317 string filename(mem, mem + rc); 00318 onFile(filename, attrs); 00319 } 00320 else 00321 break; 00322 00323 } while (1); 00324 } 00325 00326 00327 /*****************************************************************************/ 00328 /* FILE */ 00329 /*****************************************************************************/ 00330 00331 SftpConnection::File:: 00332 File(const std::string & path, 00333 LIBSSH2_SFTP_HANDLE * handle, 00334 SftpConnection * owner) 00335 : path(path), handle(handle), owner(owner) 00336 { 00337 } 00338 00339 SftpConnection::File:: 00340 ~File() 00341 { 00342 libssh2_sftp_close(handle); 00343 } 00344 00345 SftpConnection::Attributes 00346 SftpConnection::File:: 00347 getAttr() const 00348 { 00349 Attributes result; 00350 int res = libssh2_sftp_fstat_ex(handle, &result, 0); 00351 if (res == -1) 00352 throw ML::Exception("getAttr(): " + owner->lastError()); 00353 return result; 00354 } 00355 00356 uint64_t 00357 SftpConnection::File:: 00358 size() const 00359 { 00360 return getAttr().filesize; 00361 } 00362 00363 void 00364 SftpConnection::File:: 00365 downloadTo(const std::string & filename) const 00366 { 00367 uint64_t bytesToRead = size(); 00368 00369 uint64_t done = 0; 00370 std::ofstream stream(filename.c_str()); 00371 00372 size_t bufSize = 1024 * 1024; 00373 00374 char * buf = new char[bufSize]; 00375 00376 Date start = Date::now(); 00377 00378 for (;;) { 00379 ssize_t numRead = libssh2_sftp_read(handle, buf, bufSize); 00380 //cerr << "read " << numRead << " bytes" << endl; 00381 if (numRead < 0) { 00382 throw ML::Exception("read(): " + owner->lastError()); 00383 } 00384 if (numRead == 0) break; 00385 00386 stream.write(buf, numRead); 00387 uint64_t doneBefore = done; 00388 done += numRead; 00389 00390 if (doneBefore / 10000000 != done / 10000000) { 00391 double elapsed = Date::now().secondsSince(start); 00392 double rate = done / elapsed; 00393 cerr << "done " << done << " of " 00394 << bytesToRead << " at " 00395 << rate / 1024.0 00396 << "k/sec window " << numRead 00397 << " time left " 00398 << (bytesToRead - done) / rate 00399 << "s" << endl; 00400 } 00401 } 00402 00403 delete[] buf; 00404 } 00405 00406 00407 /*****************************************************************************/ 00408 /* SFTP CONNECTION */ 00409 /*****************************************************************************/ 00410 00411 SftpConnection:: 00412 SftpConnection() 00413 : sftp_session(0) 00414 { 00415 } 00416 00417 SftpConnection:: 00418 ~SftpConnection() 00419 { 00420 close(); 00421 } 00422 00423 void 00424 SftpConnection:: 00425 connectPasswordAuth(const std::string & hostname, 00426 const std::string & username, 00427 const std::string & password, 00428 const std::string & port) 00429 { 00430 SshConnection::connect(hostname, port); 00431 SshConnection::passwordAuth(username, password); 00432 00433 sftp_session = libssh2_sftp_init(session); 00434 00435 if (!sftp_session) { 00436 throw ML::Exception("can't initialize SFTP session: " 00437 + lastError()); 00438 } 00439 00440 } 00441 00442 void 00443 SftpConnection:: 00444 connectPublicKeyAuth(const std::string & hostname, 00445 const std::string & username, 00446 const std::string & publicKeyFile, 00447 const std::string & privateKeyFile, 00448 const std::string & port) 00449 { 00450 SshConnection::connect(hostname, port); 00451 SshConnection::publicKeyAuth(username, publicKeyFile, privateKeyFile); 00452 00453 sftp_session = libssh2_sftp_init(session); 00454 00455 if (!sftp_session) { 00456 throw ML::Exception("can't initialize SFTP session: " 00457 + lastError()); 00458 } 00459 00460 } 00461 00462 SftpConnection::Directory 00463 SftpConnection:: 00464 getDirectory(const std::string & path) 00465 { 00466 LIBSSH2_SFTP_HANDLE * handle 00467 = libssh2_sftp_opendir(sftp_session, path.c_str()); 00468 00469 if (!handle) { 00470 throw ML::Exception("couldn't open path: " + lastError()); 00471 } 00472 00473 return Directory(path, handle, this); 00474 } 00475 00476 SftpConnection::File 00477 SftpConnection:: 00478 openFile(const std::string & path) 00479 { 00480 LIBSSH2_SFTP_HANDLE * handle 00481 = libssh2_sftp_open_ex(sftp_session, path.c_str(), 00482 path.length(), LIBSSH2_FXF_READ, 0, 00483 LIBSSH2_SFTP_OPENFILE); 00484 00485 if (!handle) { 00486 throw ML::Exception("couldn't open path: " + lastError()); 00487 } 00488 00489 return File(path, handle, this); 00490 } 00491 00492 void 00493 SftpConnection:: 00494 close() 00495 { 00496 if (sftp_session) { 00497 libssh2_sftp_shutdown(sftp_session); 00498 sftp_session = 0; 00499 } 00500 00501 SshConnection::close(); 00502 } 00503 00504 void 00505 SftpConnection:: 00506 uploadFile(const char * start, 00507 size_t size, 00508 const std::string & path) 00509 { 00510 /* Request a file via SFTP */ 00511 LIBSSH2_SFTP_HANDLE * handle = 00512 libssh2_sftp_open(sftp_session, path.c_str(), 00513 LIBSSH2_FXF_WRITE|LIBSSH2_FXF_CREAT|LIBSSH2_FXF_TRUNC, 00514 LIBSSH2_SFTP_S_IRUSR|LIBSSH2_SFTP_S_IWUSR| 00515 LIBSSH2_SFTP_S_IRGRP|LIBSSH2_SFTP_S_IROTH); 00516 00517 if (!handle) { 00518 throw ML::Exception("couldn't open path: " + lastError()); 00519 } 00520 00521 Date started = Date::now(); 00522 00523 uint64_t offset = 0; 00524 uint64_t lastPrint = 0; 00525 Date lastTime = started; 00526 00527 for (; offset < size; ) { 00528 /* write data in a loop until we block */ 00529 size_t toSend = std::min<size_t>(size - offset, 00530 1024 * 1024); 00531 00532 ssize_t rc = libssh2_sftp_write(handle, 00533 start + offset, 00534 toSend); 00535 00536 if (rc == -1) 00537 throw ML::Exception("couldn't upload file: " + lastError()); 00538 00539 offset += rc; 00540 00541 if (offset > lastPrint + 5 * 1024 * 1024 || offset == size) { 00542 Date now = Date::now(); 00543 00544 double mb = 1024 * 1024; 00545 00546 double doneMb = offset / mb; 00547 double totalMb = size / mb; 00548 double elapsedOverall = now.secondsSince(started); 00549 double mbSecOverall = doneMb / elapsedOverall; 00550 double elapsedSince = now.secondsSince(lastTime); 00551 double mbSecInst = (offset - lastPrint) / mb / elapsedSince; 00552 00553 cerr << ML::format("done %.2fMB of %.2fMB (%.2f%%) at %.2fMB/sec inst and %.2fMB/sec overall", 00554 doneMb, totalMb, 00555 100.0 * doneMb / totalMb, 00556 mbSecInst, 00557 mbSecOverall) 00558 << endl; 00559 00560 00561 lastPrint = offset; 00562 lastTime = now; 00563 } 00564 //cerr << "at " << offset / 1024.0 / 1024.0 00565 // << " of " << size << endl; 00566 } 00567 00568 libssh2_sftp_close(handle); 00569 } 00570 00571 } // namespace Datacratic