MediaWiki
REL1_24
|
00001 <?php 00030 class SquidPurgeClient { 00032 protected $host; 00033 00035 protected $port; 00036 00038 protected $ip; 00039 00041 protected $readState = 'idle'; 00042 00044 protected $writeBuffer = ''; 00045 00047 protected $requests = array(); 00048 00050 protected $currentRequestIndex; 00051 00052 const EINTR = 4; 00053 const EAGAIN = 11; 00054 const EINPROGRESS = 115; 00055 const BUFFER_SIZE = 8192; 00056 00061 protected $socket; 00062 00064 protected $readBuffer; 00065 00067 protected $bodyRemaining; 00068 00073 public function __construct( $server, $options = array() ) { 00074 $parts = explode( ':', $server, 2 ); 00075 $this->host = $parts[0]; 00076 $this->port = isset( $parts[1] ) ? $parts[1] : 80; 00077 } 00078 00085 protected function getSocket() { 00086 if ( $this->socket !== null ) { 00087 return $this->socket; 00088 } 00089 00090 $ip = $this->getIP(); 00091 if ( !$ip ) { 00092 $this->log( "DNS error" ); 00093 $this->markDown(); 00094 return false; 00095 } 00096 $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); 00097 socket_set_nonblock( $this->socket ); 00098 wfSuppressWarnings(); 00099 $ok = socket_connect( $this->socket, $ip, $this->port ); 00100 wfRestoreWarnings(); 00101 if ( !$ok ) { 00102 $error = socket_last_error( $this->socket ); 00103 if ( $error !== self::EINPROGRESS ) { 00104 $this->log( "connection error: " . socket_strerror( $error ) ); 00105 $this->markDown(); 00106 return false; 00107 } 00108 } 00109 00110 return $this->socket; 00111 } 00112 00117 public function getReadSocketsForSelect() { 00118 if ( $this->readState == 'idle' ) { 00119 return array(); 00120 } 00121 $socket = $this->getSocket(); 00122 if ( $socket === false ) { 00123 return array(); 00124 } 00125 return array( $socket ); 00126 } 00127 00132 public function getWriteSocketsForSelect() { 00133 if ( !strlen( $this->writeBuffer ) ) { 00134 return array(); 00135 } 00136 $socket = $this->getSocket(); 00137 if ( $socket === false ) { 00138 return array(); 00139 } 00140 return array( $socket ); 00141 } 00142 00149 protected function getIP() { 00150 if ( $this->ip === null ) { 00151 if ( IP::isIPv4( $this->host ) ) { 00152 $this->ip = $this->host; 00153 } elseif ( IP::isIPv6( $this->host ) ) { 00154 throw new MWException( '$wgSquidServers does not support IPv6' ); 00155 } else { 00156 wfSuppressWarnings(); 00157 $this->ip = gethostbyname( $this->host ); 00158 if ( $this->ip === $this->host ) { 00159 $this->ip = false; 00160 } 00161 wfRestoreWarnings(); 00162 } 00163 } 00164 return $this->ip; 00165 } 00166 00171 protected function markDown() { 00172 $this->close(); 00173 $this->socket = false; 00174 } 00175 00179 public function close() { 00180 if ( $this->socket ) { 00181 wfSuppressWarnings(); 00182 socket_set_block( $this->socket ); 00183 socket_shutdown( $this->socket ); 00184 socket_close( $this->socket ); 00185 wfRestoreWarnings(); 00186 } 00187 $this->socket = null; 00188 $this->readBuffer = ''; 00189 // Write buffer is kept since it may contain a request for the next socket 00190 } 00191 00197 public function queuePurge( $url ) { 00198 global $wgSquidPurgeUseHostHeader; 00199 $url = SquidUpdate::expand( str_replace( "\n", '', $url ) ); 00200 $request = array(); 00201 if ( $wgSquidPurgeUseHostHeader ) { 00202 $url = wfParseUrl( $url ); 00203 $host = $url['host']; 00204 if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) { 00205 $host .= ":" . $url['port']; 00206 } 00207 $path = $url['path']; 00208 if ( isset( $url['query'] ) && is_string( $url['query'] ) ) { 00209 $path = wfAppendQuery( $path, $url['query'] ); 00210 } 00211 $request[] = "PURGE $path HTTP/1.1"; 00212 $request[] = "Host: $host"; 00213 } else { 00214 $request[] = "PURGE $url HTTP/1.0"; 00215 } 00216 $request[] = "Connection: Keep-Alive"; 00217 $request[] = "Proxy-Connection: Keep-Alive"; 00218 $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__; 00219 // Two ''s to create \r\n\r\n 00220 $request[] = ''; 00221 $request[] = ''; 00222 00223 $this->requests[] = implode( "\r\n", $request ); 00224 if ( $this->currentRequestIndex === null ) { 00225 $this->nextRequest(); 00226 } 00227 } 00228 00232 public function isIdle() { 00233 return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle'; 00234 } 00235 00239 public function doWrites() { 00240 if ( !strlen( $this->writeBuffer ) ) { 00241 return; 00242 } 00243 $socket = $this->getSocket(); 00244 if ( !$socket ) { 00245 return; 00246 } 00247 00248 if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) { 00249 $buf = $this->writeBuffer; 00250 $flags = MSG_EOR; 00251 } else { 00252 $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE ); 00253 $flags = 0; 00254 } 00255 wfSuppressWarnings(); 00256 $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags ); 00257 wfRestoreWarnings(); 00258 00259 if ( $bytesSent === false ) { 00260 $error = socket_last_error( $socket ); 00261 if ( $error != self::EAGAIN && $error != self::EINTR ) { 00262 $this->log( 'write error: ' . socket_strerror( $error ) ); 00263 $this->markDown(); 00264 } 00265 return; 00266 } 00267 00268 $this->writeBuffer = substr( $this->writeBuffer, $bytesSent ); 00269 } 00270 00274 public function doReads() { 00275 $socket = $this->getSocket(); 00276 if ( !$socket ) { 00277 return; 00278 } 00279 00280 $buf = ''; 00281 wfSuppressWarnings(); 00282 $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 ); 00283 wfRestoreWarnings(); 00284 if ( $bytesRead === false ) { 00285 $error = socket_last_error( $socket ); 00286 if ( $error != self::EAGAIN && $error != self::EINTR ) { 00287 $this->log( 'read error: ' . socket_strerror( $error ) ); 00288 $this->markDown(); 00289 return; 00290 } 00291 } elseif ( $bytesRead === 0 ) { 00292 // Assume EOF 00293 $this->close(); 00294 return; 00295 } 00296 00297 $this->readBuffer .= $buf; 00298 while ( $this->socket && $this->processReadBuffer() === 'continue' ); 00299 } 00300 00305 protected function processReadBuffer() { 00306 switch ( $this->readState ) { 00307 case 'idle': 00308 return 'done'; 00309 case 'status': 00310 case 'header': 00311 $lines = explode( "\r\n", $this->readBuffer, 2 ); 00312 if ( count( $lines ) < 2 ) { 00313 return 'done'; 00314 } 00315 if ( $this->readState == 'status' ) { 00316 $this->processStatusLine( $lines[0] ); 00317 } else { // header 00318 $this->processHeaderLine( $lines[0] ); 00319 } 00320 $this->readBuffer = $lines[1]; 00321 return 'continue'; 00322 case 'body': 00323 if ( $this->bodyRemaining !== null ) { 00324 if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) { 00325 $this->bodyRemaining -= strlen( $this->readBuffer ); 00326 $this->readBuffer = ''; 00327 return 'done'; 00328 } else { 00329 $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining ); 00330 $this->bodyRemaining = 0; 00331 $this->nextRequest(); 00332 return 'continue'; 00333 } 00334 } else { 00335 // No content length, read all data to EOF 00336 $this->readBuffer = ''; 00337 return 'done'; 00338 } 00339 default: 00340 throw new MWException( __METHOD__ . ': unexpected state' ); 00341 } 00342 } 00343 00347 protected function processStatusLine( $line ) { 00348 if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) { 00349 $this->log( 'invalid status line' ); 00350 $this->markDown(); 00351 return; 00352 } 00353 list( , , , $status, $reason ) = $m; 00354 $status = intval( $status ); 00355 if ( $status !== 200 && $status !== 404 ) { 00356 $this->log( "unexpected status code: $status $reason" ); 00357 $this->markDown(); 00358 return; 00359 } 00360 $this->readState = 'header'; 00361 } 00362 00366 protected function processHeaderLine( $line ) { 00367 if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) { 00368 $this->bodyRemaining = intval( $m[1] ); 00369 } elseif ( $line === '' ) { 00370 $this->readState = 'body'; 00371 } 00372 } 00373 00374 protected function nextRequest() { 00375 if ( $this->currentRequestIndex !== null ) { 00376 unset( $this->requests[$this->currentRequestIndex] ); 00377 } 00378 if ( count( $this->requests ) ) { 00379 $this->readState = 'status'; 00380 $this->currentRequestIndex = key( $this->requests ); 00381 $this->writeBuffer = $this->requests[$this->currentRequestIndex]; 00382 } else { 00383 $this->readState = 'idle'; 00384 $this->currentRequestIndex = null; 00385 $this->writeBuffer = ''; 00386 } 00387 $this->bodyRemaining = null; 00388 } 00389 00393 protected function log( $msg ) { 00394 wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg" ); 00395 } 00396 } 00397 00398 class SquidPurgeClientPool { 00400 protected $clients = array(); 00401 00403 protected $timeout = 5; 00404 00408 function __construct( $options = array() ) { 00409 if ( isset( $options['timeout'] ) ) { 00410 $this->timeout = $options['timeout']; 00411 } 00412 } 00413 00418 public function addClient( $client ) { 00419 $this->clients[] = $client; 00420 } 00421 00422 public function run() { 00423 $done = false; 00424 $startTime = microtime( true ); 00425 while ( !$done ) { 00426 $readSockets = $writeSockets = array(); 00430 foreach ( $this->clients as $clientIndex => $client ) { 00431 $sockets = $client->getReadSocketsForSelect(); 00432 foreach ( $sockets as $i => $socket ) { 00433 $readSockets["$clientIndex/$i"] = $socket; 00434 } 00435 $sockets = $client->getWriteSocketsForSelect(); 00436 foreach ( $sockets as $i => $socket ) { 00437 $writeSockets["$clientIndex/$i"] = $socket; 00438 } 00439 } 00440 if ( !count( $readSockets ) && !count( $writeSockets ) ) { 00441 break; 00442 } 00443 $exceptSockets = null; 00444 $timeout = min( $startTime + $this->timeout - microtime( true ), 1 ); 00445 wfSuppressWarnings(); 00446 $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout ); 00447 wfRestoreWarnings(); 00448 if ( $numReady === false ) { 00449 wfDebugLog( 'squid', __METHOD__ . ': Error in stream_select: ' . 00450 socket_strerror( socket_last_error() ) . "\n" ); 00451 break; 00452 } 00453 // Check for timeout, use 1% tolerance since we aimed at having socket_select() 00454 // exit at precisely the overall timeout 00455 if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) { 00456 wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" ); 00457 break; 00458 } elseif ( !$numReady ) { 00459 continue; 00460 } 00461 00462 foreach ( $readSockets as $key => $socket ) { 00463 list( $clientIndex, ) = explode( '/', $key ); 00464 $client = $this->clients[$clientIndex]; 00465 $client->doReads(); 00466 } 00467 foreach ( $writeSockets as $key => $socket ) { 00468 list( $clientIndex, ) = explode( '/', $key ); 00469 $client = $this->clients[$clientIndex]; 00470 $client->doWrites(); 00471 } 00472 00473 $done = true; 00474 foreach ( $this->clients as $client ) { 00475 if ( !$client->isIdle() ) { 00476 $done = false; 00477 } 00478 } 00479 } 00480 foreach ( $this->clients as $client ) { 00481 $client->close(); 00482 } 00483 } 00484 }