MediaWiki
REL1_20
|
00001 <?php 00030 class SquidPurgeClient { 00031 var $host, $port, $ip; 00032 00033 var $readState = 'idle'; 00034 var $writeBuffer = ''; 00035 var $requests = array(); 00036 var $currentRequestIndex; 00037 00038 const EINTR = 4; 00039 const EAGAIN = 11; 00040 const EINPROGRESS = 115; 00041 const BUFFER_SIZE = 8192; 00042 00046 var $socket; 00047 00048 var $readBuffer; 00049 00050 var $bodyRemaining; 00051 00056 public function __construct( $server, $options = array() ) { 00057 $parts = explode( ':', $server, 2 ); 00058 $this->host = $parts[0]; 00059 $this->port = isset( $parts[1] ) ? $parts[1] : 80; 00060 } 00061 00068 protected function getSocket() { 00069 if ( $this->socket !== null ) { 00070 return $this->socket; 00071 } 00072 00073 $ip = $this->getIP(); 00074 if ( !$ip ) { 00075 $this->log( "DNS error" ); 00076 $this->markDown(); 00077 return false; 00078 } 00079 $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); 00080 socket_set_nonblock( $this->socket ); 00081 wfSuppressWarnings(); 00082 $ok = socket_connect( $this->socket, $ip, $this->port ); 00083 wfRestoreWarnings(); 00084 if ( !$ok ) { 00085 $error = socket_last_error( $this->socket ); 00086 if ( $error !== self::EINPROGRESS ) { 00087 $this->log( "connection error: " . socket_strerror( $error ) ); 00088 $this->markDown(); 00089 return false; 00090 } 00091 } 00092 00093 return $this->socket; 00094 } 00095 00100 public function getReadSocketsForSelect() { 00101 if ( $this->readState == 'idle' ) { 00102 return array(); 00103 } 00104 $socket = $this->getSocket(); 00105 if ( $socket === false ) { 00106 return array(); 00107 } 00108 return array( $socket ); 00109 } 00110 00115 public function getWriteSocketsForSelect() { 00116 if ( !strlen( $this->writeBuffer ) ) { 00117 return array(); 00118 } 00119 $socket = $this->getSocket(); 00120 if ( $socket === false ) { 00121 return array(); 00122 } 00123 return array( $socket ); 00124 } 00125 00130 protected function getIP() { 00131 if ( $this->ip === null ) { 00132 if ( IP::isIPv4( $this->host ) ) { 00133 $this->ip = $this->host; 00134 } elseif ( IP::isIPv6( $this->host ) ) { 00135 throw new MWException( '$wgSquidServers does not support IPv6' ); 00136 } else { 00137 wfSuppressWarnings(); 00138 $this->ip = gethostbyname( $this->host ); 00139 if ( $this->ip === $this->host ) { 00140 $this->ip = false; 00141 } 00142 wfRestoreWarnings(); 00143 } 00144 } 00145 return $this->ip; 00146 } 00147 00152 protected function markDown() { 00153 $this->close(); 00154 $this->socket = false; 00155 } 00156 00160 public function close() { 00161 if ( $this->socket ) { 00162 wfSuppressWarnings(); 00163 socket_set_block( $this->socket ); 00164 socket_shutdown( $this->socket ); 00165 socket_close( $this->socket ); 00166 wfRestoreWarnings(); 00167 } 00168 $this->socket = null; 00169 $this->readBuffer = ''; 00170 // Write buffer is kept since it may contain a request for the next socket 00171 } 00172 00178 public function queuePurge( $url ) { 00179 $url = SquidUpdate::expand( str_replace( "\n", '', $url ) ); 00180 $this->requests[] = "PURGE $url HTTP/1.0\r\n" . 00181 "Connection: Keep-Alive\r\n" . 00182 "Proxy-Connection: Keep-Alive\r\n" . 00183 "User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n"; 00184 if ( $this->currentRequestIndex === null ) { 00185 $this->nextRequest(); 00186 } 00187 } 00188 00192 public function isIdle() { 00193 return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle'; 00194 } 00195 00199 public function doWrites() { 00200 if ( !strlen( $this->writeBuffer ) ) { 00201 return; 00202 } 00203 $socket = $this->getSocket(); 00204 if ( !$socket ) { 00205 return; 00206 } 00207 00208 if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) { 00209 $buf = $this->writeBuffer; 00210 $flags = MSG_EOR; 00211 } else { 00212 $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE ); 00213 $flags = 0; 00214 } 00215 wfSuppressWarnings(); 00216 $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags ); 00217 wfRestoreWarnings(); 00218 00219 if ( $bytesSent === false ) { 00220 $error = socket_last_error( $socket ); 00221 if ( $error != self::EAGAIN && $error != self::EINTR ) { 00222 $this->log( 'write error: ' . socket_strerror( $error ) ); 00223 $this->markDown(); 00224 } 00225 return; 00226 } 00227 00228 $this->writeBuffer = substr( $this->writeBuffer, $bytesSent ); 00229 } 00230 00234 public function doReads() { 00235 $socket = $this->getSocket(); 00236 if ( !$socket ) { 00237 return; 00238 } 00239 00240 $buf = ''; 00241 wfSuppressWarnings(); 00242 $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 ); 00243 wfRestoreWarnings(); 00244 if ( $bytesRead === false ) { 00245 $error = socket_last_error( $socket ); 00246 if ( $error != self::EAGAIN && $error != self::EINTR ) { 00247 $this->log( 'read error: ' . socket_strerror( $error ) ); 00248 $this->markDown(); 00249 return; 00250 } 00251 } elseif ( $bytesRead === 0 ) { 00252 // Assume EOF 00253 $this->close(); 00254 return; 00255 } 00256 00257 $this->readBuffer .= $buf; 00258 while ( $this->socket && $this->processReadBuffer() === 'continue' ); 00259 } 00260 00265 protected function processReadBuffer() { 00266 switch ( $this->readState ) { 00267 case 'idle': 00268 return 'done'; 00269 case 'status': 00270 case 'header': 00271 $lines = explode( "\r\n", $this->readBuffer, 2 ); 00272 if ( count( $lines ) < 2 ) { 00273 return 'done'; 00274 } 00275 if ( $this->readState == 'status' ) { 00276 $this->processStatusLine( $lines[0] ); 00277 } else { // header 00278 $this->processHeaderLine( $lines[0] ); 00279 } 00280 $this->readBuffer = $lines[1]; 00281 return 'continue'; 00282 case 'body': 00283 if ( $this->bodyRemaining !== null ) { 00284 if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) { 00285 $this->bodyRemaining -= strlen( $this->readBuffer ); 00286 $this->readBuffer = ''; 00287 return 'done'; 00288 } else { 00289 $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining ); 00290 $this->bodyRemaining = 0; 00291 $this->nextRequest(); 00292 return 'continue'; 00293 } 00294 } else { 00295 // No content length, read all data to EOF 00296 $this->readBuffer = ''; 00297 return 'done'; 00298 } 00299 default: 00300 throw new MWException( __METHOD__.': unexpected state' ); 00301 } 00302 } 00303 00308 protected function processStatusLine( $line ) { 00309 if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) { 00310 $this->log( 'invalid status line' ); 00311 $this->markDown(); 00312 return; 00313 } 00314 list( , , , $status, $reason ) = $m; 00315 $status = intval( $status ); 00316 if ( $status !== 200 && $status !== 404 ) { 00317 $this->log( "unexpected status code: $status $reason" ); 00318 $this->markDown(); 00319 return; 00320 } 00321 $this->readState = 'header'; 00322 } 00323 00327 protected function processHeaderLine( $line ) { 00328 if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) { 00329 $this->bodyRemaining = intval( $m[1] ); 00330 } elseif ( $line === '' ) { 00331 $this->readState = 'body'; 00332 } 00333 } 00334 00335 protected function nextRequest() { 00336 if ( $this->currentRequestIndex !== null ) { 00337 unset( $this->requests[$this->currentRequestIndex] ); 00338 } 00339 if ( count( $this->requests ) ) { 00340 $this->readState = 'status'; 00341 $this->currentRequestIndex = key( $this->requests ); 00342 $this->writeBuffer = $this->requests[$this->currentRequestIndex]; 00343 } else { 00344 $this->readState = 'idle'; 00345 $this->currentRequestIndex = null; 00346 $this->writeBuffer = ''; 00347 } 00348 $this->bodyRemaining = null; 00349 } 00350 00354 protected function log( $msg ) { 00355 wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" ); 00356 } 00357 } 00358 00359 class SquidPurgeClientPool { 00360 00364 var $clients = array(); 00365 var $timeout = 5; 00366 00370 function __construct( $options = array() ) { 00371 if ( isset( $options['timeout'] ) ) { 00372 $this->timeout = $options['timeout']; 00373 } 00374 } 00375 00380 public function addClient( $client ) { 00381 $this->clients[] = $client; 00382 } 00383 00384 public function run() { 00385 $done = false; 00386 $startTime = microtime( true ); 00387 while ( !$done ) { 00388 $readSockets = $writeSockets = array(); 00392 foreach ( $this->clients as $clientIndex => $client ) { 00393 $sockets = $client->getReadSocketsForSelect(); 00394 foreach ( $sockets as $i => $socket ) { 00395 $readSockets["$clientIndex/$i"] = $socket; 00396 } 00397 $sockets = $client->getWriteSocketsForSelect(); 00398 foreach ( $sockets as $i => $socket ) { 00399 $writeSockets["$clientIndex/$i"] = $socket; 00400 } 00401 } 00402 if ( !count( $readSockets ) && !count( $writeSockets ) ) { 00403 break; 00404 } 00405 $exceptSockets = null; 00406 $timeout = min( $startTime + $this->timeout - microtime( true ), 1 ); 00407 wfSuppressWarnings(); 00408 $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout ); 00409 wfRestoreWarnings(); 00410 if ( $numReady === false ) { 00411 wfDebugLog( 'squid', __METHOD__.': Error in stream_select: ' . 00412 socket_strerror( socket_last_error() ) . "\n" ); 00413 break; 00414 } 00415 // Check for timeout, use 1% tolerance since we aimed at having socket_select() 00416 // exit at precisely the overall timeout 00417 if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) { 00418 wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" ); 00419 break; 00420 } elseif ( !$numReady ) { 00421 continue; 00422 } 00423 00424 foreach ( $readSockets as $key => $socket ) { 00425 list( $clientIndex, ) = explode( '/', $key ); 00426 $client = $this->clients[$clientIndex]; 00427 $client->doReads(); 00428 } 00429 foreach ( $writeSockets as $key => $socket ) { 00430 list( $clientIndex, ) = explode( '/', $key ); 00431 $client = $this->clients[$clientIndex]; 00432 $client->doWrites(); 00433 } 00434 00435 $done = true; 00436 foreach ( $this->clients as $client ) { 00437 if ( !$client->isIdle() ) { 00438 $done = false; 00439 } 00440 } 00441 } 00442 foreach ( $this->clients as $client ) { 00443 $client->close(); 00444 } 00445 } 00446 }