MediaWiki  REL1_22
SquidPurgeClient.php
Go to the documentation of this file.
00001 <?php
/**
 * Non-blocking HTTP client that sends PURGE requests to one proxy server over
 * a raw TCP socket.
 *
 * Requests are queued with queuePurge() and sent one at a time: the next
 * request is only written once the response to the current one has been read.
 * The class never blocks by itself; a caller (see SquidPurgeClientPool) is
 * expected to select() on the sockets returned by getReadSocketsForSelect() /
 * getWriteSocketsForSelect() and then call doReads() / doWrites().
 */
class SquidPurgeClient {
    /** @var string Host part of the $server string given to the constructor */
    public $host;

    /** @var int TCP port, 80 unless the $server string contained ":port" */
    public $port;

    /** @var string|bool|null Resolved IPv4 address, false if resolution failed, null if not resolved yet */
    public $ip;

    /** @var string Response parser state: 'idle', 'status', 'header' or 'body' */
    public $readState = 'idle';

    /** @var string Bytes of the current request that still have to be sent */
    public $writeBuffer = '';

    /** @var array Serialized requests that have not been completed yet */
    public $requests = array();

    /** @var int|null Key in $requests of the request in flight, null when there is none */
    public $currentRequestIndex;

    /**
     * Linux errno values. They are only used as a fallback when the
     * corresponding SOCKET_* constant is not defined by the sockets extension.
     */
    const EINTR = 4;
    const EAGAIN = 11;
    const EINPROGRESS = 115;

    /** Maximum number of bytes passed to a single socket_send()/socket_recv() call */
    const BUFFER_SIZE = 8192;

    /** @var mixed Socket handle, null when not connected, false when the server was marked down */
    public $socket;

    /** @var string Received bytes that have not been parsed yet */
    public $readBuffer = '';

    /** @var int|null Body bytes still expected, null when no Content-Length header was seen */
    public $bodyRemaining;

    /**
     * @param string $server Server address as "host" or "host:port"
     * @param array $options Currently unused
     */
    public function __construct( $server, $options = array() ) {
        $parts = explode( ':', $server, 2 );
        $this->host = $parts[0];
        // Store the port as an integer, which is what socket_connect() expects.
        $this->port = isset( $parts[1] ) ? intval( $parts[1] ) : 80;
    }

    /**
     * Check a socket error code against a named platform errno constant.
     *
     * errno numbers differ between operating systems, so the SOCKET_* constant
     * is preferred when it is defined; otherwise the given fallback is used.
     *
     * @param int $error Value returned by socket_last_error()
     * @param string $constantName Name of a SOCKET_* constant
     * @param int $fallback Value to compare with if the constant is not defined
     * @return bool
     */
    private static function errnoIs( $error, $constantName, $fallback ) {
        $expected = defined( $constantName ) ? constant( $constantName ) : $fallback;
        return $error === $expected;
    }

    /**
     * Tell whether a read/write error only means "try again later".
     *
     * @param int $error Value returned by socket_last_error()
     * @return bool
     */
    private static function isTransientError( $error ) {
        return self::errnoIs( $error, 'SOCKET_EAGAIN', self::EAGAIN )
            || self::errnoIs( $error, 'SOCKET_EINTR', self::EINTR );
    }

    /**
     * Open a socket if there isn't one open already, and return it.
     * The connect is started in non-blocking mode, so the returned socket may
     * still be connecting.
     *
     * @return mixed The socket, or false if the server is (or was just) marked down
     */
    protected function getSocket() {
        if ( $this->socket !== null ) {
            // Either an open socket, or false after markDown()
            return $this->socket;
        }

        $ip = $this->getIP();
        if ( !$ip ) {
            $this->log( "DNS error" );
            $this->markDown();
            return false;
        }

        wfSuppressWarnings();
        $socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
        wfRestoreWarnings();
        if ( !$socket ) {
            // Without this check the failure would surface as warnings from
            // every following socket_*() call.
            $this->log( "socket creation error: " . socket_strerror( socket_last_error() ) );
            $this->markDown();
            return false;
        }

        $this->socket = $socket;
        socket_set_nonblock( $this->socket );
        wfSuppressWarnings();
        $ok = socket_connect( $this->socket, $ip, $this->port );
        wfRestoreWarnings();
        if ( !$ok ) {
            $error = socket_last_error( $this->socket );
            // "In progress" is the normal result of a non-blocking connect.
            if ( !self::errnoIs( $error, 'SOCKET_EINPROGRESS', self::EINPROGRESS ) ) {
                $this->log( "connection error: " . socket_strerror( $error ) );
                $this->markDown();
                return false;
            }
        }

        return $this->socket;
    }

    /**
     * Get the sockets that should be watched for readability.
     *
     * @return array Empty when idle or when the server is down
     */
    public function getReadSocketsForSelect() {
        if ( $this->readState == 'idle' ) {
            return array();
        }
        $socket = $this->getSocket();
        if ( $socket === false ) {
            return array();
        }
        return array( $socket );
    }

    /**
     * Get the sockets that should be watched for writability.
     *
     * @return array Empty when nothing is waiting to be sent or when the server is down
     */
    public function getWriteSocketsForSelect() {
        if ( !strlen( $this->writeBuffer ) ) {
            return array();
        }
        $socket = $this->getSocket();
        if ( $socket === false ) {
            return array();
        }
        return array( $socket );
    }

    /**
     * Get the IPv4 address of the server, resolving the host name on first use.
     *
     * @throws MWException If the host is an IPv6 address
     * @return string|bool The address, or false if the name could not be resolved
     */
    protected function getIP() {
        if ( $this->ip === null ) {
            if ( IP::isIPv4( $this->host ) ) {
                $this->ip = $this->host;
            } elseif ( IP::isIPv6( $this->host ) ) {
                throw new MWException( '$wgSquidServers does not support IPv6' );
            } else {
                wfSuppressWarnings();
                $this->ip = gethostbyname( $this->host );
                if ( $this->ip === $this->host ) {
                    // gethostbyname() hands back its argument unchanged on failure
                    $this->ip = false;
                }
                wfRestoreWarnings();
            }
        }
        return $this->ip;
    }

    /**
     * Close the socket and ignore any future purge requests.
     * A false socket makes getSocket() return false without reconnecting.
     */
    protected function markDown() {
        $this->close();
        $this->socket = false;
    }

    /**
     * Close the socket, but allow it to be reopened for future purge requests.
     */
    public function close() {
        if ( $this->socket ) {
            wfSuppressWarnings();
            socket_set_block( $this->socket );
            socket_shutdown( $this->socket );
            socket_close( $this->socket );
            wfRestoreWarnings();
        }
        $this->socket = null;
        $this->readBuffer = '';
        // Write buffer is kept since it may contain a request for the next socket
    }

    /**
     * Queue a purge operation.
     *
     * @param string $url URL to purge
     */
    public function queuePurge( $url ) {
        global $wgSquidPurgeUseHostHeader;
        // Strip both CR and LF so the URL can never break out of the request line.
        $url = SquidUpdate::expand( str_replace( array( "\r", "\n" ), '', $url ) );
        $request = array();
        if ( $wgSquidPurgeUseHostHeader ) {
            $bits = wfParseUrl( $url );
            // NOTE(review): wfParseUrl() is defined elsewhere; this guards against
            // it not returning a usable array. A request with an empty target or
            // Host header would be malformed, so the URL is skipped instead.
            if ( !is_array( $bits ) || !isset( $bits['host'] ) ) {
                $this->log( "cannot parse URL for purge: $url" );
                return;
            }
            $host = $bits['host'];
            if ( isset( $bits['port'] ) && strlen( $bits['port'] ) > 0 ) {
                $host .= ":" . $bits['port'];
            }
            // An empty path is sent as "/" in the request line.
            $path = ( isset( $bits['path'] ) && strlen( $bits['path'] ) > 0 ) ? $bits['path'] : '/';
            if ( isset( $bits['query'] ) && is_string( $bits['query'] ) ) {
                $path = wfAppendQuery( $path, $bits['query'] );
            }
            $request[] = "PURGE $path HTTP/1.1";
            $request[] = "Host: $host";
        } else {
            $request[] = "PURGE $url HTTP/1.0";
        }
        $request[] = "Connection: Keep-Alive";
        $request[] = "Proxy-Connection: Keep-Alive";
        $request[] = "User-Agent: " . Http::userAgent() . ' ' . __CLASS__;
        // Two ''s to create \r\n\r\n
        $request[] = '';
        $request[] = '';

        $this->requests[] = implode( "\r\n", $request );
        if ( $this->currentRequestIndex === null ) {
            // Nothing in flight: start sending right away
            $this->nextRequest();
        }
    }

    /**
     * @return bool True when nothing is left to send and no response is awaited
     */
    public function isIdle() {
        return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
    }

    /**
     * Perform pending writes. Call this when socket_select() indicates that
     * writing will not block.
     */
    public function doWrites() {
        if ( !strlen( $this->writeBuffer ) ) {
            return;
        }
        $socket = $this->getSocket();
        if ( !$socket ) {
            return;
        }

        if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
            $buf = $this->writeBuffer;
            $flags = MSG_EOR;
        } else {
            $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
            $flags = 0;
        }
        wfSuppressWarnings();
        $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
        wfRestoreWarnings();

        if ( $bytesSent === false ) {
            $error = socket_last_error( $socket );
            if ( !self::isTransientError( $error ) ) {
                $this->log( 'write error: ' . socket_strerror( $error ) );
                $this->markDown();
            }
            return;
        }

        // Keep whatever was not accepted by the kernel for the next call.
        $this->writeBuffer = (string)substr( $this->writeBuffer, $bytesSent );
    }

    /**
     * Read some data. Call this when socket_select() indicates that reading
     * will not block.
     */
    public function doReads() {
        $socket = $this->getSocket();
        if ( !$socket ) {
            return;
        }

        $buf = '';
        wfSuppressWarnings();
        $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
        wfRestoreWarnings();
        if ( $bytesRead === false ) {
            $error = socket_last_error( $socket );
            if ( !self::isTransientError( $error ) ) {
                $this->log( 'read error: ' . socket_strerror( $error ) );
                $this->markDown();
                return;
            }
            // Transient error: nothing new arrived, just re-run the parser below.
        } elseif ( $bytesRead === 0 ) {
            // Assume EOF. A response without Content-Length is terminated by
            // the connection closing, so in that case the current request is
            // finished and the next one must be started; otherwise the client
            // would wait for more body data until the caller times out.
            $bodyEndedByEof = ( $this->readState === 'body' && $this->bodyRemaining === null );
            $this->close();
            if ( $bodyEndedByEof ) {
                $this->nextRequest();
            }
            return;
        } else {
            $this->readBuffer .= $buf;
        }

        // Parse as much as possible; stop as soon as the socket goes away.
        while ( $this->socket && $this->processReadBuffer() === 'continue' ) {
            // Everything happens in processReadBuffer()
        }
    }

    /**
     * Consume one element (status line, header line or body chunk) from the
     * read buffer.
     *
     * @throws MWException If $readState holds an unknown value
     * @return string 'continue' if more may be parsable, 'done' if more input is needed
     */
    protected function processReadBuffer() {
        switch ( $this->readState ) {
            case 'idle':
                return 'done';
            case 'status':
            case 'header':
                $lines = explode( "\r\n", $this->readBuffer, 2 );
                if ( count( $lines ) < 2 ) {
                    // No complete line yet
                    return 'done';
                }
                if ( $this->readState == 'status' ) {
                    $this->processStatusLine( $lines[0] );
                } else { // header
                    $this->processHeaderLine( $lines[0] );
                }
                if ( !$this->socket ) {
                    // The line handler marked the server down; close() already
                    // emptied the buffer, so don't put stale bytes back into it.
                    return 'done';
                }
                $this->readBuffer = $lines[1];
                return 'continue';
            case 'body':
                if ( $this->bodyRemaining === null ) {
                    // No content length, read all data to EOF
                    $this->readBuffer = '';
                    return 'done';
                }
                $available = strlen( $this->readBuffer );
                if ( $this->bodyRemaining > $available ) {
                    $this->bodyRemaining -= $available;
                    $this->readBuffer = '';
                    return 'done';
                }
                // The body is complete; what follows belongs to the next response.
                $this->readBuffer = (string)substr( $this->readBuffer, $this->bodyRemaining );
                $this->bodyRemaining = 0;
                $this->nextRequest();
                return 'continue';
            default:
                throw new MWException( __METHOD__ . ': unexpected state' );
        }
    }

    /**
     * Parse the status line of a response. Anything other than a well-formed
     * line with status 200 or 404 marks the server down.
     *
     * @param string $line Status line without the trailing CRLF
     */
    protected function processStatusLine( $line ) {
        if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
            $this->log( 'invalid status line' );
            $this->markDown();
            return;
        }
        $status = intval( $m[3] );
        $reason = $m[4];
        if ( $status !== 200 && $status !== 404 ) {
            $this->log( "unexpected status code: $status $reason" );
            $this->markDown();
            return;
        }
        $this->readState = 'header';
    }

    /**
     * Parse one response header line. Only Content-Length and the empty line
     * that ends the header section are acted upon.
     *
     * @param string $line Header line without the trailing CRLF
     */
    protected function processHeaderLine( $line ) {
        // Whitespace around the value is optional in HTTP, so accept any amount.
        if ( preg_match( '/^Content-Length:\s*(\d+)\s*$/i', $line, $m ) ) {
            $this->bodyRemaining = intval( $m[1] );
        } elseif ( $line === '' ) {
            $this->readState = 'body';
        }
    }

    /**
     * Drop the request in flight (if any) and start the oldest queued one, or
     * go idle when the queue is empty.
     */
    protected function nextRequest() {
        if ( $this->currentRequestIndex !== null ) {
            unset( $this->requests[$this->currentRequestIndex] );
        }
        if ( count( $this->requests ) ) {
            $this->readState = 'status';
            // Rewind explicitly instead of trusting wherever the array's
            // internal pointer happens to be after the unset() above.
            reset( $this->requests );
            $this->currentRequestIndex = key( $this->requests );
            $this->writeBuffer = $this->requests[$this->currentRequestIndex];
        } else {
            $this->readState = 'idle';
            $this->currentRequestIndex = null;
            $this->writeBuffer = '';
        }
        $this->bodyRemaining = null;
    }

    /**
     * Write a message, prefixed with the class and host name, to the 'squid'
     * debug log group.
     *
     * @param string $msg
     */
    protected function log( $msg ) {
        wfDebugLog( 'squid', __CLASS__ . " ($this->host): $msg\n" );
    }
}
00379 
/**
 * Drives a set of SquidPurgeClient objects with a single socket_select() loop
 * until all of them are idle, an error occurs, or the overall timeout expires.
 */
class SquidPurgeClientPool {

    /** @var array Clients added with addClient(), in insertion order */
    public $clients = array();

    /** @var int|float Overall time limit for run(), in seconds */
    public $timeout = 5;

    /**
     * @param array $options Supported key: 'timeout', the overall time limit
     *   for run() in seconds (default 5)
     */
    public function __construct( $options = array() ) {
        if ( isset( $options['timeout'] ) ) {
            $this->timeout = $options['timeout'];
        }
    }

    /**
     * Add a client to the pool.
     *
     * @param SquidPurgeClient $client
     */
    public function addClient( $client ) {
        $this->clients[] = $client;
    }

    /**
     * Process the queued requests of all clients, then close every client.
     *
     * Returns when all clients are idle, when no client has a socket left to
     * wait on, when socket_select() fails, or when the timeout is reached.
     */
    public function run() {
        $done = false;
        $startTime = microtime( true );
        while ( !$done ) {
            $readSockets = $writeSockets = array();
            // Keys have the form "clientIndex/socketIndex" so that the owning
            // client can be found again after socket_select() filtered the arrays.
            foreach ( $this->clients as $clientIndex => $client ) {
                $sockets = $client->getReadSocketsForSelect();
                foreach ( $sockets as $i => $socket ) {
                    $readSockets["$clientIndex/$i"] = $socket;
                }
                $sockets = $client->getWriteSocketsForSelect();
                foreach ( $sockets as $i => $socket ) {
                    $writeSockets["$clientIndex/$i"] = $socket;
                }
            }
            if ( !count( $readSockets ) && !count( $writeSockets ) ) {
                // Nothing to wait for
                break;
            }
            $exceptSockets = null;
            // Wait at most one second, and never past the overall deadline.
            // socket_select() takes whole seconds and microseconds as separate
            // integers, so the fractional part must be passed separately;
            // handing over the float would truncate sub-second waits to zero
            // and turn the loop into a busy poll. The value is also clamped so
            // a deadline that has already passed cannot yield a negative wait.
            $wait = max( 0, min( $startTime + $this->timeout - microtime( true ), 1 ) );
            $waitSec = (int)floor( $wait );
            $waitUsec = (int)( ( $wait - $waitSec ) * 1000000 );
            wfSuppressWarnings();
            $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets,
                $waitSec, $waitUsec );
            wfRestoreWarnings();
            if ( $numReady === false ) {
                wfDebugLog( 'squid', __METHOD__ . ': Error in socket_select: ' .
                    socket_strerror( socket_last_error() ) . "\n" );
                break;
            }
            // Check for timeout, use 1% tolerance since we aimed at having socket_select()
            // exit at precisely the overall timeout
            if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
                wfDebugLog( 'squid', __CLASS__ . ": timeout ({$this->timeout}s)\n" );
                break;
            } elseif ( !$numReady ) {
                continue;
            }

            // Only the sockets that are ready are left in the arrays now.
            foreach ( $readSockets as $key => $socket ) {
                list( $clientIndex, ) = explode( '/', $key );
                $client = $this->clients[$clientIndex];
                $client->doReads();
            }
            foreach ( $writeSockets as $key => $socket ) {
                list( $clientIndex, ) = explode( '/', $key );
                $client = $this->clients[$clientIndex];
                $client->doWrites();
            }

            // Finished once every client is idle.
            $done = true;
            foreach ( $this->clients as $client ) {
                if ( !$client->isIdle() ) {
                    $done = false;
                    break;
                }
            }
        }
        foreach ( $this->clients as $client ) {
            $client->close();
        }
    }
}
00466     }
00467 }