MediaWiki  REL1_19
MemcachedClient.php
Go to the documentation of this file.
00001 <?php
00064 // {{{ requirements
00065 // }}}
00066 
00067 // {{{ class MWMemcached
class MWMemcached {
	// {{{ constants

	/** Item flag: the stored payload was produced by serialize(). */
	const SERIALIZED = 1;

	/** Item flag: the stored payload was produced by gzcompress(). */
	const COMPRESSED = 2;

	/**
	 * Minimum fractional size reduction compression must achieve before the
	 * compressed form is stored instead of the plain one.
	 */
	const COMPRESSION_SAVINGS = 0.20;

	// }}}
	// {{{ properties

	/** @var array Per-command call counters, keyed by command name. */
	var $stats;

	/** @var array Open sockets, keyed by "host:port" string. */
	var $_cache_sock;

	/** @var bool Whether debug messages are emitted through _debugprint(). */
	var $_debug;

	/**
	 * @var array Unix timestamps until which a host is skipped, keyed both by
	 *   "host:port" and by the bare host part.
	 */
	var $_host_dead;

	/** @var bool True when gzcompress() is available. */
	var $_have_zlib;

	/** @var bool Whether _set() may compress values at all. */
	var $_compress_enable;

	/** @var int Minimum value length (bytes) to try compressing; 0 disables. */
	var $_compress_threshold;

	/** @var bool Open sockets with pfsockopen() instead of fsockopen(). */
	var $_persistent;

	/** @var string|null "host:port" of the only server, or null if there are several. */
	var $_single_sock;

	/** @var array Server list exactly as given to set_servers(). */
	var $_servers;

	/** @var array|null Host list expanded by weight; built lazily by get_sock(). */
	var $_buckets;

	/** @var int Number of entries in $_buckets. */
	var $_bucketcount;

	/** @var int Number of configured servers (0 means the client is inactive). */
	var $_active;

	/** @var int Seconds part of the socket stream timeout. */
	var $_timeout_seconds;

	/** @var int Microseconds part of the socket stream timeout. */
	var $_timeout_microseconds;

	/** @var float Timeout handed to fsockopen()/pfsockopen(). */
	var $_connect_timeout;

	/** @var int Number of connection attempts per host. */
	var $_connect_attempts;

	// }}}
	// {{{ public methods

	/**
	 * Build a client from an option array.
	 *
	 * Recognised keys (all optional): 'servers', 'debug',
	 * 'compress_threshold', 'persistent', 'timeout' (stored as the
	 * microseconds part of the stream timeout, default 100000) and
	 * 'connect_timeout' (default 0.1).
	 *
	 * @param $args array
	 */
	public function __construct( $args ) {
		$this->set_servers( isset( $args['servers'] ) ? $args['servers'] : array() );
		$this->_debug = isset( $args['debug'] ) ? $args['debug'] : false;
		$this->stats = array();
		$this->_compress_threshold = isset( $args['compress_threshold'] ) ? $args['compress_threshold'] : 0;
		$this->_persistent = isset( $args['persistent'] ) ? $args['persistent'] : false;
		$this->_compress_enable = true;
		$this->_have_zlib = function_exists( 'gzcompress' );

		$this->_cache_sock = array();
		$this->_host_dead = array();

		$this->_timeout_seconds = 0;
		$this->_timeout_microseconds = isset( $args['timeout'] ) ? $args['timeout'] : 100000;

		$this->_connect_timeout = isset( $args['connect_timeout'] ) ? $args['connect_timeout'] : 0.1;
		$this->_connect_attempts = 2;
	}

	/**
	 * Store a value with the "add" command.
	 *
	 * @param $key string|array Key, or array( hash, key )
	 * @param $val mixed Value; non-scalars are serialized
	 * @param $exp int Expiry passed to the server
	 * @return bool True only if the server answered STORED
	 */
	public function add( $key, $val, $exp = 0 ) {
		return $this->_set( 'add', $key, $val, $exp );
	}

	/**
	 * Decrement a counter.
	 *
	 * @param $key string|array
	 * @param $amt int
	 * @return string|null New value as sent by the server, or null on failure
	 */
	public function decr( $key, $amt = 1 ) {
		return $this->_incrdecr( 'decr', $key, $amt );
	}

	/**
	 * Delete a key.
	 *
	 * @param $key string|array Key, or array( hash, key )
	 * @param $time int Second argument of the "delete" command
	 * @return bool True only if the server answered DELETED
	 */
	public function delete( $key, $time = 0 ) {
		if ( !$this->_active ) {
			return false;
		}

		$sock = $this->get_sock( $key );
		if ( !is_resource( $sock ) ) {
			return false;
		}

		$key = is_array( $key ) ? $key[1] : $key;
		$this->_bump_stat( 'delete' );

		$cmd = "delete $key $time\r\n";
		if ( !$this->_safe_fwrite( $sock, $cmd, strlen( $cmd ) ) ) {
			$this->_dead_sock( $sock );
			return false;
		}

		$reply = fgets( $sock );
		$res = ( $reply === false ) ? '' : trim( $reply );

		if ( $this->_debug ) {
			$this->_debugprint( sprintf( "MemCache: delete %s (%s)\n", $key, $res ) );
		}

		return $res == "DELETED";
	}

	/**
	 * Stub; always succeeds.
	 *
	 * @return bool
	 */
	public function lock( $key, $timeout = 0 ) {
		/* stub */
		return true;
	}

	/**
	 * Stub; always succeeds.
	 *
	 * @return bool
	 */
	public function unlock( $key ) {
		/* stub */
		return true;
	}

	/**
	 * Close every cached socket and forget them.
	 */
	public function disconnect_all() {
		foreach ( $this->_cache_sock as $sock ) {
			fclose( $sock );
		}

		$this->_cache_sock = array();
	}

	/**
	 * Turn value compression in _set() on or off.
	 *
	 * @param $enable bool
	 */
	public function enable_compress( $enable ) {
		$this->_compress_enable = $enable;
	}

	/**
	 * Clear the dead-host list so every host is tried again.
	 */
	public function forget_dead_hosts() {
		$this->_host_dead = array();
	}

	/**
	 * Fetch a single key.
	 *
	 * @param $key string|array Key, or array( hash, key )
	 * @return mixed The stored value, or false if it is missing or the
	 *   request could not be made
	 */
	public function get( $key ) {
		wfProfileIn( __METHOD__ );
		$value = $this->_get_one( $key );
		wfProfileOut( __METHOD__ );
		return $value;
	}

	/**
	 * Fetch several keys, issuing one "get" command per server involved.
	 *
	 * @param $keys array List of keys (strings or array( hash, key ))
	 * @return array|bool Map of key => value for the keys found, or false
	 *   when no servers are configured
	 */
	public function get_multi( $keys ) {
		if ( !$this->_active ) {
			return false;
		}

		$this->_bump_stat( 'get_multi' );

		// Group the keys per socket. Resources cannot be array keys, so the
		// integer resource id is the index and the resource is kept aside.
		$socks = array();
		$sock_keys = array();
		foreach ( $keys as $key ) {
			$sock = $this->get_sock( $key );
			if ( !is_resource( $sock ) ) {
				continue;
			}
			$id = intval( $sock );
			if ( !isset( $sock_keys[$id] ) ) {
				$sock_keys[$id] = array();
				$socks[$id] = $sock;
			}
			$sock_keys[$id][] = is_array( $key ) ? $key[1] : $key;
		}

		// Send out the requests
		$gather = array();
		foreach ( $socks as $id => $sock ) {
			$cmd = 'get ' . implode( ' ', $sock_keys[$id] ) . "\r\n";

			if ( $this->_safe_fwrite( $sock, $cmd, strlen( $cmd ) ) ) {
				$gather[] = $sock;
			} else {
				$this->_dead_sock( $sock );
			}
		}

		// Parse responses
		$val = array();
		foreach ( $gather as $sock ) {
			$this->_load_items( $sock, $val );
		}

		if ( $this->_debug ) {
			foreach ( $val as $k => $v ) {
				$this->_debugprint( sprintf( "MemCache: got %s\n", $k ) );
			}
		}

		return $val;
	}

	/**
	 * Increment a counter.
	 *
	 * @param $key string|array
	 * @param $amt int
	 * @return string|null New value as sent by the server, or null on failure
	 */
	public function incr( $key, $amt = 1 ) {
		return $this->_incrdecr( 'incr', $key, $amt );
	}

	/**
	 * Store a value with the "replace" command.
	 *
	 * @param $key string|array
	 * @param $value mixed
	 * @param $exp int
	 * @return bool True only if the server answered STORED
	 */
	public function replace( $key, $value, $exp = 0 ) {
		return $this->_set( 'replace', $key, $value, $exp );
	}

	/**
	 * Send a raw command and collect the reply lines.
	 *
	 * Reading stops after a line starting with "END" or after an empty or
	 * failed read; that last read result is included in the returned list.
	 *
	 * @param $sock resource Socket to talk to
	 * @param $cmd string Complete command, including the trailing "\r\n"
	 * @return array Reply lines; empty if the socket is invalid or the
	 *   write failed
	 */
	public function run_command( $sock, $cmd ) {
		if ( !is_resource( $sock ) ) {
			return array();
		}

		if ( !$this->_safe_fwrite( $sock, $cmd, strlen( $cmd ) ) ) {
			return array();
		}

		$ret = array();
		do {
			$res = fgets( $sock );
			$ret[] = $res;
			$done = $res === false || $res === '' || preg_match( '/^END/', $res );
		} while ( !$done );

		return $ret;
	}

	/**
	 * Store a value with the "set" command.
	 *
	 * @param $key string|array
	 * @param $value mixed
	 * @param $exp int
	 * @return bool True only if the server answered STORED
	 */
	public function set( $key, $value, $exp = 0 ) {
		return $this->_set( 'set', $key, $value, $exp );
	}

	/**
	 * Set the minimum value length at which compression is attempted.
	 *
	 * @param $thresh int Length in bytes; 0 disables compression
	 */
	public function set_compress_threshold( $thresh ) {
		$this->_compress_threshold = $thresh;
	}

	/**
	 * Switch debug output on or off.
	 *
	 * @param $dbg bool
	 */
	public function set_debug( $dbg ) {
		$this->_debug = $dbg;
	}

	/**
	 * Replace the server list and reset the derived bucket state.
	 *
	 * @param $list array Entries are "host:port" strings or
	 *   array( "host:port", weight ) pairs
	 */
	public function set_servers( $list ) {
		$this->_servers = $list;
		$this->_active = count( $list );
		$this->_buckets = null;
		$this->_bucketcount = 0;

		$this->_single_sock = null;
		if ( $this->_active == 1 ) {
			// A weighted entry is array( host, weight ); only the host is
			// usable as a socket cache key.
			$only = $this->_servers[0];
			$this->_single_sock = is_array( $only ) ? $only[0] : $only;
		}
	}

	/**
	 * Set the stream timeout applied to newly connected sockets.
	 *
	 * @param $seconds int
	 * @param $microseconds int
	 */
	public function set_timeout( $seconds, $microseconds ) {
		$this->_timeout_seconds = $seconds;
		$this->_timeout_microseconds = $microseconds;
	}

	// }}}
	// {{{ internal methods

	/**
	 * Increment the call counter for one command.
	 *
	 * @param $name string Counter name
	 */
	protected function _bump_stat( $name ) {
		if ( isset( $this->stats[$name] ) ) {
			$this->stats[$name]++;
		} else {
			$this->stats[$name] = 1;
		}
	}

	/**
	 * Body of get(), without the profiling calls.
	 *
	 * @param $key string|array
	 * @return mixed Value, or false
	 */
	protected function _get_one( $key ) {
		$plainKey = is_array( $key ) ? $key[1] : $key;

		if ( $this->_debug ) {
			$this->_debugprint( "get($plainKey)\n" );
		}

		if ( !$this->_active ) {
			return false;
		}

		$sock = $this->get_sock( $key );
		if ( !is_resource( $sock ) ) {
			return false;
		}

		$this->_bump_stat( 'get' );

		$cmd = "get $plainKey\r\n";
		if ( !$this->_safe_fwrite( $sock, $cmd, strlen( $cmd ) ) ) {
			$this->_dead_sock( $sock );
			return false;
		}

		$val = array();
		$this->_load_items( $sock, $val );

		if ( $this->_debug ) {
			foreach ( $val as $k => $v ) {
				$this->_debugprint( sprintf( "MemCache: sock %s got %s\n", serialize( $sock ), $k ) );
			}
		}

		return isset( $val[$plainKey] ) ? $val[$plainKey] : false;
	}

	/**
	 * Close a socket and drop it from the socket cache.
	 *
	 * @param $sock resource
	 */
	public function _close_sock( $sock ) {
		$host = array_search( $sock, $this->_cache_sock, true );
		if ( $host === false ) {
			// Not one of ours (or already dropped): just close it.
			if ( is_resource( $sock ) ) {
				fclose( $sock );
			}
			return;
		}
		fclose( $this->_cache_sock[$host] );
		unset( $this->_cache_sock[$host] );
	}

	/**
	 * Open a socket to a host, retrying up to $_connect_attempts times.
	 *
	 * @param &$sock resource|bool Receives the socket, or false on failure
	 * @param $host string "host:port"
	 * @return bool Whether a connection was established
	 */
	public function _connect_sock( &$sock, $host ) {
		list( $ip, $port ) = explode( ':', $host );
		$sock = false;
		$timeout = $this->_connect_timeout;
		$errno = $errstr = null;
		for ( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) {
			wfSuppressWarnings();
			if ( $this->_persistent == 1 ) {
				$sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout );
			} else {
				$sock = fsockopen( $ip, $port, $errno, $errstr, $timeout );
			}
			wfRestoreWarnings();
		}
		if ( !$sock ) {
			if ( $this->_debug ) {
				$this->_debugprint( "Error connecting to $host: $errstr\n" );
			}
			return false;
		}

		// Initialise timeout
		stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds );

		return true;
	}

	/**
	 * Mark the host that owns a cached socket as dead.
	 *
	 * Does nothing when the socket is not in the socket cache.
	 *
	 * @param $sock resource
	 */
	public function _dead_sock( $sock ) {
		$host = array_search( $sock, $this->_cache_sock, true );
		if ( $host !== false ) {
			$this->_dead_host( $host );
		}
	}

	/**
	 * Mark a host as dead for 30 to 40 seconds and drop its cached socket.
	 *
	 * @param $host string "host:port"
	 */
	public function _dead_host( $host ) {
		$parts = explode( ':', $host );
		$ip = $parts[0];
		$until = time() + 30 + intval( rand( 0, 10 ) );
		$this->_host_dead[$ip] = $until;
		$this->_host_dead[$host] = $until;
		unset( $this->_cache_sock[$host] );
	}

	/**
	 * Pick the socket responsible for a key.
	 *
	 * With several servers the key hash selects a bucket; if that host is
	 * unavailable the hash is re-derived and another bucket tried, up to 20
	 * times. Any unread data pending on the chosen socket is discarded
	 * before it is returned.
	 *
	 * @param $key string|array Key, or array( hash, key )
	 * @return resource|bool|null Socket, or a non-resource value if none is available
	 */
	public function get_sock( $key ) {
		if ( !$this->_active ) {
			return false;
		}

		if ( $this->_single_sock !== null ) {
			// Flush the socket itself; $_single_sock is only the host name.
			$sock = $this->sock_to_host( $this->_single_sock );
			$this->_flush_read_buffer( $sock );
			return $sock;
		}

		$hv = is_array( $key ) ? intval( $key[0] ) : $this->_hashfunc( $key );

		if ( $this->_buckets === null ) {
			$bu = array();
			foreach ( $this->_servers as $v ) {
				if ( is_array( $v ) ) {
					for ( $i = 0; $i < $v[1]; $i++ ) {
						$bu[] = $v[0];
					}
				} else {
					$bu[] = $v;
				}
			}
			$this->_buckets = $bu;
			$this->_bucketcount = count( $bu );
		}

		if ( $this->_bucketcount < 1 ) {
			// Every server had a weight below 1; nothing to pick from.
			return false;
		}

		$realkey = is_array( $key ) ? $key[1] : $key;
		for ( $tries = 0; $tries < 20; $tries++ ) {
			$host = $this->_buckets[$hv % $this->_bucketcount];
			$sock = $this->sock_to_host( $host );
			if ( is_resource( $sock ) ) {
				$this->_flush_read_buffer( $sock );
				return $sock;
			}
			$hv = $this->_hashfunc( $hv . $realkey );
		}

		return false;
	}

	/**
	 * Hash a key to a non-negative integer.
	 *
	 * @param $key string
	 * @return int Value in [0, 0x7fffffff]
	 */
	public function _hashfunc( $key ) {
		# The hash must lie in [0, 0x7fffffff]: take the first 8 hex digits
		# of the MD5 hash and mask them down to 31 bits.
		return hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
	}

	/**
	 * Shared implementation of incr() and decr().
	 *
	 * @param $cmd string 'incr' or 'decr'
	 * @param $key string|array
	 * @param $amt int
	 * @return string|null Leading digits of the server reply, or null on failure
	 */
	public function _incrdecr( $cmd, $key, $amt = 1 ) {
		if ( !$this->_active ) {
			return null;
		}

		$sock = $this->get_sock( $key );
		if ( !is_resource( $sock ) ) {
			return null;
		}

		$key = is_array( $key ) ? $key[1] : $key;
		$this->_bump_stat( $cmd );

		if ( !$this->_safe_fwrite( $sock, "$cmd $key $amt\r\n" ) ) {
			$this->_dead_sock( $sock );
			return null;
		}

		$line = fgets( $sock );
		$match = array();
		if ( $line === false || !preg_match( '/^(\d+)/', $line, $match ) ) {
			return null;
		}
		return $match[1];
	}

	/**
	 * Read the reply to a "get" command into $ret.
	 *
	 * Each "VALUE <key> <flags> <len>" header is followed by <len> bytes of
	 * payload and a "\r\n" terminator. Exactly that terminator is removed,
	 * so payloads that end in whitespace are returned intact; the payload is
	 * then decompressed and/or unserialized according to its flags.
	 *
	 * @param $sock resource
	 * @param &$ret array Receives key => value for every item read
	 * @return bool True once "END" was read; false on a malformed or short reply
	 */
	public function _load_items( $sock, &$ret ) {
		while ( true ) {
			$decl = fgets( $sock );
			if ( $decl === "END\r\n" ) {
				return true;
			}

			$match = array();
			if ( $decl === false
				|| !preg_match( '/^VALUE (\S+) (\d+) (\d+)\r\n$/', $decl, $match )
			) {
				$this->_debugprint( "Error parsing memcached response\n" );
				return false;
			}

			$rkey = $match[1];
			$flags = intval( $match[2] );
			$want = intval( $match[3] ) + 2; // payload plus "\r\n"

			// Buffer locally so a stale $ret[$rkey] is never appended to.
			$data = '';
			while ( strlen( $data ) < $want ) {
				$chunk = fread( $sock, $want - strlen( $data ) );
				if ( $chunk === false || $chunk === '' ) {
					break;
				}
				$data .= $chunk;
			}

			if ( strlen( $data ) != $want || substr( $data, -2 ) !== "\r\n" ) {
				// Something is borked!
				if ( $this->_debug ) {
					$this->_debugprint( sprintf( "Something is borked!  key %s expecting %d got %d length\n", $rkey, $want, strlen( $data ) ) );
				}

				unset( $ret[$rkey] );
				$this->_close_sock( $sock );
				return false;
			}

			// The cast covers PHP versions where substr() yields false for
			// an empty result.
			$value = (string)substr( $data, 0, -2 );

			if ( $this->_have_zlib && ( $flags & self::COMPRESSED ) ) {
				$value = gzuncompress( $value );
			}

			if ( $flags & self::SERIALIZED ) {
				// NOTE(review): unserialize() runs on whatever the server
				// returned; this trusts the memcached servers.
				$value = unserialize( $value );
			}

			$ret[$rkey] = $value;
		}
	}

	/**
	 * Shared implementation of add(), replace() and set().
	 *
	 * Non-scalar values are serialized. Values at least
	 * $_compress_threshold bytes long are compressed when that saves more
	 * than COMPRESSION_SAVINGS of their size.
	 *
	 * @param $cmd string 'add', 'replace' or 'set'
	 * @param $key string|array Key, or array( hash, key )
	 * @param $val mixed
	 * @param $exp int
	 * @return bool True only if the server answered STORED
	 */
	public function _set( $cmd, $key, $val, $exp ) {
		if ( !$this->_active ) {
			return false;
		}

		$sock = $this->get_sock( $key );
		if ( !is_resource( $sock ) ) {
			return false;
		}

		// Same key normalisation as delete(), get() and _incrdecr().
		$key = is_array( $key ) ? $key[1] : $key;

		$this->_bump_stat( $cmd );

		// TTLs higher than 30 days will be detected as absolute TTLs
		// (UNIX timestamps), and will result in the cache entry being
		// discarded immediately because the expiry is in the past.
		// Clamp expiries >30d at 30d, unless they're >=1e9 in which
		// case they are likely to really be absolute (1e9 = 2001-09-09)
		if ( $exp > 2592000 && $exp < 1000000000 ) {
			$exp = 2592000;
		}

		$flags = 0;

		if ( !is_scalar( $val ) ) {
			$val = serialize( $val );
			$flags |= self::SERIALIZED;
			if ( $this->_debug ) {
				$this->_debugprint( sprintf( "client: serializing data as it is not scalar\n" ) );
			}
		}

		$len = strlen( $val );

		if ( $this->_have_zlib && $this->_compress_enable &&
			$this->_compress_threshold && $len >= $this->_compress_threshold
		) {
			$c_val = gzcompress( $val, 9 );
			$c_len = strlen( $c_val );

			if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) {
				if ( $this->_debug ) {
					$this->_debugprint( sprintf( "client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len ) );
				}
				$val = $c_val;
				$len = $c_len;
				$flags |= self::COMPRESSED;
			}
		}

		if ( !$this->_safe_fwrite( $sock, "$cmd $key $flags $exp $len\r\n$val\r\n" ) ) {
			$this->_dead_sock( $sock );
			return false;
		}

		$reply = fgets( $sock );
		$line = ( $reply === false ) ? '' : trim( $reply );

		if ( $this->_debug ) {
			$this->_debugprint( sprintf( "%s %s (%s)\n", $cmd, $key, $line ) );
		}

		return $line == "STORED";
	}

	/**
	 * Return the socket for a host, connecting if necessary.
	 *
	 * @param $host string "host:port"
	 * @return resource|null Socket, or null if the host is marked dead or
	 *   the connection failed
	 */
	public function sock_to_host( $host ) {
		if ( isset( $this->_cache_sock[$host] ) ) {
			return $this->_cache_sock[$host];
		}

		$now = time();
		$parts = explode( ':', $host );
		$ip = $parts[0];
		$hostDead = isset( $this->_host_dead[$host] ) && $this->_host_dead[$host] > $now;
		$ipDead = isset( $this->_host_dead[$ip] ) && $this->_host_dead[$ip] > $now;
		if ( $hostDead || $ipDead ) {
			return null;
		}

		$sock = null;
		if ( !$this->_connect_sock( $sock, $host ) ) {
			$this->_dead_host( $host );
			return null;
		}

		// Do not buffer writes
		stream_set_write_buffer( $sock, 0 );

		$this->_cache_sock[$host] = $sock;

		return $this->_cache_sock[$host];
	}

	/**
	 * Emit a debug message. This implementation prints it.
	 *
	 * @param $str string
	 */
	public function _debugprint( $str ) {
		print( $str );
	}

	/**
	 * Write to a stream with fwrite().
	 *
	 * @param $f resource
	 * @param $buf string
	 * @param $len int|bool Maximum number of bytes to write, or false for all of $buf
	 * @return int|bool Result of fwrite()
	 */
	public function _safe_fwrite( $f, $buf, $len = false ) {
		if ( $len === false ) {
			return fwrite( $f, $buf );
		}
		return fwrite( $f, $buf, $len );
	}

	/**
	 * Read and discard whatever is currently pending on a stream.
	 *
	 * @param $f resource|mixed Ignored unless it is a resource
	 */
	public function _flush_read_buffer( $f ) {
		if ( !is_resource( $f ) ) {
			return;
		}

		// stream_select() takes its arrays by reference and rewrites them,
		// so real variables are needed and must be reset on every pass.
		while ( true ) {
			$r = array( $f );
			$w = null;
			$e = null;
			$n = stream_select( $r, $w, $e, 0, 0 );
			if ( $n != 1 || feof( $f ) ) {
				break;
			}
			fread( $f, 1024 );
		}
	}

	// }}}
}
01110 
01111 // vim: sts=3 sw=3 et
01112 
01113 // }}}
01114 
class MemCachedClientforWiki extends MWMemcached {
	/**
	 * Hand debug messages to wfDebug() with a "memcached: " prefix instead
	 * of printing them.
	 *
	 * @param $text string Message to report
	 */
	function _debugprint( $text ) {
		$message = 'memcached: ' . $text;
		wfDebug( $message );
	}
}