MediaWiki
REL1_19
|
00001 <?php 00064 // {{{ requirements 00065 // }}} 00066 00067 // {{{ class MWMemcached 00074 class MWMemcached { 00075 // {{{ properties 00076 // {{{ public 00077 00078 // {{{ constants 00079 // {{{ flags 00080 00084 const SERIALIZED = 1; 00085 00089 const COMPRESSED = 2; 00090 00091 // }}} 00092 00096 const COMPRESSION_SAVINGS = 0.20; 00097 00098 // }}} 00099 00100 00107 var $stats; 00108 00109 // }}} 00110 // {{{ private 00111 00118 var $_cache_sock; 00119 00126 var $_debug; 00127 00134 var $_host_dead; 00135 00142 var $_have_zlib; 00143 00150 var $_compress_enable; 00151 00158 var $_compress_threshold; 00159 00166 var $_persistent; 00167 00174 var $_single_sock; 00175 00182 var $_servers; 00183 00190 var $_buckets; 00191 00198 var $_bucketcount; 00199 00206 var $_active; 00207 00214 var $_timeout_seconds; 00215 00222 var $_timeout_microseconds; 00223 00227 var $_connect_timeout; 00228 00232 var $_connect_attempts; 00233 00234 // }}} 00235 // }}} 00236 // {{{ methods 00237 // {{{ public functions 00238 // {{{ memcached() 00239 00247 public function __construct( $args ) { 00248 $this->set_servers( isset( $args['servers'] ) ? $args['servers'] : array() ); 00249 $this->_debug = isset( $args['debug'] ) ? $args['debug'] : false; 00250 $this->stats = array(); 00251 $this->_compress_threshold = isset( $args['compress_threshold'] ) ? $args['compress_threshold'] : 0; 00252 $this->_persistent = isset( $args['persistent'] ) ? $args['persistent'] : false; 00253 $this->_compress_enable = true; 00254 $this->_have_zlib = function_exists( 'gzcompress' ); 00255 00256 $this->_cache_sock = array(); 00257 $this->_host_dead = array(); 00258 00259 $this->_timeout_seconds = 0; 00260 $this->_timeout_microseconds = isset( $args['timeout'] ) ? $args['timeout'] : 100000; 00261 00262 $this->_connect_timeout = isset( $args['connect_timeout'] ) ? $args['connect_timeout'] : 0.1; 00263 $this->_connect_attempts = 2; 00264 } 00265 00266 // }}} 00267 // {{{ add() 00268 00283 public function add( $key, $val, $exp = 0 ) { 00284 return $this->_set( 'add', $key, $val, $exp ); 00285 } 00286 00287 // }}} 00288 // {{{ decr() 00289 00298 public function decr( $key, $amt = 1 ) { 00299 return $this->_incrdecr( 'decr', $key, $amt ); 00300 } 00301 00302 // }}} 00303 // {{{ delete() 00304 00313 public function delete( $key, $time = 0 ) { 00314 if ( !$this->_active ) { 00315 return false; 00316 } 00317 00318 $sock = $this->get_sock( $key ); 00319 if ( !is_resource( $sock ) ) { 00320 return false; 00321 } 00322 00323 $key = is_array( $key ) ? $key[1] : $key; 00324 00325 if ( isset( $this->stats['delete'] ) ) { 00326 $this->stats['delete']++; 00327 } else { 00328 $this->stats['delete'] = 1; 00329 } 00330 $cmd = "delete $key $time\r\n"; 00331 if( !$this->_safe_fwrite( $sock, $cmd, strlen( $cmd ) ) ) { 00332 $this->_dead_sock( $sock ); 00333 return false; 00334 } 00335 $res = trim( fgets( $sock ) ); 00336 00337 if ( $this->_debug ) { 00338 $this->_debugprint( sprintf( "MemCache: delete %s (%s)\n", $key, $res ) ); 00339 } 00340 00341 if ( $res == "DELETED" ) { 00342 return true; 00343 } 00344 return false; 00345 } 00346 00347 public function lock( $key, $timeout = 0 ) { 00348 /* stub */ 00349 return true; 00350 } 00351 00352 public function unlock( $key ) { 00353 /* stub */ 00354 return true; 00355 } 00356 00357 // }}} 00358 // {{{ disconnect_all() 00359 00363 public function disconnect_all() { 00364 foreach ( $this->_cache_sock as $sock ) { 00365 fclose( $sock ); 00366 } 00367 00368 $this->_cache_sock = array(); 00369 } 00370 00371 // }}} 00372 // {{{ enable_compress() 00373 00379 public function enable_compress( $enable ) { 00380 $this->_compress_enable = $enable; 00381 } 00382 00383 // }}} 00384 // {{{ forget_dead_hosts() 00385 00389 public function forget_dead_hosts() { 00390 $this->_host_dead = array(); 00391 } 00392 00393 // }}} 00394 // {{{ get() 00395 00403 public function get( $key ) { 00404 wfProfileIn( __METHOD__ ); 00405 00406 if ( $this->_debug ) { 00407 $this->_debugprint( "get($key)\n" ); 00408 } 00409 00410 if ( !$this->_active ) { 00411 wfProfileOut( __METHOD__ ); 00412 return false; 00413 } 00414 00415 $sock = $this->get_sock( $key ); 00416 00417 if ( !is_resource( $sock ) ) { 00418 wfProfileOut( __METHOD__ ); 00419 return false; 00420 } 00421 00422 $key = is_array( $key ) ? $key[1] : $key; 00423 if ( isset( $this->stats['get'] ) ) { 00424 $this->stats['get']++; 00425 } else { 00426 $this->stats['get'] = 1; 00427 } 00428 00429 $cmd = "get $key\r\n"; 00430 if ( !$this->_safe_fwrite( $sock, $cmd, strlen( $cmd ) ) ) { 00431 $this->_dead_sock( $sock ); 00432 wfProfileOut( __METHOD__ ); 00433 return false; 00434 } 00435 00436 $val = array(); 00437 $this->_load_items( $sock, $val ); 00438 00439 if ( $this->_debug ) { 00440 foreach ( $val as $k => $v ) { 00441 $this->_debugprint( sprintf( "MemCache: sock %s got %s\n", serialize( $sock ), $k ) ); 00442 } 00443 } 00444 00445 $value = false; 00446 if ( isset( $val[$key] ) ) { 00447 $value = $val[$key]; 00448 } 00449 wfProfileOut( __METHOD__ ); 00450 return $value; 00451 } 00452 00453 // }}} 00454 // {{{ get_multi() 00455 00463 public function get_multi( $keys ) { 00464 if ( !$this->_active ) { 00465 return false; 00466 } 00467 00468 if ( isset( $this->stats['get_multi'] ) ) { 00469 $this->stats['get_multi']++; 00470 } else { 00471 $this->stats['get_multi'] = 1; 00472 } 00473 $sock_keys = array(); 00474 00475 foreach ( $keys as $key ) { 00476 $sock = $this->get_sock( $key ); 00477 if ( !is_resource( $sock ) ) { 00478 continue; 00479 } 00480 $key = is_array( $key ) ? $key[1] : $key; 00481 if ( !isset( $sock_keys[$sock] ) ) { 00482 $sock_keys[$sock] = array(); 00483 $socks[] = $sock; 00484 } 00485 $sock_keys[$sock][] = $key; 00486 } 00487 00488 // Send out the requests 00489 foreach ( $socks as $sock ) { 00490 $cmd = 'get'; 00491 foreach ( $sock_keys[$sock] as $key ) { 00492 $cmd .= ' ' . $key; 00493 } 00494 $cmd .= "\r\n"; 00495 00496 if ( $this->_safe_fwrite( $sock, $cmd, strlen( $cmd ) ) ) { 00497 $gather[] = $sock; 00498 } else { 00499 $this->_dead_sock( $sock ); 00500 } 00501 } 00502 00503 // Parse responses 00504 $val = array(); 00505 foreach ( $gather as $sock ) { 00506 $this->_load_items( $sock, $val ); 00507 } 00508 00509 if ( $this->_debug ) { 00510 foreach ( $val as $k => $v ) { 00511 $this->_debugprint( sprintf( "MemCache: got %s\n", $k ) ); 00512 } 00513 } 00514 00515 return $val; 00516 } 00517 00518 // }}} 00519 // {{{ incr() 00520 00531 public function incr( $key, $amt = 1 ) { 00532 return $this->_incrdecr( 'incr', $key, $amt ); 00533 } 00534 00535 // }}} 00536 // {{{ replace() 00537 00551 public function replace( $key, $value, $exp = 0 ) { 00552 return $this->_set( 'replace', $key, $value, $exp ); 00553 } 00554 00555 // }}} 00556 // {{{ run_command() 00557 00573 public function run_command( $sock, $cmd ) { 00574 if ( !is_resource( $sock ) ) { 00575 return array(); 00576 } 00577 00578 if ( !$this->_safe_fwrite( $sock, $cmd, strlen( $cmd ) ) ) { 00579 return array(); 00580 } 00581 00582 while ( true ) { 00583 $res = fgets( $sock ); 00584 $ret[] = $res; 00585 if ( preg_match( '/^END/', $res ) ) { 00586 break; 00587 } 00588 if ( strlen( $res ) == 0 ) { 00589 break; 00590 } 00591 } 00592 return $ret; 00593 } 00594 00595 // }}} 00596 // {{{ set() 00597 00612 public function set( $key, $value, $exp = 0 ) { 00613 return $this->_set( 'set', $key, $value, $exp ); 00614 } 00615 00616 // }}} 00617 // {{{ set_compress_threshold() 00618 00624 public function set_compress_threshold( $thresh ) { 00625 $this->_compress_threshold = $thresh; 00626 } 00627 00628 // }}} 00629 // {{{ set_debug() 00630 00638 public function set_debug( $dbg ) { 00639 $this->_debug = $dbg; 00640 } 00641 00642 // }}} 00643 // {{{ set_servers() 00644 00652 public function set_servers( $list ) { 00653 $this->_servers = $list; 00654 $this->_active = count( $list ); 00655 $this->_buckets = null; 00656 $this->_bucketcount = 0; 00657 00658 $this->_single_sock = null; 00659 if ( $this->_active == 1 ) { 00660 $this->_single_sock = $this->_servers[0]; 00661 } 00662 } 00663 00670 public function set_timeout( $seconds, $microseconds ) { 00671 $this->_timeout_seconds = $seconds; 00672 $this->_timeout_microseconds = $microseconds; 00673 } 00674 00675 // }}} 00676 // }}} 00677 // {{{ private methods 00678 // {{{ _close_sock() 00679 00687 function _close_sock( $sock ) { 00688 $host = array_search( $sock, $this->_cache_sock ); 00689 fclose( $this->_cache_sock[$host] ); 00690 unset( $this->_cache_sock[$host] ); 00691 } 00692 00693 // }}} 00694 // {{{ _connect_sock() 00695 00705 function _connect_sock( &$sock, $host ) { 00706 list( $ip, $port ) = explode( ':', $host ); 00707 $sock = false; 00708 $timeout = $this->_connect_timeout; 00709 $errno = $errstr = null; 00710 for( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) { 00711 wfSuppressWarnings(); 00712 if ( $this->_persistent == 1 ) { 00713 $sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout ); 00714 } else { 00715 $sock = fsockopen( $ip, $port, $errno, $errstr, $timeout ); 00716 } 00717 wfRestoreWarnings(); 00718 } 00719 if ( !$sock ) { 00720 if ( $this->_debug ) { 00721 $this->_debugprint( "Error connecting to $host: $errstr\n" ); 00722 } 00723 return false; 00724 } 00725 00726 // Initialise timeout 00727 stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds ); 00728 00729 return true; 00730 } 00731 00732 // }}} 00733 // {{{ _dead_sock() 00734 00742 function _dead_sock( $sock ) { 00743 $host = array_search( $sock, $this->_cache_sock ); 00744 $this->_dead_host( $host ); 00745 } 00746 00747 function _dead_host( $host ) { 00748 $parts = explode( ':', $host ); 00749 $ip = $parts[0]; 00750 $this->_host_dead[$ip] = time() + 30 + intval( rand( 0, 10 ) ); 00751 $this->_host_dead[$host] = $this->_host_dead[$ip]; 00752 unset( $this->_cache_sock[$host] ); 00753 } 00754 00755 // }}} 00756 // {{{ get_sock() 00757 00766 function get_sock( $key ) { 00767 if ( !$this->_active ) { 00768 return false; 00769 } 00770 00771 if ( $this->_single_sock !== null ) { 00772 $this->_flush_read_buffer( $this->_single_sock ); 00773 return $this->sock_to_host( $this->_single_sock ); 00774 } 00775 00776 $hv = is_array( $key ) ? intval( $key[0] ) : $this->_hashfunc( $key ); 00777 00778 if ( $this->_buckets === null ) { 00779 foreach ( $this->_servers as $v ) { 00780 if ( is_array( $v ) ) { 00781 for( $i = 0; $i < $v[1]; $i++ ) { 00782 $bu[] = $v[0]; 00783 } 00784 } else { 00785 $bu[] = $v; 00786 } 00787 } 00788 $this->_buckets = $bu; 00789 $this->_bucketcount = count( $bu ); 00790 } 00791 00792 $realkey = is_array( $key ) ? $key[1] : $key; 00793 for( $tries = 0; $tries < 20; $tries++ ) { 00794 $host = $this->_buckets[$hv % $this->_bucketcount]; 00795 $sock = $this->sock_to_host( $host ); 00796 if ( is_resource( $sock ) ) { 00797 $this->_flush_read_buffer( $sock ); 00798 return $sock; 00799 } 00800 $hv = $this->_hashfunc( $hv . $realkey ); 00801 } 00802 00803 return false; 00804 } 00805 00806 // }}} 00807 // {{{ _hashfunc() 00808 00817 function _hashfunc( $key ) { 00818 # Hash function must on [0,0x7ffffff] 00819 # We take the first 31 bits of the MD5 hash, which unlike the hash 00820 # function used in a previous version of this client, works 00821 return hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff; 00822 } 00823 00824 // }}} 00825 // {{{ _incrdecr() 00826 00837 function _incrdecr( $cmd, $key, $amt = 1 ) { 00838 if ( !$this->_active ) { 00839 return null; 00840 } 00841 00842 $sock = $this->get_sock( $key ); 00843 if ( !is_resource( $sock ) ) { 00844 return null; 00845 } 00846 00847 $key = is_array( $key ) ? $key[1] : $key; 00848 if ( isset( $this->stats[$cmd] ) ) { 00849 $this->stats[$cmd]++; 00850 } else { 00851 $this->stats[$cmd] = 1; 00852 } 00853 if ( !$this->_safe_fwrite( $sock, "$cmd $key $amt\r\n" ) ) { 00854 return $this->_dead_sock( $sock ); 00855 } 00856 00857 $line = fgets( $sock ); 00858 $match = array(); 00859 if ( !preg_match( '/^(\d+)/', $line, $match ) ) { 00860 return null; 00861 } 00862 return $match[1]; 00863 } 00864 00865 // }}} 00866 // {{{ _load_items() 00867 00876 function _load_items( $sock, &$ret ) { 00877 while ( 1 ) { 00878 $decl = fgets( $sock ); 00879 if ( $decl == "END\r\n" ) { 00880 return true; 00881 } elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+)\r\n$/', $decl, $match ) ) { 00882 list( $rkey, $flags, $len ) = array( $match[1], $match[2], $match[3] ); 00883 $bneed = $len + 2; 00884 $offset = 0; 00885 00886 while ( $bneed > 0 ) { 00887 $data = fread( $sock, $bneed ); 00888 $n = strlen( $data ); 00889 if ( $n == 0 ) { 00890 break; 00891 } 00892 $offset += $n; 00893 $bneed -= $n; 00894 if ( isset( $ret[$rkey] ) ) { 00895 $ret[$rkey] .= $data; 00896 } else { 00897 $ret[$rkey] = $data; 00898 } 00899 } 00900 00901 if ( $offset != $len + 2 ) { 00902 // Something is borked! 00903 if ( $this->_debug ) { 00904 $this->_debugprint( sprintf( "Something is borked! key %s expecting %d got %d length\n", $rkey, $len + 2, $offset ) ); 00905 } 00906 00907 unset( $ret[$rkey] ); 00908 $this->_close_sock( $sock ); 00909 return false; 00910 } 00911 00912 if ( $this->_have_zlib && $flags & self::COMPRESSED ) { 00913 $ret[$rkey] = gzuncompress( $ret[$rkey] ); 00914 } 00915 00916 $ret[$rkey] = rtrim( $ret[$rkey] ); 00917 00918 if ( $flags & self::SERIALIZED ) { 00919 $ret[$rkey] = unserialize( $ret[$rkey] ); 00920 } 00921 00922 } else { 00923 $this->_debugprint( "Error parsing memcached response\n" ); 00924 return 0; 00925 } 00926 } 00927 } 00928 00929 // }}} 00930 // {{{ _set() 00931 00947 function _set( $cmd, $key, $val, $exp ) { 00948 if ( !$this->_active ) { 00949 return false; 00950 } 00951 00952 $sock = $this->get_sock( $key ); 00953 if ( !is_resource( $sock ) ) { 00954 return false; 00955 } 00956 00957 if ( isset( $this->stats[$cmd] ) ) { 00958 $this->stats[$cmd]++; 00959 } else { 00960 $this->stats[$cmd] = 1; 00961 } 00962 00963 // TTLs higher than 30 days will be detected as absolute TTLs 00964 // (UNIX timestamps), and will result in the cache entry being 00965 // discarded immediately because the expiry is in the past. 00966 // Clamp expiries >30d at 30d, unless they're >=1e9 in which 00967 // case they are likely to really be absolute (1e9 = 2011-09-09) 00968 if ( $exp > 2592000 && $exp < 1000000000 ) { 00969 $exp = 2592000; 00970 } 00971 00972 $flags = 0; 00973 00974 if ( !is_scalar( $val ) ) { 00975 $val = serialize( $val ); 00976 $flags |= self::SERIALIZED; 00977 if ( $this->_debug ) { 00978 $this->_debugprint( sprintf( "client: serializing data as it is not scalar\n" ) ); 00979 } 00980 } 00981 00982 $len = strlen( $val ); 00983 00984 if ( $this->_have_zlib && $this->_compress_enable && 00985 $this->_compress_threshold && $len >= $this->_compress_threshold ) 00986 { 00987 $c_val = gzcompress( $val, 9 ); 00988 $c_len = strlen( $c_val ); 00989 00990 if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) { 00991 if ( $this->_debug ) { 00992 $this->_debugprint( sprintf( "client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len ) ); 00993 } 00994 $val = $c_val; 00995 $len = $c_len; 00996 $flags |= self::COMPRESSED; 00997 } 00998 } 00999 if ( !$this->_safe_fwrite( $sock, "$cmd $key $flags $exp $len\r\n$val\r\n" ) ) { 01000 return $this->_dead_sock( $sock ); 01001 } 01002 01003 $line = trim( fgets( $sock ) ); 01004 01005 if ( $this->_debug ) { 01006 $this->_debugprint( sprintf( "%s %s (%s)\n", $cmd, $key, $line ) ); 01007 } 01008 if ( $line == "STORED" ) { 01009 return true; 01010 } 01011 return false; 01012 } 01013 01014 // }}} 01015 // {{{ sock_to_host() 01016 01025 function sock_to_host( $host ) { 01026 if ( isset( $this->_cache_sock[$host] ) ) { 01027 return $this->_cache_sock[$host]; 01028 } 01029 01030 $sock = null; 01031 $now = time(); 01032 list( $ip, /* $port */) = explode( ':', $host ); 01033 if ( isset( $this->_host_dead[$host] ) && $this->_host_dead[$host] > $now || 01034 isset( $this->_host_dead[$ip] ) && $this->_host_dead[$ip] > $now 01035 ) { 01036 return null; 01037 } 01038 01039 if ( !$this->_connect_sock( $sock, $host ) ) { 01040 return $this->_dead_host( $host ); 01041 } 01042 01043 // Do not buffer writes 01044 stream_set_write_buffer( $sock, 0 ); 01045 01046 $this->_cache_sock[$host] = $sock; 01047 01048 return $this->_cache_sock[$host]; 01049 } 01050 01051 function _debugprint( $str ) { 01052 print( $str ); 01053 } 01054 01060 /* 01061 function _safe_fwrite( $f, $buf, $len = false ) { 01062 stream_set_blocking( $f, 0 ); 01063 01064 if ( $len === false ) { 01065 wfDebug( "Writing " . strlen( $buf ) . " bytes\n" ); 01066 $bytesWritten = fwrite( $f, $buf ); 01067 } else { 01068 wfDebug( "Writing $len bytes\n" ); 01069 $bytesWritten = fwrite( $f, $buf, $len ); 01070 } 01071 $n = stream_select( $r = null, $w = array( $f ), $e = null, 10, 0 ); 01072 # $this->_timeout_seconds, $this->_timeout_microseconds ); 01073 01074 wfDebug( "stream_select returned $n\n" ); 01075 stream_set_blocking( $f, 1 ); 01076 return $n == 1; 01077 return $bytesWritten; 01078 }*/ 01079 01083 function _safe_fwrite( $f, $buf, $len = false ) { 01084 if ( $len === false ) { 01085 $bytesWritten = fwrite( $f, $buf ); 01086 } else { 01087 $bytesWritten = fwrite( $f, $buf, $len ); 01088 } 01089 return $bytesWritten; 01090 } 01091 01095 function _flush_read_buffer( $f ) { 01096 if ( !is_resource( $f ) ) { 01097 return; 01098 } 01099 $n = stream_select( $r = array( $f ), $w = null, $e = null, 0, 0 ); 01100 while ( $n == 1 && !feof( $f ) ) { 01101 fread( $f, 1024 ); 01102 $n = stream_select( $r = array( $f ), $w = null, $e = null, 0, 0 ); 01103 } 01104 } 01105 01106 // }}} 01107 // }}} 01108 // }}} 01109 } 01110 01111 // vim: sts=3 sw=3 et 01112 01113 // }}} 01114 01115 class MemCachedClientforWiki extends MWMemcached { 01116 function _debugprint( $text ) { 01117 wfDebug( "memcached: $text" ); 01118 } 01119 }