MediaWiki
REL1_20
|
<?php
/**
 * Socket-based memcached client that speaks the memcached text protocol
 * directly over fsockopen()/pfsockopen() streams.
 */

// {{{ requirements
// }}}

// {{{ class MWMemcached
/**
 * Memcached client.
 *
 * Keys may be given either as a plain string or as array( $hashValue, $key ),
 * in which case $hashValue is used for server selection instead of hashing
 * the key itself.
 */
class MWMemcached {
	// {{{ properties
	// {{{ public

	// {{{ constants
	// {{{ flags

	/**
	 * Item flag: the stored value went through serialize()
	 */
	const SERIALIZED = 1;

	/**
	 * Item flag: the stored value went through gzcompress()
	 */
	const COMPRESSED = 2;

	// }}}

	/**
	 * Minimum fraction of the original length that compression has to save
	 * before the compressed form is stored instead of the plain one
	 */
	const COMPRESSION_SAVINGS = 0.20;

	// }}}

	/**
	 * Per-command call counters, keyed by command name
	 * ('get', 'get_multi', 'delete', 'set', 'add', 'replace', 'incr', 'decr')
	 *
	 * @var array
	 */
	var $stats;

	// }}}
	// {{{ private

	/**
	 * Open sockets, keyed by host string
	 *
	 * @var array
	 */
	var $_cache_sock;

	/**
	 * Whether debug messages are emitted through _debugprint()
	 *
	 * @var bool
	 */
	var $_debug;

	/**
	 * Hosts (and their IPs) currently considered dead, mapped to the UNIX
	 * timestamp until which they are skipped
	 *
	 * @var array
	 */
	var $_host_dead;

	/**
	 * Whether gzcompress() is available
	 *
	 * @var bool
	 */
	var $_have_zlib;

	/**
	 * Whether compression of stored values is enabled
	 *
	 * @var bool
	 */
	var $_compress_enable;

	/**
	 * Minimum value length in bytes at which compression is attempted;
	 * 0 disables compression
	 *
	 * @var int
	 */
	var $_compress_threshold;

	/**
	 * Whether connections are opened with pfsockopen() instead of fsockopen()
	 *
	 * @var bool
	 */
	var $_persistent;

	/**
	 * Host string of the only configured server, or null when the number of
	 * configured servers is not exactly one
	 *
	 * @var string|null
	 */
	var $_single_sock;

	/**
	 * Server list as passed to set_servers(); each entry is either a host
	 * string or array( host, weight )
	 *
	 * @var array
	 */
	var $_servers;

	/**
	 * Flattened host list (hosts repeated according to their weight), built
	 * lazily by get_sock(); null until then
	 *
	 * @var array|null
	 */
	var $_buckets;

	/**
	 * Number of entries in $_buckets
	 *
	 * @var int
	 */
	var $_bucketcount;

	/**
	 * Number of configured servers; 0 means the client is inactive
	 *
	 * @var int
	 */
	var $_active;

	/**
	 * Seconds part of the stream read/write timeout
	 *
	 * @var int
	 */
	var $_timeout_seconds;

	/**
	 * Microseconds part of the stream read/write timeout
	 *
	 * @var int
	 */
	var $_timeout_microseconds;

	/**
	 * Connect timeout in seconds, passed to fsockopen()/pfsockopen()
	 */
	var $_connect_timeout;

	/**
	 * Number of connection attempts made per host before it is marked dead
	 */
	var $_connect_attempts;

	// }}}
	// }}}
	// {{{ methods
	// {{{ public functions
	// {{{ memcached()

	/**
	 * Constructor.
	 *
	 * Recognised keys of $args (all optional):
	 *  - servers:            server list, see set_servers() (default: none)
	 *  - debug:              enable debug output (default: false)
	 *  - compress_threshold: see set_compress_threshold() (default: 0)
	 *  - persistent:         use persistent connections (default: false)
	 *  - timeout:            stream timeout in microseconds (default: 500000)
	 *  - connect_timeout:    connect timeout in seconds (default: 0.1)
	 *
	 * @param $args Array: associative array of settings
	 */
	public function __construct( $args ) {
		$this->set_servers( isset( $args['servers'] ) ? $args['servers'] : array() );
		$this->_debug = isset( $args['debug'] ) ? $args['debug'] : false;
		$this->stats = array();
		$this->_compress_threshold = isset( $args['compress_threshold'] ) ? $args['compress_threshold'] : 0;
		$this->_persistent = isset( $args['persistent'] ) ? $args['persistent'] : false;
		$this->_compress_enable = true;
		$this->_have_zlib = function_exists( 'gzcompress' );

		$this->_cache_sock = array();
		$this->_host_dead = array();

		$this->_timeout_seconds = 0;
		$this->_timeout_microseconds = isset( $args['timeout'] ) ? $args['timeout'] : 500000;

		$this->_connect_timeout = isset( $args['connect_timeout'] ) ? $args['connect_timeout'] : 0.1;
		$this->_connect_attempts = 2;
	}

	// }}}
	// {{{ add()

	/**
	 * Send an "add" command for the key.
	 *
	 * @param $key String|Array: key, or array( hash value, key )
	 * @param $val Mixed: value; non-scalars are serialized
	 * @param $exp Integer: expiry value sent verbatim to the server
	 *
	 * @return Boolean: true if the server answered "STORED"
	 */
	public function add( $key, $val, $exp = 0 ) {
		return $this->_set( 'add', $key, $val, $exp );
	}

	// }}}
	// {{{ decr()

	/**
	 * Send a "decr" command for the key.
	 *
	 * @param $key String|Array: key, or array( hash value, key )
	 * @param $amt Integer: amount to decrement by
	 *
	 * @return Mixed: the leading digits of the server reply (as a string),
	 *   or null on failure
	 */
	public function decr( $key, $amt = 1 ) {
		return $this->_incrdecr( 'decr', $key, $amt );
	}

	// }}}
	// {{{ delete()

	/**
	 * Send a "delete" command for the key.
	 *
	 * @param $key String|Array: key, or array( hash value, key )
	 * @param $time Integer: time argument sent verbatim to the server
	 *
	 * @return Boolean: true if the server answered "DELETED" or "NOT_FOUND",
	 *   false otherwise
	 */
	public function delete( $key, $time = 0 ) {
		if ( !$this->_active ) {
			return false;
		}

		$sock = $this->get_sock( $key );
		if ( !is_resource( $sock ) ) {
			return false;
		}

		$key = is_array( $key ) ? $key[1] : $key;

		if ( isset( $this->stats['delete'] ) ) {
			$this->stats['delete']++;
		} else {
			$this->stats['delete'] = 1;
		}
		$cmd = "delete $key $time\r\n";
		if ( !$this->_fwrite( $sock, $cmd ) ) {
			return false;
		}
		$res = $this->_fgets( $sock );

		if ( $this->_debug ) {
			$this->_debugprint( sprintf( "MemCache: delete %s (%s)\n", $key, $res ) );
		}

		if ( $res == "DELETED" || $res == "NOT_FOUND" ) {
			return true;
		}

		return false;
	}

	/**
	 * Stub: no locking is performed.
	 *
	 * @param $key String
	 * @param $timeout Integer
	 * @return Boolean: always true
	 */
	public function lock( $key, $timeout = 0 ) {
		/* stub */
		return true;
	}

	/**
	 * Stub: no locking is performed.
	 *
	 * @param $key String
	 * @return Boolean: always true
	 */
	public function unlock( $key ) {
		/* stub */
		return true;
	}

	// }}}
	// {{{ disconnect_all()

	/**
	 * Close every cached socket and empty the socket cache.
	 */
	public function disconnect_all() {
		foreach ( $this->_cache_sock as $sock ) {
			fclose( $sock );
		}

		$this->_cache_sock = array();
	}

	// }}}
	// {{{ enable_compress()

	/**
	 * Enable or disable compression of stored values.
	 *
	 * @param $enable Boolean
	 */
	public function enable_compress( $enable ) {
		$this->_compress_enable = $enable;
	}

	// }}}
	// {{{ forget_dead_hosts()

	/**
	 * Clear the list of hosts currently considered dead.
	 */
	public function forget_dead_hosts() {
		$this->_host_dead = array();
	}

	// }}}
	// {{{ get()

	/**
	 * Fetch the value stored under a single key.
	 *
	 * @param $key String|Array: key, or array( hash value, key )
	 *
	 * @return Mixed: the stored value, or false if there is no usable server,
	 *   the request failed, or the key was not returned by the server
	 */
	public function get( $key ) {
		wfProfileIn( __METHOD__ );

		if ( $this->_debug ) {
			// Print the real key even when the array( hash, key ) form is used
			$this->_debugprint( "get(" . ( is_array( $key ) ? $key[1] : $key ) . ")\n" );
		}

		if ( !$this->_active ) {
			wfProfileOut( __METHOD__ );
			return false;
		}

		$sock = $this->get_sock( $key );

		if ( !is_resource( $sock ) ) {
			wfProfileOut( __METHOD__ );
			return false;
		}

		$key = is_array( $key ) ? $key[1] : $key;
		if ( isset( $this->stats['get'] ) ) {
			$this->stats['get']++;
		} else {
			$this->stats['get'] = 1;
		}

		$cmd = "get $key\r\n";
		if ( !$this->_fwrite( $sock, $cmd ) ) {
			wfProfileOut( __METHOD__ );
			return false;
		}

		$val = array();
		$this->_load_items( $sock, $val );

		if ( $this->_debug ) {
			foreach ( $val as $k => $v ) {
				$this->_debugprint( sprintf( "MemCache: sock %s got %s\n", serialize( $sock ), $k ) );
			}
		}

		$value = false;
		if ( isset( $val[$key] ) ) {
			$value = $val[$key];
		}
		wfProfileOut( __METHOD__ );
		return $value;
	}

	// }}}
	// {{{ get_multi()

	/**
	 * Fetch several keys at once. Keys are grouped by the server they map to
	 * and one "get" command is sent per server.
	 *
	 * @param $keys Array: list of keys (strings or array( hash value, key ))
	 *
	 * @return Array|Boolean: map of key => value for every item the servers
	 *   returned, or false if no server is configured
	 */
	public function get_multi( $keys ) {
		if ( !$this->_active ) {
			return false;
		}

		if ( isset( $this->stats['get_multi'] ) ) {
			$this->stats['get_multi']++;
		} else {
			$this->stats['get_multi'] = 1;
		}
		$sock_keys = array();
		$socks = array();
		foreach ( $keys as $key ) {
			$sock = $this->get_sock( $key );
			if ( !is_resource( $sock ) ) {
				continue;
			}
			$key = is_array( $key ) ? $key[1] : $key;
			// Resources cannot be used as array keys directly, so the
			// integer resource ID is used for every access to $sock_keys
			$sockId = intval( $sock );
			if ( !isset( $sock_keys[$sockId] ) ) {
				$sock_keys[$sockId] = array();
				$socks[] = $sock;
			}
			$sock_keys[$sockId][] = $key;
		}

		$gather = array();
		// Send out the requests
		foreach ( $socks as $sock ) {
			$cmd = 'get';
			foreach ( $sock_keys[ intval( $sock ) ] as $key ) {
				$cmd .= ' ' . $key;
			}
			$cmd .= "\r\n";

			if ( $this->_fwrite( $sock, $cmd ) ) {
				$gather[] = $sock;
			}
		}

		// Parse responses
		$val = array();
		foreach ( $gather as $sock ) {
			$this->_load_items( $sock, $val );
		}

		if ( $this->_debug ) {
			foreach ( $val as $k => $v ) {
				$this->_debugprint( sprintf( "MemCache: got %s\n", $k ) );
			}
		}

		return $val;
	}

	// }}}
	// {{{ incr()

	/**
	 * Send an "incr" command for the key.
	 *
	 * @param $key String|Array: key, or array( hash value, key )
	 * @param $amt Integer: amount to increment by
	 *
	 * @return Mixed: the leading digits of the server reply (as a string),
	 *   or null on failure
	 */
	public function incr( $key, $amt = 1 ) {
		return $this->_incrdecr( 'incr', $key, $amt );
	}

	// }}}
	// {{{ replace()

	/**
	 * Send a "replace" command for the key.
	 *
	 * @param $key String|Array: key, or array( hash value, key )
	 * @param $value Mixed: value; non-scalars are serialized
	 * @param $exp Integer: expiry value sent verbatim to the server
	 *
	 * @return Boolean: true if the server answered "STORED"
	 */
	public function replace( $key, $value, $exp = 0 ) {
		return $this->_set( 'replace', $key, $value, $exp );
	}

	// }}}
	// {{{ run_command()

	/**
	 * Send a raw command on a socket and collect the response lines.
	 * Reading stops after a line starting with "END", after an empty line,
	 * or after a read failure; that final line is included in the result.
	 *
	 * @param $sock Resource: socket to send the command on
	 * @param $cmd String: complete command, including the trailing "\r\n"
	 *
	 * @return Array: response lines (empty if the socket is invalid or the
	 *   write failed)
	 */
	public function run_command( $sock, $cmd ) {
		if ( !is_resource( $sock ) ) {
			return array();
		}

		if ( !$this->_fwrite( $sock, $cmd ) ) {
			return array();
		}

		$ret = array();
		while ( true ) {
			$res = $this->_fgets( $sock );
			$ret[] = $res;
			// Check for a failed read first so that false is never handed
			// to the string functions below
			if ( $res === false || strlen( $res ) == 0 ) {
				break;
			}
			if ( preg_match( '/^END/', $res ) ) {
				break;
			}
		}
		return $ret;
	}

	// }}}
	// {{{ set()

	/**
	 * Send a "set" command for the key.
	 *
	 * @param $key String|Array: key, or array( hash value, key )
	 * @param $value Mixed: value; non-scalars are serialized
	 * @param $exp Integer: expiry value sent verbatim to the server
	 *
	 * @return Boolean: true if the server answered "STORED"
	 */
	public function set( $key, $value, $exp = 0 ) {
		return $this->_set( 'set', $key, $value, $exp );
	}

	// }}}
	// {{{ set_compress_threshold()

	/**
	 * Set the minimum value length at which compression is attempted.
	 *
	 * @param $thresh Integer: threshold in bytes; 0 disables compression
	 */
	public function set_compress_threshold( $thresh ) {
		$this->_compress_threshold = $thresh;
	}

	// }}}
	// {{{ set_debug()

	/**
	 * Turn debug output on or off.
	 *
	 * @param $dbg Boolean
	 */
	public function set_debug( $dbg ) {
		$this->_debug = $dbg;
	}

	// }}}
	// {{{ set_servers()

	/**
	 * Replace the server list and reset the bucket table.
	 *
	 * @param $list Array: servers, each either a "host:port" string or
	 *   array( "host:port", weight )
	 */
	public function set_servers( $list ) {
		$this->_servers = $list;
		$this->_active = count( $list );
		$this->_buckets = null;
		$this->_bucketcount = 0;

		$this->_single_sock = null;
		if ( $this->_active == 1 ) {
			// With a single server the weight is irrelevant; unwrap the
			// array( host, weight ) form so that sock_to_host() always
			// receives a host string
			$only = reset( $list );
			$this->_single_sock = is_array( $only ) ? $only[0] : $only;
		}
	}

	/**
	 * Set the stream timeout used for sockets opened from now on.
	 *
	 * @param $seconds Integer
	 * @param $microseconds Integer
	 */
	public function set_timeout( $seconds, $microseconds ) {
		$this->_timeout_seconds = $seconds;
		$this->_timeout_microseconds = $microseconds;
	}

	// }}}
	// }}}
	// {{{ private methods
	// {{{ _close_sock()

	/**
	 * Close a socket and drop it from the socket cache.
	 *
	 * @param $sock Resource: socket to close
	 */
	function _close_sock( $sock ) {
		$host = array_search( $sock, $this->_cache_sock );
		if ( $host === false ) {
			// Not one of the cached sockets: there is no cache entry to
			// remove, so just close what was handed in
			if ( is_resource( $sock ) ) {
				fclose( $sock );
			}
			return;
		}
		fclose( $this->_cache_sock[$host] );
		unset( $this->_cache_sock[$host] );
	}

	// }}}
	// {{{ _connect_sock()

	/**
	 * Open a connection to a server, retrying up to $_connect_attempts
	 * times. On failure the host is marked dead.
	 *
	 * @param &$sock Mixed: receives the socket resource (false on failure)
	 * @param $host String: "ip:port" of the server
	 *
	 * @return Boolean: true on success, false on failure
	 */
	function _connect_sock( &$sock, $host ) {
		list( $ip, $port ) = explode( ':', $host );
		$sock = false;
		$timeout = $this->_connect_timeout;
		$errno = $errstr = null;
		for ( $i = 0; !$sock && $i < $this->_connect_attempts; $i++ ) {
			wfSuppressWarnings();
			if ( $this->_persistent == 1 ) {
				$sock = pfsockopen( $ip, $port, $errno, $errstr, $timeout );
			} else {
				$sock = fsockopen( $ip, $port, $errno, $errstr, $timeout );
			}
			wfRestoreWarnings();
		}
		if ( !$sock ) {
			$this->_error_log( "Error connecting to $host: $errstr\n" );
			$this->_dead_host( $host );
			return false;
		}

		// Initialise timeout
		stream_set_timeout( $sock, $this->_timeout_seconds, $this->_timeout_microseconds );

		// If the connection was persistent, flush the read buffer in case there
		// was a previous incomplete request on this connection
		if ( $this->_persistent ) {
			$this->_flush_read_buffer( $sock );
		}
		return true;
	}

	// }}}
	// {{{ _dead_sock()

	/**
	 * Mark the host behind a cached socket as dead.
	 *
	 * @param $sock Resource: socket whose host should be marked dead
	 */
	function _dead_sock( $sock ) {
		$host = array_search( $sock, $this->_cache_sock );
		if ( $host === false ) {
			// Unknown socket: there is no host to mark
			return;
		}
		$this->_dead_host( $host );
	}

	/**
	 * Mark a host and its IP as dead for 30 to 40 seconds and drop its
	 * socket from the cache.
	 *
	 * @param $host String: "ip:port" of the server
	 */
	function _dead_host( $host ) {
		$parts = explode( ':', $host );
		$ip = $parts[0];
		$this->_host_dead[$ip] = time() + 30 + intval( rand( 0, 10 ) );
		$this->_host_dead[$host] = $this->_host_dead[$ip];
		unset( $this->_cache_sock[$host] );
	}

	// }}}
	// {{{ get_sock()

	/**
	 * Pick the server for a key and return a socket connected to it.
	 * If the chosen server is unavailable, up to 20 candidates are tried by
	 * rehashing.
	 *
	 * @param $key String|Array: key, or array( hash value, key )
	 *
	 * @return Mixed: socket resource, or false/null if none is available
	 */
	function get_sock( $key ) {
		if ( !$this->_active ) {
			return false;
		}

		if ( $this->_single_sock !== null ) {
			return $this->sock_to_host( $this->_single_sock );
		}

		$hv = is_array( $key ) ? intval( $key[0] ) : $this->_hashfunc( $key );
		if ( $this->_buckets === null ) {
			// Build the bucket table: weighted servers appear once per
			// unit of weight
			$bu = array();
			foreach ( $this->_servers as $v ) {
				if ( is_array( $v ) ) {
					for ( $i = 0; $i < $v[1]; $i++ ) {
						$bu[] = $v[0];
					}
				} else {
					$bu[] = $v;
				}
			}
			$this->_buckets = $bu;
			$this->_bucketcount = count( $bu );
		}

		if ( $this->_bucketcount < 1 ) {
			// Every server has weight 0: nothing to choose from, and the
			// modulo below would divide by zero
			return false;
		}

		$realkey = is_array( $key ) ? $key[1] : $key;
		for ( $tries = 0; $tries < 20; $tries++ ) {
			$host = $this->_buckets[$hv % $this->_bucketcount];
			$sock = $this->sock_to_host( $host );
			if ( is_resource( $sock ) ) {
				return $sock;
			}
			$hv = $this->_hashfunc( $hv . $realkey );
		}

		return false;
	}

	// }}}
	// {{{ _hashfunc()

	/**
	 * Hash a key to an integer used for server selection.
	 *
	 * @param $key String: key to hash
	 *
	 * @return Integer: hash value in [0,0x7fffffff]
	 */
	function _hashfunc( $key ) {
		# Hash function must be in [0,0x7fffffff]
		# We take the first 31 bits of the MD5 hash, which unlike the hash
		# function used in a previous version of this client, works
		return hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff;
	}

	// }}}
	// {{{ _incrdecr()

	/**
	 * Send an "incr" or "decr" command.
	 *
	 * @param $cmd String: 'incr' or 'decr'
	 * @param $key String|Array: key, or array( hash value, key )
	 * @param $amt Integer: amount to adjust by
	 *
	 * @return Mixed: the leading digits of the server reply (as a string),
	 *   or null if there is no usable server, the write failed, or the reply
	 *   does not start with a digit
	 */
	function _incrdecr( $cmd, $key, $amt = 1 ) {
		if ( !$this->_active ) {
			return null;
		}

		$sock = $this->get_sock( $key );
		if ( !is_resource( $sock ) ) {
			return null;
		}

		$key = is_array( $key ) ? $key[1] : $key;
		if ( isset( $this->stats[$cmd] ) ) {
			$this->stats[$cmd]++;
		} else {
			$this->stats[$cmd] = 1;
		}
		if ( !$this->_fwrite( $sock, "$cmd $key $amt\r\n" ) ) {
			return null;
		}

		$line = $this->_fgets( $sock );
		$match = array();
		if ( !preg_match( '/^(\d+)/', $line, $match ) ) {
			return null;
		}
		return $match[1];
	}

	// }}}
	// {{{ _load_items()

	/**
	 * Read "VALUE <key> <flags> <bytes>" items from a socket until "END",
	 * undoing compression and serialization according to the item flags.
	 *
	 * @param $sock Resource: socket to read from
	 * @param &$ret Array: receives key => value for each item read
	 *
	 * @return Boolean: true if "END" was reached, false on any read or
	 *   parse error
	 */
	function _load_items( $sock, &$ret ) {
		while ( 1 ) {
			$decl = $this->_fgets( $sock );
			if ( $decl === false ) {
				return false;
			} elseif ( $decl == "END" ) {
				return true;
			} elseif ( preg_match( '/^VALUE (\S+) (\d+) (\d+)$/', $decl, $match ) ) {
				list( $rkey, $flags, $len ) = array( $match[1], $match[2], $match[3] );
				// The data block is followed by "\r\n", which is read
				// together with it and stripped below
				$data = $this->_fread( $sock, $len + 2 );
				if ( $data === false ) {
					return false;
				}
				if ( substr( $data, -2 ) !== "\r\n" ) {
					$this->_handle_error( $sock,
						'line ending missing from data block from $1' );
					return false;
				}
				$data = substr( $data, 0, -2 );
				$ret[$rkey] = $data;

				if ( $this->_have_zlib && $flags & self::COMPRESSED ) {
					$ret[$rkey] = gzuncompress( $ret[$rkey] );
				}

				if ( $flags & self::SERIALIZED ) {
					$ret[$rkey] = unserialize( $ret[$rkey] );
				}

			} else {
				$this->_handle_error( $sock, 'Error parsing response from $1' );
				return false;
			}
		}
	}

	// }}}
	// {{{ _set()

	/**
	 * Send a storage command ("set", "add" or "replace").
	 *
	 * Non-scalar values are serialized. Values of at least
	 * $_compress_threshold bytes are compressed when compression is enabled
	 * and saves more than COMPRESSION_SAVINGS of the length.
	 *
	 * @param $cmd String: command to send
	 * @param $key String|Array: key, or array( hash value, key )
	 * @param $val Mixed: value to store
	 * @param $exp Integer: expiry value sent verbatim to the server
	 *
	 * @return Boolean: true if the server answered "STORED"
	 */
	function _set( $cmd, $key, $val, $exp ) {
		if ( !$this->_active ) {
			return false;
		}

		$sock = $this->get_sock( $key );
		if ( !is_resource( $sock ) ) {
			return false;
		}

		// Unwrap the array( hash value, key ) form, as the other commands
		// do, so the real key is what ends up in the command line
		$key = is_array( $key ) ? $key[1] : $key;

		if ( isset( $this->stats[$cmd] ) ) {
			$this->stats[$cmd]++;
		} else {
			$this->stats[$cmd] = 1;
		}

		$flags = 0;

		if ( !is_scalar( $val ) ) {
			$val = serialize( $val );
			$flags |= self::SERIALIZED;
			if ( $this->_debug ) {
				$this->_debugprint( sprintf( "client: serializing data as it is not scalar\n" ) );
			}
		}

		$len = strlen( $val );

		if ( $this->_have_zlib && $this->_compress_enable &&
			 $this->_compress_threshold && $len >= $this->_compress_threshold )
		{
			$c_val = gzcompress( $val, 9 );
			$c_len = strlen( $c_val );

			if ( $c_len < $len * ( 1 - self::COMPRESSION_SAVINGS ) ) {
				if ( $this->_debug ) {
					$this->_debugprint( sprintf( "client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len ) );
				}
				$val = $c_val;
				$len = $c_len;
				$flags |= self::COMPRESSED;
			}
		}
		if ( !$this->_fwrite( $sock, "$cmd $key $flags $exp $len\r\n$val\r\n" ) ) {
			return false;
		}

		$line = $this->_fgets( $sock );

		if ( $this->_debug ) {
			$this->_debugprint( sprintf( "%s %s (%s)\n", $cmd, $key, $line ) );
		}
		if ( $line == "STORED" ) {
			return true;
		}
		return false;
	}

	// }}}
	// {{{ sock_to_host()

	/**
	 * Return the socket for a host, connecting and caching it if needed.
	 * Hosts currently marked dead are not contacted.
	 *
	 * @param $host String: "ip:port" of the server
	 *
	 * @return Mixed: socket resource, or null if the host is dead or the
	 *   connection failed
	 */
	function sock_to_host( $host ) {
		if ( isset( $this->_cache_sock[$host] ) ) {
			return $this->_cache_sock[$host];
		}

		$sock = null;
		$now = time();
		list( $ip, /* $port */) = explode( ':', $host );
		if ( isset( $this->_host_dead[$host] ) && $this->_host_dead[$host] > $now ||
			isset( $this->_host_dead[$ip] ) && $this->_host_dead[$ip] > $now
		) {
			return null;
		}

		if ( !$this->_connect_sock( $sock, $host ) ) {
			return null;
		}

		// Do not buffer writes
		stream_set_write_buffer( $sock, 0 );

		$this->_cache_sock[$host] = $sock;

		return $this->_cache_sock[$host];
	}

	/**
	 * Send a debug message to the 'memcached' debug log group.
	 *
	 * @param $text String
	 */
	function _debugprint( $text ) {
		global $wgDebugLogGroups;
		if ( !isset( $wgDebugLogGroups['memcached'] ) ) {
			# Prefix message since it will end up in main debug log file
			$text = "memcached: $text";
		}
		wfDebugLog( 'memcached', $text );
	}

	/**
	 * Send an error message to the 'memcached-serious' debug log group.
	 *
	 * @param $text String
	 */
	function _error_log( $text ) {
		wfDebugLog( 'memcached-serious', "Memcached error: $text" );
	}

	/**
	 * Write a whole buffer to a socket, looping over partial writes.
	 * On timeout or write error the failure is logged and the host is
	 * marked dead.
	 *
	 * @param $sock Resource
	 * @param $buf String: data to send
	 *
	 * @return Boolean: true if everything was written, false otherwise
	 */
	function _fwrite( $sock, $buf ) {
		$bytesWritten = 0;
		$bufSize = strlen( $buf );
		while ( $bytesWritten < $bufSize ) {
			// Send only the part that has not gone out yet; re-sending the
			// buffer from the start after a partial write would corrupt
			// the command stream
			$result = fwrite( $sock, substr( $buf, $bytesWritten ) );
			$data = stream_get_meta_data( $sock );
			if ( $data['timed_out'] ) {
				$this->_handle_error( $sock, 'timeout writing to $1' );
				return false;
			}
			// Contrary to the documentation, fwrite() returns zero on error in PHP 5.3.
			if ( $result === false || $result === 0 ) {
				$this->_handle_error( $sock, 'error writing to $1' );
				return false;
			}
			$bytesWritten += $result;
		}

		return true;
	}

	/**
	 * Log a socket error and mark the socket's host as dead.
	 *
	 * @param $sock Resource: socket the error occurred on
	 * @param $msg String: message; "$1" is replaced by the peer name
	 */
	function _handle_error( $sock, $msg ) {
		$peer = stream_socket_get_name( $sock, true );
		if ( strval( $peer ) === '' ) {
			$peer = array_search( $sock, $this->_cache_sock );
			if ( $peer === false ) {
				$peer = '[unknown host]';
			}
		}
		$msg = str_replace( '$1', $peer, $msg );
		$this->_error_log( "$msg\n" );
		$this->_dead_sock( $sock );
	}

	/**
	 * Read exactly $len bytes from a socket.
	 * On timeout, read error or end of file the failure is logged and the
	 * host is marked dead.
	 *
	 * @param $sock Resource
	 * @param $len Integer: number of bytes to read
	 *
	 * @return String|Boolean: the data read, or false on failure
	 */
	function _fread( $sock, $len ) {
		$buf = '';
		while ( $len > 0 ) {
			$result = fread( $sock, $len );
			$data = stream_get_meta_data( $sock );
			if ( $data['timed_out'] ) {
				$this->_handle_error( $sock, 'timeout reading from $1' );
				return false;
			}
			if ( $result === false ) {
				$this->_handle_error( $sock, 'error reading buffer from $1' );
				return false;
			}
			if ( $result === '' ) {
				// This will happen if the remote end of the socket is shut down
				$this->_handle_error( $sock, 'unexpected end of file reading from $1' );
				return false;
			}
			$len -= strlen( $result );
			$buf .= $result;
		}
		return $buf;
	}

	/**
	 * Read one line from a socket and strip its line ending.
	 * On timeout, read error or a missing line ending the failure is logged
	 * and the host is marked dead.
	 *
	 * @param $sock Resource
	 *
	 * @return String|Boolean: the line without "\r\n" / "\n", or false on
	 *   failure
	 */
	function _fgets( $sock ) {
		$result = fgets( $sock );
		// fgets() may return a partial line if there is a select timeout after
		// a successful recv(), so we have to check for a timeout even if we
		// got a string response.
		$data = stream_get_meta_data( $sock );
		if ( $data['timed_out'] ) {
			$this->_handle_error( $sock, 'timeout reading line from $1' );
			return false;
		}
		if ( $result === false ) {
			$this->_handle_error( $sock, 'error reading line from $1' );
			return false;
		}
		if ( substr( $result, -2 ) === "\r\n" ) {
			$result = substr( $result, 0, -2 );
		} elseif ( substr( $result, -1 ) === "\n" ) {
			$result = substr( $result, 0, -1 );
		} else {
			$this->_handle_error( $sock, 'line ending missing in response from $1' );
			return false;
		}
		return $result;
	}

	/**
	 * Discard whatever is currently readable on a stream without blocking.
	 *
	 * @param $f Resource: stream to drain
	 */
	function _flush_read_buffer( $f ) {
		if ( !is_resource( $f ) ) {
			return;
		}
		// stream_select() takes its array arguments by reference and
		// modifies them, so they must be real variables and have to be
		// rebuilt before every call
		$r = array( $f );
		$w = null;
		$e = null;
		$n = stream_select( $r, $w, $e, 0, 0 );
		while ( $n == 1 && !feof( $f ) ) {
			fread( $f, 1024 );
			$r = array( $f );
			$w = null;
			$e = null;
			$n = stream_select( $r, $w, $e, 0, 0 );
		}
	}

	// }}}
	// }}}
	// }}}
}


// }}}

/**
 * Empty subclass of MWMemcached; adds no behaviour of its own.
 * NOTE(review): presumably kept as a backward-compatible class name — confirm.
 */
class MemCachedClientforWiki extends MWMemcached {
}