MediaWiki
REL1_22
|
00001 <?php 00030 class SquidPurgeClient { 00031 var $host, $port, $ip; 00032 00033 var $readState = 'idle'; 00034 var $writeBuffer = ''; 00035 var $requests = array(); 00036 var $currentRequestIndex; 00037 00038 const EINTR = 4; 00039 const EAGAIN = 11; 00040 const EINPROGRESS = 115; 00041 const BUFFER_SIZE = 8192; 00042 00046 var $socket; 00047 00048 var $readBuffer; 00049 00050 var $bodyRemaining; 00051 00056 public function __construct( $server, $options = array() ) { 00057 $parts = explode( ':', $server, 2 ); 00058 $this->host = $parts[0]; 00059 $this->port = isset( $parts[1] ) ? $parts[1] : 80; 00060 } 00061 00068 protected function getSocket() { 00069 if ( $this->socket !== null ) { 00070 return $this->socket; 00071 } 00072 00073 $ip = $this->getIP(); 00074 if ( !$ip ) { 00075 $this->log( "DNS error" ); 00076 $this->markDown(); 00077 return false; 00078 } 00079 $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP ); 00080 socket_set_nonblock( $this->socket ); 00081 wfSuppressWarnings(); 00082 $ok = socket_connect( $this->socket, $ip, $this->port ); 00083 wfRestoreWarnings(); 00084 if ( !$ok ) { 00085 $error = socket_last_error( $this->socket ); 00086 if ( $error !== self::EINPROGRESS ) { 00087 $this->log( "connection error: " . socket_strerror( $error ) ); 00088 $this->markDown(); 00089 return false; 00090 } 00091 } 00092 00093 return $this->socket; 00094 } 00095 00100 public function getReadSocketsForSelect() { 00101 if ( $this->readState == 'idle' ) { 00102 return array(); 00103 } 00104 $socket = $this->getSocket(); 00105 if ( $socket === false ) { 00106 return array(); 00107 } 00108 return array( $socket ); 00109 } 00110 00115 public function getWriteSocketsForSelect() { 00116 if ( !strlen( $this->writeBuffer ) ) { 00117 return array(); 00118 } 00119 $socket = $this->getSocket(); 00120 if ( $socket === false ) { 00121 return array(); 00122 } 00123 return array( $socket ); 00124 } 00125 00130 protected function getIP() { 00131 if ( $this->ip === null ) { 00132 if ( IP::isIPv4( $this->host ) ) { 00133 $this->ip = $this->host; 00134 } elseif ( IP::isIPv6( $this->host ) ) { 00135 throw new MWException( '$wgSquidServers does not support IPv6' ); 00136 } else { 00137 wfSuppressWarnings(); 00138 $this->ip = gethostbyname( $this->host ); 00139 if ( $this->ip === $this->host ) { 00140 $this->ip = false; 00141 } 00142 wfRestoreWarnings(); 00143 } 00144 } 00145 return $this->ip; 00146 } 00147 00152 protected function markDown() { 00153 $this->close(); 00154 $this->socket = false; 00155 } 00156 00160 public function close() { 00161 if ( $this->socket ) { 00162 wfSuppressWarnings(); 00163 socket_set_block( $this->socket ); 00164 socket_shutdown( $this->socket ); 00165 socket_close( $this->socket ); 00166 wfRestoreWarnings(); 00167 } 00168 $this->socket = null; 00169 $this->readBuffer = ''; 00170 // Write buffer is kept since it may contain a request for the next socket 00171 } 00172 00178 public function queuePurge( $url ) { 00179 global $wgSquidPurgeUseHostHeader; 00180 $url = SquidUpdate::expand( str_replace( "\n", '', $url ) ); 00181 $request = array(); 00182 if ( $wgSquidPurgeUseHostHeader ) { 00183 $url = wfParseUrl( $url ); 00184 $host = $url['host']; 00185 if ( isset( $url['port'] ) && strlen( $url['port'] ) > 0 ) { 00186 $host .= ":" . $url['port']; 00187 } 00188 $path = $url['path']; 00189 if ( isset( $url['query'] ) && is_string( $url['query'] ) ) { 00190 $path = wfAppendQuery( $path, $url['query'] ); 00191 } 00192 $request[] = "PURGE $path HTTP/1.1"; 00193 $request[] = "Host: $host"; 00194 } else { 00195 $request[] = "PURGE $url HTTP/1.0"; 00196 } 00197 $request[] = "Connection: Keep-Alive"; 00198 $request[] = "Proxy-Connection: Keep-Alive"; 00199 $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__; 00200 // Two ''s to create \r\n\r\n 00201 $request[] = ''; 00202 $request[] = ''; 00203 00204 $this->requests[] = implode( "\r\n", $request ); 00205 if ( $this->currentRequestIndex === null ) { 00206 $this->nextRequest(); 00207 } 00208 } 00209 00213 public function isIdle() { 00214 return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle'; 00215 } 00216 00220 public function doWrites() { 00221 if ( !strlen( $this->writeBuffer ) ) { 00222 return; 00223 } 00224 $socket = $this->getSocket(); 00225 if ( !$socket ) { 00226 return; 00227 } 00228 00229 if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) { 00230 $buf = $this->writeBuffer; 00231 $flags = MSG_EOR; 00232 } else { 00233 $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE ); 00234 $flags = 0; 00235 } 00236 wfSuppressWarnings(); 00237 $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags ); 00238 wfRestoreWarnings(); 00239 00240 if ( $bytesSent === false ) { 00241 $error = socket_last_error( $socket ); 00242 if ( $error != self::EAGAIN && $error != self::EINTR ) { 00243 $this->log( 'write error: ' . socket_strerror( $error ) ); 00244 $this->markDown(); 00245 } 00246 return; 00247 } 00248 00249 $this->writeBuffer = substr( $this->writeBuffer, $bytesSent ); 00250 } 00251 00255 public function doReads() { 00256 $socket = $this->getSocket(); 00257 if ( !$socket ) { 00258 return; 00259 } 00260 00261 $buf = ''; 00262 wfSuppressWarnings(); 00263 $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 ); 00264 wfRestoreWarnings(); 00265 if ( $bytesRead === false ) { 00266 $error = socket_last_error( $socket ); 00267 if ( $error != self::EAGAIN && $error != self::EINTR ) { 00268 $this->log( 'read error: ' . socket_strerror( $error ) ); 00269 $this->markDown(); 00270 return; 00271 } 00272 } elseif ( $bytesRead === 0 ) { 00273 // Assume EOF 00274 $this->close(); 00275 return; 00276 } 00277 00278 $this->readBuffer .= $buf; 00279 while ( $this->socket && $this->processReadBuffer() === 'continue' ); 00280 } 00281 00286 protected function processReadBuffer() { 00287 switch ( $this->readState ) { 00288 case 'idle': 00289 return 'done'; 00290 case 'status': 00291 case 'header': 00292 $lines = explode( "\r\n", $this->readBuffer, 2 ); 00293 if ( count( $lines ) < 2 ) { 00294 return 'done'; 00295 } 00296 if ( $this->readState == 'status' ) { 00297 $this->processStatusLine( $lines[0] ); 00298 } else { // header 00299 $this->processHeaderLine( $lines[0] ); 00300 } 00301 $this->readBuffer = $lines[1]; 00302 return 'continue'; 00303 case 'body': 00304 if ( $this->bodyRemaining !== null ) { 00305 if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) { 00306 $this->bodyRemaining -= strlen( $this->readBuffer ); 00307 $this->readBuffer = ''; 00308 return 'done'; 00309 } else { 00310 $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining ); 00311 $this->bodyRemaining = 0; 00312 $this->nextRequest(); 00313 return 'continue'; 00314 } 00315 } else { 00316 // No content length, read all data to EOF 00317 $this->readBuffer = ''; 00318 return 'done'; 00319 } 00320 default: 00321 throw new MWException( __METHOD__ . ': unexpected state' ); 00322 } 00323 } 00324 00329 protected function processStatusLine( $line ) { 00330 if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) { 00331 $this->log( 'invalid status line' ); 00332 $this->markDown(); 00333 return; 00334 } 00335 list( , , , $status, $reason ) = $m; 00336 $status = intval( $status ); 00337 if ( $status !== 200 && $status !== 404 ) { 00338 $this->log( "unexpected status code: $status $reason" ); 00339 $this->markDown(); 00340 return; 00341 } 00342 $this->readState = 'header'; 00343 } 00344 00348 protected function processHeaderLine( $line ) { 00349 if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) { 00350 $this->bodyRemaining = intval( $m[1] ); 00351 } elseif ( $line === '' ) { 00352 $this->readState = 'body'; 00353 } 00354 } 00355 00356 protected function nextRequest() { 00357 if ( $this->currentRequestIndex !== null ) { 00358 unset( $this->requests[$this->currentRequestIndex] ); 00359 } 00360 if ( count( $this->requests ) ) { 00361 $this->readState = 'status'; 00362 $this->currentRequestIndex = key( $this->requests ); 00363 $this->writeBuffer = $this->requests[$this->currentRequestIndex]; 00364 } else { 00365 $this->readState = 'idle'; 00366 $this->currentRequestIndex = null; 00367 $this->writeBuffer = ''; 00368 } 00369 $this->bodyRemaining = null; 00370 } 00371 00375 protected function log( $msg ) { 00376 wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg\n" ); 00377 } 00378 } 00379 00380 class SquidPurgeClientPool { 00381 00385 var $clients = array(); 00386 var $timeout = 5; 00387 00391 function __construct( $options = array() ) { 00392 if ( isset( $options['timeout'] ) ) { 00393 $this->timeout = $options['timeout']; 00394 } 00395 } 00396 00401 public function addClient( $client ) { 00402 $this->clients[] = $client; 00403 } 00404 00405 public function run() { 00406 $done = false; 00407 $startTime = microtime( true ); 00408 while ( !$done ) { 00409 $readSockets = $writeSockets = array(); 00413 foreach ( $this->clients as $clientIndex => $client ) { 00414 $sockets = $client->getReadSocketsForSelect(); 00415 foreach ( $sockets as $i => $socket ) { 00416 $readSockets["$clientIndex/$i"] = $socket; 00417 } 00418 $sockets = $client->getWriteSocketsForSelect(); 00419 foreach ( $sockets as $i => $socket ) { 00420 $writeSockets["$clientIndex/$i"] = $socket; 00421 } 00422 } 00423 if ( !count( $readSockets ) && !count( $writeSockets ) ) { 00424 break; 00425 } 00426 $exceptSockets = null; 00427 $timeout = min( $startTime + $this->timeout - microtime( true ), 1 ); 00428 wfSuppressWarnings(); 00429 $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout ); 00430 wfRestoreWarnings(); 00431 if ( $numReady === false ) { 00432 wfDebugLog( 'squid', __METHOD__ . ': Error in stream_select: ' . 00433 socket_strerror( socket_last_error() ) . "\n" ); 00434 break; 00435 } 00436 // Check for timeout, use 1% tolerance since we aimed at having socket_select() 00437 // exit at precisely the overall timeout 00438 if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) { 00439 wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" ); 00440 break; 00441 } elseif ( !$numReady ) { 00442 continue; 00443 } 00444 00445 foreach ( $readSockets as $key => $socket ) { 00446 list( $clientIndex, ) = explode( '/', $key ); 00447 $client = $this->clients[$clientIndex]; 00448 $client->doReads(); 00449 } 00450 foreach ( $writeSockets as $key => $socket ) { 00451 list( $clientIndex, ) = explode( '/', $key ); 00452 $client = $this->clients[$clientIndex]; 00453 $client->doWrites(); 00454 } 00455 00456 $done = true; 00457 foreach ( $this->clients as $client ) { 00458 if ( !$client->isIdle() ) { 00459 $done = false; 00460 } 00461 } 00462 } 00463 foreach ( $this->clients as $client ) { 00464 $client->close(); 00465 } 00466 } 00467 }